Make keyboard emulation for debug work

This commit is contained in:
jaseg 2021-03-24 14:17:28 +01:00
parent 4fcc3337e2
commit dbb264c68d
2 changed files with 241 additions and 46 deletions

View file

@ -2,7 +2,9 @@
import time
import enum
import os
import sys
import binascii
from contextlib import contextmanager, suppress, wraps
import hashlib
import secrets
@ -58,11 +60,12 @@ class Packetizer:
packet = self.ser.read_until(b'\0')
data = cobs.decode(packet[:-1])
pkt_type, data = PacketType(data[0]), data[1:]
if self.debug:
print(f'\033[93mReceived {len(data)} bytes\033[0m')
print(f'\033[93mReceived {len(data)} bytes, packet type {pkt_type.name} ({pkt_type.value})\033[0m')
hexdump(print, data, self.width)
pkt_type, data = PacketType(data[0]), data[1:]
if pkt_type is PacketType.COMM_ERROR:
raise ProtocolError('Device-side serial communication error')
elif pkt_type is PacketType.CRYPTO_ERROR:
@ -253,6 +256,7 @@ class NoiseEngine:
msg_type, payload = self.packetizer.receive_packet()
rtype, data = self._decrypt(payload)
print(f'Received report {rtype.name} ({rtype.value})')
if rtype is ReportType.PAIRING_SUCCESS:
self.connected, self.paired = True, True
elif rtype is ReportType.PAIRING_START:
@ -334,7 +338,7 @@ class NoiseEngine:
raise ProtocolError('Device-side pairing error') # FIXME find better exception subclass here
else:
raise ProtocolError('Invalid report type')
raise ProtocolError(f'Invalid report type {msg_type}')
def uinput_passthrough(self):
mouse_foo = [uinput.BTN_LEFT, uinput.BTN_RIGHT, uinput.BTN_MIDDLE,
@ -351,7 +355,6 @@ class NoiseEngine:
raise ValueError('Unsupported report length', report_len)
modbyte, _reserved, *keycodes = report
import binascii
keys = { *KeyMapper.map_modifiers(modbyte), *KeyMapper.map_regulars(keycodes) }
if self.debug:
print('Emitting:', keys)
@ -366,7 +369,6 @@ class NoiseEngine:
elif msg_type is ReportType.MOUSE:
if report_len < 3 or report_len > 8:
raise ValueError('Unsupported report length', report_len)
import binascii
_report_type, buttons, a, b, c, w, _2 = report
x = ((b&0x0f)<<8) | a
y = ((b&0xf0) >> 4) | (c<<4)
@ -393,22 +395,71 @@ if __name__ == '__main__':
parser.add_argument('baudrate')
parser.add_argument('-w', '--width', type=int, default=16, help='Number of bytes to display in one line')
parser.add_argument('-d', '--debug', action='store_true')
parser.add_argument('-r', '--random-key', action='store_true', help='Use random key instead of persisted key')
args = parser.parse_args()
if args.random_key:
host_key_private = NoiseEngine.generate_private_key_x25519()
else:
XDG_CONFIG_HOME = os.environ.get('XDG_CONFIG_HOME') or os.path.join(os.path.expandvars('$HOME'), '.config', 'secure_hid')
if not os.path.isdir(XDG_CONFIG_HOME):
os.mkdir(XDG_CONFIG_HOME)
private_key_file = os.path.join(XDG_CONFIG_HOME, 'host_key.pem')
if not os.path.isfile(private_key_file):
with open(private_key_file, 'w') as f:
f.write(binascii.hexlify(hexnoise.NoiseEngine.generate_private_key_x25519()).decode())
with open(private_key_file) as f:
host_key_private = binascii.unhexlify(f.read())
ser = serial.Serial(args.serial, args.baudrate)
packetizer = Packetizer(ser, debug=args.debug, width=args.width)
temp_priv_key = NoiseEngine.generate_private_key_x25519()
noise = NoiseEngine(temp_priv_key, packetizer, debug=args.debug)
noise = NoiseEngine(host_key_private, packetizer, debug=args.debug)
noise.perform_handshake()
print('Handshake channel binding incantation:')
print(noise.channel_binding_incantation())
if noise.connected and not noise.paired:
print('Handshake channel binding incantation:')
channel_binding_incantation = noise.channel_binding_incantation()
print(channel_binding_incantation)
for user_input in noise.pairing_messages():
if not args.debug:
print('\033[2K\r', end='')
print('Pairing input:', user_input, end='' if not args.debug else '\n', flush=True)
print()
print('Pairing success')
def term_match(user_input, template):
out = ''
words_u = user_input.split(' ')
words_t = [ w for w in template.split(' ') if w != 'and' ]
for u_word in words_u:
if 'and'.startswith(u_word) or not u_word:
out += ' ' + u_word
continue
if not words_t:
out += '\033[91m'
continue
t_word, *words_t = words_t
if not t_word.startswith(u_word.strip(',')):
out += ' \033[91m' + u_word
else:
out += ' ' + u_word
return '\033[92m' + out[1:] + '\033[0m'
for user_input in noise.pairing_messages():
if not args.debug:
print('\033[2K\r', end='')
matched = term_match(user_input, channel_binding_incantation)
print('Pairing input:', matched, end='' if not args.debug else '\n', flush=True)
print()
print('Pairing success')
elif noise.connected and noise.paired:
print('Successfully re-connected')
else:
print('Could not connect')
sys.exit(1)
noise.uinput_passthrough()

View file

@ -191,9 +191,12 @@ static void gpio_setup(void)
/* Mode button */
gpio_mode_setup(GPIOE, GPIO_MODE_INPUT, GPIO_PUPD_PULLUP, GPIO15);
/* Test button */
gpio_mode_setup(GPIOB, GPIO_MODE_INPUT, GPIO_PUPD_PULLUP, GPIO10);
/* Speaker */
gpio_mode_setup(GPIOB, GPIO_MODE_OUTPUT, GPIO_PUPD_NONE, GPIO10);
gpio_set(GPIOB, GPIO10);
//gpio_mode_setup(GPIOB, GPIO_MODE_OUTPUT, GPIO_PUPD_NONE, GPIO10);
//gpio_set(GPIOB, GPIO10);
/* USB OTG FS phy outputs */
gpio_mode_setup(GPIOA, GPIO_MODE_AF, GPIO_PUPD_NONE, GPIO11 | GPIO12);
@ -222,7 +225,7 @@ void pairing_input(uint8_t modbyte, uint8_t keycode);
void pairing_parse_report(struct hid_report *buf, uint8_t len);
/* Minimum number of bytes of handshake hash to confirm during pairing */
#define MIN_PAIRING_SEQUENCE_LENGTH 4
#define MIN_PAIRING_SEQUENCE_LENGTH 1
int pairing_check(struct NoiseState *st, const char *buf) {
//LOG_PRINTF("Checking pairing\n");
@ -439,6 +442,12 @@ struct dma_usart_file debug_out_s = {
struct dma_usart_file *debug_out = &debug_out_s;
/* FIXME start unsafe debug code */
bool debug_emulate_pairing_input(char c);
bool debug_fill_report_from_ascii(char c, struct hid_report *out);
bool debug_send_encrypted_emulated_report(char c);
bool debug_enqueue_emulated_input(char c);
/* Feed keyboard input from debug UART into keyboard subsystem */
void usart1_isr(void) {
if (USART1_SR & USART_SR_ORE) { /* Overrun handling */
LOG_PRINTF("USART1 data register overrun\n");
@ -448,30 +457,126 @@ void usart1_isr(void) {
}
uint8_t data = USART1_DR; /* This automatically acknowledges the IRQ */
LOG_PRINTF(" %02x ", data);
if (data == '\r')
LOG_PRINTF("\n");
if (noise_state.handshake_state == HANDSHAKE_DONE_UNKNOWN_HOST) {
if (!debug_emulate_pairing_input(data)) {
LOG_PRINTF("Error emulating pairing input\n");
}
} else {
if (!debug_enqueue_emulated_input(data)) {
LOG_PRINTF("Error enqueueing emulated keystroke\n");
}
}
}
size_t debug_keystroke_queue_wptr = 0;
size_t debug_keystroke_queue_rptr = 0;
#define ARRAY_LENGTH(x) (sizeof(x) / sizeof((x)[0]))
char debug_keystroke_queue[64];
bool debug_enqueue_emulated_input(char c) {
size_t next_wptr = (debug_keystroke_queue_wptr + 1) % ARRAY_LENGTH(debug_keystroke_queue);
if (next_wptr == debug_keystroke_queue_rptr)
return false; /* overflow */
debug_keystroke_queue[debug_keystroke_queue_wptr] = c;
debug_keystroke_queue_wptr = next_wptr;
return true;
}
int debug_dequeue_emulated_input() {
if (debug_keystroke_queue_rptr == debug_keystroke_queue_wptr) {
return -1; /* queue empty */
}
int rc = debug_keystroke_queue[debug_keystroke_queue_rptr];
debug_keystroke_queue_rptr = (debug_keystroke_queue_rptr + 1) % ARRAY_LENGTH(debug_keystroke_queue);
return (unsigned char)rc; /* cast into range 0-255 */
}
/* Lookup ASCII char and fill in corresponding HID keycode in HID report.
*
* Returns true if char was found in keycode mapping.
*/
bool debug_fill_report_from_ascii(char c, struct hid_report *out) {
memset(out, 0, sizeof(out));
for (size_t i=0; keycode_mapping[i].kc != KEY_NONE; i++) {
struct hid_report report = {0};
if (keycode_mapping[i].ch[0] == data)
report.modifiers = 0;
else if (keycode_mapping[i].ch[1] == data)
report.modifiers = MOD_LSHIFT;
if (keycode_mapping[i].ch[0] == c)
out->modifiers = 0;
else if (keycode_mapping[i].ch[1] == c)
out->modifiers = MOD_LSHIFT;
else continue;
report.keycodes[0] = keycode_mapping[i].kc;
pairing_parse_report(&report, 8);
break;
}
LOG_PRINTF(" %02x ", data);
if (data == 0x7f) {
struct hid_report report = {.modifiers=0, .keycodes={KEY_BACKSPACE, 0}};
pairing_parse_report(&report, 8);
} else if (data == '\r') {
struct hid_report report = {.modifiers=0, .keycodes={KEY_ENTER, 0}};
pairing_parse_report(&report, 8);
LOG_PRINTF("\n");
out->keycodes[0] = keycode_mapping[i].kc;
return true;
}
struct hid_report report = {0};
if (c == 0x7f) {
out->modifiers = 0;
out->keycodes[0] = KEY_BACKSPACE;
return true;
} else if (c == '\r') {
out->modifiers = 0;
out->keycodes[0] = KEY_ENTER;
return true;
}
return false; /* not found */
}
/* Returns true on success */
bool debug_emulate_pairing_input(char c) {
struct hid_report report;
if (!debug_fill_report_from_ascii(c, &report)) {
return false;
}
/* Emulate key press input */
pairing_parse_report(&report, 8);
/* key release */
memset(&report, 0, sizeof(report));
pairing_parse_report(&report, 8);
return true;
}
/* Returns true on success */
bool debug_send_encrypted_emulated_report(char c) {
struct hid_report_packet pkt = {
.type = REPORT_KEYBOARD,
.report = {
.len = 8,
.report = {0}
}
};
/* key press */
if (!debug_fill_report_from_ascii(c, &pkt.report.report))
return false;
if (send_encrypted_message(&noise_state, (uint8_t *)&pkt, sizeof(pkt))) {
LOG_PRINTF("Error sending emulated HID report packet\n");
return false;
}
/* key release */
memset(&pkt.report.report, 0, sizeof(struct hid_report));
if (send_encrypted_message(&noise_state, (uint8_t *)&pkt, sizeof(pkt))) {
LOG_PRINTF("Error sending emulated HID report packet\n");
return false;
}
return true;
}
/* end unsafe debug code */
@ -605,25 +710,64 @@ int main(void)
int spk_inc = 1;
gpio_clear(GPIOA, GPIO6);
gpio_clear(GPIOA, GPIO7);
gpio_clear(GPIOB, GPIO10);
//gpio_clear(GPIOB, GPIO10);
int last_btn_st = 0;
int debounce_ctr = 0;
int last_btn_st_mode = 0;
int last_btn_st_test = 0;
int debounce_ctr_mode = 0;
int debounce_ctr_test = 0;
int mode_debug = 0;
#define DEBOUNCE_IVL 1000
gpio_set(GPIOE, GPIO13);
gpio_clear(GPIOE, GPIO14);
int dbg_keyem_ctr = 0;
#define DBG_KEYEM_IVL 100
while (23) {
delay(1);
if (debounce_ctr > 0) {
debounce_ctr -= 1;
if (dbg_keyem_ctr <= 0) {
int rc = debug_dequeue_emulated_input();
if (rc >= 0) {
dbg_keyem_ctr = DBG_KEYEM_IVL;
if (!debug_send_encrypted_emulated_report((char) rc)) {
LOG_PRINTF("Error sending emulated keyboard input\n");
} else {
LOG_PRINTF("Sent emulated keypress %02x\n", (unsigned char)rc);
}
}
} else {
dbg_keyem_ctr -= 1;
}
if (debounce_ctr_test > 0) {
debounce_ctr_test -= 1;
} else {
if (!gpio_get(GPIOB, GPIO10)) {
if (last_btn_st_test) {
debounce_ctr_test = DEBOUNCE_IVL;
const char *s = "unicorn\r";
for (size_t i=0; i<8; i++) {
if (!debug_enqueue_emulated_input(s[i])) {
LOG_PRINTF("Error enqueuing emulated test input\n");
}
}
}
last_btn_st_test = 0;
} else {
last_btn_st_test = 1;
}
}
if (debounce_ctr_mode > 0) {
debounce_ctr_mode -= 1;
} else {
if (gpio_get(GPIOE, GPIO15)) {
if (!last_btn_st) {
debounce_ctr = DEBOUNCE_IVL;
if (!gpio_get(GPIOE, GPIO15)) {
if (last_btn_st_mode) {
debounce_ctr_mode = DEBOUNCE_IVL;
mode_debug = !mode_debug;
@ -637,10 +781,10 @@ int main(void)
}
}
last_btn_st = 1;
last_btn_st_mode = 0;
} else {
last_btn_st = 0;
last_btn_st_mode = 1;
}
}