Handshake working with new abstractions
This commit is contained in:
parent
42d4bebde7
commit
2f4f3e13aa
11 changed files with 1118 additions and 77 deletions
36
hexnoise.py
36
hexnoise.py
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import time
|
||||
import string
|
||||
import enum
|
||||
|
||||
from cobs import cobs
|
||||
|
||||
|
|
@ -28,9 +29,22 @@ def hexdump(write, packet, width=16):
|
|||
_print_line(write, ts-startup, packet, width=width)
|
||||
write()
|
||||
|
||||
def send_packet(ser, data, width=16):
|
||||
print(f'\033[93mSending {len(data)} bytes\033[0m')
|
||||
class PacketType(enum.Enum):
|
||||
_RESERVED = 0
|
||||
INITIATE_HANDSHAKE = 1
|
||||
HANDSHAKE = 2
|
||||
DATA = 3
|
||||
|
||||
class ReportType(enum.Enum):
|
||||
_RESERVED = 0
|
||||
KEYBOARD = 1
|
||||
MOUSE = 2
|
||||
PAIRING = 3 # keyboard in disguise
|
||||
|
||||
def send_packet(ser, pkt_type, data, width=16):
|
||||
print(f'\033[93mSending {len(data)} bytes, packet type {pkt_type.name} ({pkt_type.value})\033[0m')
|
||||
hexdump(print, data, width)
|
||||
data = bytes([pkt_type.value]) + data
|
||||
encoded = cobs.encode(data) + b'\0'
|
||||
ser.write(encoded)
|
||||
ser.flushOutput()
|
||||
|
|
@ -40,7 +54,7 @@ def receive_packet(ser, width=16):
|
|||
data = cobs.decode(packet[:-1])
|
||||
print(f'\033[93mReceived {len(data)} bytes\033[0m')
|
||||
hexdump(print, data, width)
|
||||
return data
|
||||
return data[0], data[1:]
|
||||
|
||||
if __name__ == '__main__':
|
||||
import argparse
|
||||
|
|
@ -85,16 +99,21 @@ if __name__ == '__main__':
|
|||
proto.set_as_initiator()
|
||||
proto.set_keypair_from_private_bytes(Keypair.STATIC, STATIC_LOCAL)
|
||||
proto.start_handshake()
|
||||
send_packet(ser, PacketType.INITIATE_HANDSHAKE, b'', args.width)
|
||||
print('Handshake started')
|
||||
|
||||
while True:
|
||||
if proto.handshake_finished:
|
||||
break
|
||||
send_packet(ser, proto.write_message(), args.width)
|
||||
send_packet(ser, PacketType.HANDSHAKE, proto.write_message(), args.width)
|
||||
|
||||
if proto.handshake_finished:
|
||||
break
|
||||
proto.read_message(receive_packet(ser, args.width))
|
||||
pkt_type, payload = receive_packet(ser, args.width)
|
||||
if pkt_type == PacketType.HANDSHAKE.value:
|
||||
proto.read_message(payload)
|
||||
else:
|
||||
print(f'Incorrect packet type {pkt_type}. Ignoring since this is only test code.')
|
||||
print('Handshake finished, handshake hash:')
|
||||
hexdump(print, proto.get_handshake_hash(), args.width)
|
||||
|
||||
|
|
@ -126,7 +145,11 @@ if __name__ == '__main__':
|
|||
with uinput.Device(ALL_KEYS) as ui:
|
||||
while True:
|
||||
try:
|
||||
received = receive_packet(ser, args.width)
|
||||
pkt_type, received = receive_packet(ser, args.width)
|
||||
if pkt_type != PacketType.DATA.value:
|
||||
print(f'Unexpected packet type {pkt_type}. Ignoring.')
|
||||
continue
|
||||
|
||||
try:
|
||||
noise_rx(received, ui)
|
||||
except NoiseInvalidMessage as e:
|
||||
|
|
@ -145,3 +168,4 @@ if __name__ == '__main__':
|
|||
proto.noise_protocol.cipher_state_decrypt.n = orig_n
|
||||
except Exception as e:
|
||||
print('Invalid framing:', e)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue