88 lines
2.8 KiB
Python
88 lines
2.8 KiB
Python
#!/usr/bin/env python
|
|
|
|
import sys
|
|
import subprocess
|
|
import binascii
|
|
from statistics import mean
|
|
from dataclasses import dataclass, fields
|
|
import struct
|
|
from pprint import pprint
|
|
|
|
from cobs import cobs
|
|
|
|
time = [*sys.argv, '1s'][1]
|
|
proc = subprocess.run(f'sigrok-cli --driver dreamsourcelab-dslogic --config samplerate=10M --channels 0,1 --protocol-decoders uart:baudrate=250000:rx=1 --protocol-decoder-annotations uart=rx-data,uart=tx-data --time {time}'.split(), check=True, capture_output=True, text=True)
|
|
data = [line.partition(':')[2] for line in proc.stdout.splitlines()]
|
|
data = bytes([int(x, 16) for x in data if x])
|
|
|
|
class Serialized:
|
|
@classmethod
|
|
def deserialize(kls, data):
|
|
fields = struct.unpack(kls._struct_format(), data)
|
|
mapped = [cast(val) for cast, val in zip(kls._struct_casts(), fields)]
|
|
return kls(*mapped)
|
|
|
|
@classmethod
|
|
def _struct_format(kls):
|
|
return kls._parse_fields()[0]
|
|
|
|
@classmethod
|
|
def _struct_casts(kls):
|
|
return kls._parse_fields()[1]
|
|
|
|
@classmethod
|
|
def _parse_fields(kls):
|
|
fmt = '<'
|
|
casts = []
|
|
for field in fields(kls):
|
|
if isinstance(field.type, tuple):
|
|
struct_type, cast = field.type
|
|
else:
|
|
struct_type, cast = field.type, int
|
|
fmt += struct_type
|
|
casts.append(cast)
|
|
return fmt, casts
|
|
|
|
@dataclass
|
|
class Header(Serialized):
|
|
crc: 'I'
|
|
src: 'B'
|
|
dst: 'B'
|
|
pid: 'B'
|
|
packet_type: 'B'
|
|
|
|
@dataclass
|
|
class ADCPacket(Serialized):
|
|
timestamp: 'Q'
|
|
sampling_interval: 'I'
|
|
total_samples: 'I'
|
|
sample_count: 'I'
|
|
samples: ('96s', bytes)
|
|
|
|
def __post_init__(self):
|
|
data = self.samples
|
|
foo = lambda x: x if x < 0x800000 else x-0x1000000
|
|
self.samples = [[
|
|
foo(struct.unpack('<I', data[3*(2*sample + channel):][:3] + b'\0')[0])
|
|
for sample in range(16)
|
|
] for channel in range(2)]
|
|
|
|
|
|
norm_a, norm_b = 0, 0
|
|
for packet in data.split(b'\0'):
|
|
try:
|
|
packet = cobs.decode(packet)
|
|
hdr = Header.deserialize(packet[:8])
|
|
if hdr.packet_type == 2:
|
|
packet = ADCPacket.deserialize(packet[8:])
|
|
diff_a = max([abs(x - norm_a) for x in packet.samples[0][:packet.sample_count]])
|
|
diff_b = max([abs(x - norm_b) for x in packet.samples[1][:packet.sample_count]])
|
|
if diff_a > 10000 or diff_b > 10000:
|
|
pprint(packet)
|
|
norm_a = mean(packet.samples[0][:packet.sample_count])
|
|
norm_b = mean(packet.samples[1][:packet.sample_count])
|
|
elif any(x != 0 for x in packet.samples[0][packet.sample_count:] + packet.samples[1][packet.sample_count:]):
|
|
pprint('nonzero', packet)
|
|
except (cobs.DecodeError, struct.error):
|
|
print('Decoding error')
|
|
|