58 lines
1.6 KiB
Python
58 lines
1.6 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from time import time
|
|
from binascii import hexlify
|
|
import enum
|
|
import struct
|
|
import zlib
|
|
import sys
|
|
|
|
import serial
|
|
from cobs import cobs
|
|
|
|
|
|
@enum.unique
class CtrlPacketTypes(enum.Enum):
    """Type codes for host-to-device control packets.

    The member's ``value`` is packed as the first byte of a control packet
    (see ``ctrl_packet``). Values must stay unique, which ``enum.unique``
    enforces at class-creation time.
    """

    # Sent once before the receive loop starts.
    RESET = 1
    # Sent after a data packet has been received, carrying that packet's id.
    ACK = 2
    # Presumably asks the device to resend a packet id -- not used by this
    # script yet; confirm against the device firmware.
    RETRANSMIT = 3
|
|
|
|
def unpack_head(fmt, data):
    """Unpack a fixed-size header from the front of *data*.

    Args:
        fmt: A ``struct`` format string describing the header.
        data: Bytes-like object that starts with the header.

    Returns:
        A flat tuple holding every unpacked header field, followed by the
        remaining bytes of *data* after the header as the last element.

    Raises:
        struct.error: If *data* is shorter than the header described by *fmt*.
    """
    split = struct.calcsize(fmt)
    fields = struct.unpack(fmt, data[:split])
    # Parenthesised so the starred expression also parses on interpreters
    # older than 3.8, where a bare ``return *a, b`` is a syntax error.
    return (*fields, data[split:])
|
|
|
|
def ctrl_packet(ptype, pid=0):
    """Build one framed control packet ready to be written to the serial port.

    Args:
        ptype: A ``CtrlPacketTypes`` member; its ``value`` becomes byte 0.
        pid: Packet id placed in byte 1 (defaults to 0).

    Returns:
        The two-byte payload, COBS-encoded and followed by a single zero
        byte that acts as the frame delimiter.

    Raises:
        struct.error: If either field does not fit in an unsigned byte.
    """
    payload = struct.pack('BB', ptype.value, pid)
    return cobs.encode(payload) + b'\0'
|
|
|
|
def ctrl_reset():
    """Return a framed RESET control packet (packet id 0)."""
    return ctrl_packet(CtrlPacketTypes.RESET)


def ctrl_ack(pid):
    """Return a framed ACK control packet for packet id *pid*."""
    return ctrl_packet(CtrlPacketTypes.ACK, pid)


def ctrl_retransmit(pid):
    """Return a framed RETRANSMIT control packet for packet id *pid*."""
    return ctrl_packet(CtrlPacketTypes.RETRANSMIT, pid)
|
|
|
|
|
|
# Receive log; each entry is (timestamp, seq, crc32, our_crc, pid, data).
log = []

# Open the USB-UART bridge at 250000 baud with a 1 s read timeout. The
# context manager closes the port on every exit path, including sys.exit().
with serial.Serial('/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0', 250000, timeout=1.0) as ser:
    # NOTE(review): debug short-circuit. The script writes a test pattern and
    # exits here, so everything below is unreachable until these two lines
    # are removed.
    ser.write(b'foobar'*32)
    sys.exit(0)

    # Drop any stale input, then reset the device. flush() blocks until the
    # packet has been written out; the deprecated flushOutput() is an alias
    # for reset_output_buffer(), which would discard the pending packet.
    ser.reset_input_buffer()
    ser.write(ctrl_reset())
    ser.flush()

    for _ in range(100):
        #ser.write(cobs.encode(b'\x01\xff') + b'\0')
        frame = ser.read_until(b'\0')
        if not frame or frame[-1] != 0x00:
            # Read timed out before a complete zero-terminated frame arrived.
            #print(f'{time():>7.3f} Timeout: resetting')
            #ser.write(cobs.encode(b'\x01\xff') + b'\0') # reset
            continue

        try:
            # Frame layout after COBS decoding: a native uint32 CRC, then a
            # pad byte, a one-byte packet id, a uint16 sequence number and
            # the payload bytes.
            crc32, payload = unpack_head('I', cobs.decode(frame[:-1]))
            pid, seq, data = unpack_head('xBH', payload)
        except (cobs.DecodeError, struct.error):
            # Corrupt or truncated frame: skip it like a timeout instead of
            # aborting the whole receive loop.
            continue

        ser.write(ctrl_ack(pid))
        ser.flush()

        # Calculate byte-wise CRC32
        # NOTE(review): CRC verification is disabled; our_crc is a placeholder
        # and the received crc32 is never checked.
        #our_crc = zlib.crc32(bytes(b for x in payload for b in (0, 0, 0, x)))
        our_crc = 0
        # NOTE(review): logging is disabled, so the report below prints nothing.
        #log.append((time(), seq, crc32, our_crc, pid, data))

# The loop variable is named `timestamp` so it does not shadow the imported
# time() function.
for timestamp, seq, crc32, our_crc, pid, data in log:
    print(f'{timestamp:>7.3f} {seq:05d} {crc32:08x} {our_crc:08x} {pid} {hexlify(data).decode()}')
|