454 lines
18 KiB
Python
454 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import itertools
|
|
import sqlite3
|
|
import traceback
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
import time
|
|
import math
|
|
|
|
import requests
|
|
import tqdm
|
|
import click
|
|
import pyvisa
|
|
|
|
|
|
@dataclass
|
|
class Token:
|
|
val: str
|
|
|
|
def __str__(self):
|
|
return self.val
|
|
|
|
class LXIWrapper:
|
|
min = Token('MIN')
|
|
max = Token('MAX')
|
|
default = Token('DEF')
|
|
auto = Token('AUTO')
|
|
_inst = None
|
|
|
|
def __init__(self, ip):
|
|
self._inst = rm.open_resource(f'TCPIP::{ip}::INSTR')
|
|
|
|
def __getattr__(self, key):
|
|
if (val := self.__dict__.get(key)):
|
|
return val
|
|
elif (val := getattr(self._inst, key, None)):
|
|
return val
|
|
else:
|
|
if key.startswith('q_'):
|
|
return lambda *args: self._query(self._map_key(key[2:]), *args)
|
|
else:
|
|
return lambda *args: self._invoke(self._map_key(key), *args)
|
|
|
|
def __setattr__(self, key, value):
|
|
if hasattr(type(self), key):
|
|
super().__setattr__(key, value)
|
|
else:
|
|
return self._invoke(self._map_key(key), value)
|
|
|
|
@classmethod
|
|
def _mangle(kls, s):
|
|
return s[:4].upper() + s[4:].lower()
|
|
|
|
@classmethod
|
|
def _map_key(kls, key):
|
|
return ':'.join(kls._mangle(item) if item.islower() else item for item in key.split('_'))
|
|
|
|
@classmethod
|
|
def _format(kls, command, *args):
|
|
if args:
|
|
return command + ' ' + ','.join(f'"{x}"' if isinstance(x, str) else str(x) for x in args)
|
|
else:
|
|
return command
|
|
|
|
def _invoke(self, command, *args):
|
|
self._inst.write(self._format(command, *args))
|
|
|
|
def _query(self, command, *args):
|
|
res = self._inst.query(self._format(command + '?', *args))
|
|
try:
|
|
return float(res)
|
|
except ValueError:
|
|
return res.strip()
|
|
|
|
|
|
def create_schedule():
|
|
for h in [0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0]:
|
|
for a in [0, 90, 180]:
|
|
for p, q in [(1, 1), (1, -1), (-1, 1), (-1, -1)]:
|
|
for rn, min_c in [(range(0, 20, 2), 0), (range(0, 40, 5), 20), (range(0, 60, 10), 40)]:
|
|
rn = list(rn)
|
|
for dx in rn:
|
|
for dy in rn:
|
|
if dx >= min_c or dy >= min_c:
|
|
yield p*dx, q*dy, a, h
|
|
|
|
#for a in range(0, 360, 5):
|
|
# for dy in rn:
|
|
# yield 0, dy, a, h
|
|
|
|
|
|
def create_fine_schedule(na1=32, na2=8):
|
|
# grids
|
|
for h in [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0]:
|
|
for a in [0, 15, 30, 45]:
|
|
for rn, min_c in [(range(-20, 21, 2), 0), (range(-50, 50, 5), 21)]:
|
|
rn = list(rn)
|
|
for dx in rn:
|
|
for dy in rn:
|
|
if abs(dx) >= min_c or abs(dy) >= min_c:
|
|
yield dx, dy, a, h
|
|
|
|
# radial schedule
|
|
for h in [0.0, 1.0, 2.0, 5.0]:
|
|
for a2 in range(na2):
|
|
a2_rad = 2*math.pi/na2 * a2
|
|
yield 0, 0, math.degrees(a2_rad), h
|
|
|
|
for r in [1.0, 2.0, 3.0, 5.0]:
|
|
for a2 in range(na2):
|
|
for a1 in range(na1):
|
|
a1_rad = 2*math.pi/na1 * a1
|
|
a2_rad = 2*math.pi/na2 * a2
|
|
x, y = -r*math.sin(a1_rad), r*math.cos(a1_rad)
|
|
yield x, y, math.degrees(a2_rad+a1_rad), h
|
|
|
|
def create_rotation_schedule(na=36, h=1):
|
|
for a in range(na):
|
|
a_rad = 2*math.pi/na*a
|
|
|
|
for x in range(-30, 31):
|
|
yield x, 0, math.degrees(a_rad), h
|
|
|
|
for y in range(-30, 31):
|
|
yield 0, y, math.degrees(a_rad), h
|
|
|
|
def create_displaced_rotation_schedule(na=72, h=1):
|
|
for a in range(na):
|
|
a_rad = 2*math.pi/na*a
|
|
|
|
yield 0, 0, math.degrees(a_rad), h
|
|
|
|
for r in range(1, 5):
|
|
for a2 in range(na):
|
|
a2_rad = 2*math.pi/na*a2
|
|
x, y = -r*math.sin(a2_rad), r*math.cos(a2_rad)
|
|
yield x, y, math.degrees(a_rad), h
|
|
|
|
|
|
def create_radial_schedule(na1=8, na2=40):
|
|
for h in [0.0, 1.0, 2.0, 5.0]:
|
|
for r in [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 12.0]:
|
|
for a1 in range(na1):
|
|
for a2 in range(na2):
|
|
a1_rad = 2*math.pi/na1 * a1
|
|
a2_rad = 2*math.pi/na2 * a2
|
|
x, y = r*math.sin(a1_rad), r*math.cos(a1_rad)
|
|
yield x, y, math.degrees(a2_rad+a1_rad), h
|
|
|
|
|
|
def optimize_schedule(schedule):
|
|
schedule = sorted((a, h, x, y) for x, y, a, h in schedule)
|
|
for (a, h), chunk in itertools.groupby(schedule, key=lambda x: x[:2]):
|
|
xy = {(x, y) for a, h, x, y in schedule}
|
|
cur_x, cur_y = sorted(xy, key=lambda coord: math.hypot(*coord))[-1]
|
|
while True:
|
|
yield cur_x, cur_y, a, h
|
|
xy.remove((cur_x, cur_y))
|
|
if not xy:
|
|
break
|
|
|
|
keyfunc = lambda coord: math.dist((cur_x*1.1, cur_y), (coord[0]*1.1, coord[1]))
|
|
nearest = sorted(xy, key=keyfunc)
|
|
out = []
|
|
for x in nearest:
|
|
if keyfunc(x) > 1.1*keyfunc(nearest[0]):
|
|
break
|
|
out.append(x)
|
|
cur_x, cur_y = sorted(out, key=lambda coord: math.hypot(*coord))[-1]
|
|
|
|
|
|
def print_scan(ctx, param, value):
|
|
if value:
|
|
print('LXI target scan:')
|
|
l = list(rm.list_resources())
|
|
if l:
|
|
for uri in l:
|
|
print(f' {uri}')
|
|
else:
|
|
print('(Nothing found)')
|
|
print()
|
|
|
|
|
|
def comma_range(ctx, param, value):
|
|
if isinstance(value, tuple):
|
|
return value
|
|
|
|
try:
|
|
a, _, b = value.partition(',')
|
|
a, b = float(a), float(b)
|
|
return (a, b)
|
|
|
|
except ValueError:
|
|
raise click.BadParameter('range format must be "start,end", for example: "20,50".')
|
|
|
|
|
|
class Octoprint:
|
|
def __init__(self, ip, api_key=None, safety_height=0):
|
|
self.ip = ip
|
|
if api_key is None:
|
|
self.auth_headers = {}
|
|
else:
|
|
self.auth_headers = {'X-Api-Key': api_key}
|
|
requests.post(f'http://{self.ip}/api/connection', json={
|
|
'command': 'connect',
|
|
}, headers=self.auth_headers)
|
|
self.safety_height = safety_height
|
|
self.current_pos = (None, None, None)
|
|
|
|
def home(self):
|
|
requests.post(f'http://{self.ip}/api/printer/printhead', headers=self.auth_headers, json={
|
|
'command': 'jog',
|
|
'absolute': False,
|
|
'speed': 2000, # mm / min
|
|
'z': 10,
|
|
})
|
|
time.sleep(2)
|
|
requests.post(f'http://{self.ip}/api/printer/printhead', json={
|
|
'command': 'home',
|
|
'speed': 2000,
|
|
'axes': ['x', 'y', 'z'],
|
|
}, headers=self.auth_headers)
|
|
self.current_pos = (None, None, None)
|
|
time.sleep(30)
|
|
|
|
def move(self, x=None, y=None, z=None, speed_mm_per_min=1000, vertical_speed_mm_per_min=300, delay=True):
|
|
last_x, last_y, last_z = self.current_pos
|
|
if last_x is not None and last_y is not None and last_z is not None and\
|
|
self.safety_height > 0 and (z is None or z == last_z) and last_z < self.safety_height and\
|
|
math.dist((last_x, last_y), (x or last_x, y or last_y)) > 0.5:
|
|
self._move(None, None, last_z + self.safety_height, speed_mm_per_min, vertical_speed_mm_per_min, delay=False)
|
|
self._move(x, y, None, speed_mm_per_min, vertical_speed_mm_per_min, delay=delay)
|
|
self._move(None, None, last_z, speed_mm_per_min, vertical_speed_mm_per_min, delay=delay)
|
|
return
|
|
else:
|
|
self._move(x, y, z, speed_mm_per_min, vertical_speed_mm_per_min, delay=delay)
|
|
|
|
def _move(self, x=None, y=None, z=None, speed_mm_per_min=2000, vertical_speed_mm_per_min=300, delay=True):
|
|
last_x, last_y, last_z = self.current_pos
|
|
d = {
|
|
'command': 'jog',
|
|
'absolute': True,
|
|
'speed': speed_mm_per_min, # mm / min
|
|
}
|
|
|
|
if x is not None:
|
|
d['x'] = x
|
|
else:
|
|
x = last_x
|
|
|
|
if y is not None:
|
|
d['y'] = y
|
|
else:
|
|
y = last_y
|
|
|
|
if z is not None:
|
|
d['z'] = z
|
|
else:
|
|
z = last_z
|
|
|
|
requests.post(f'http://{self.ip}/api/printer/printhead', json=d, headers=self.auth_headers)
|
|
if delay:
|
|
if z != last_z and last_z is not None:
|
|
t = 0.2 + abs(z - last_z) / vertical_speed_mm_per_min * 60
|
|
time.sleep(t)
|
|
elif (x, y) != (last_x, last_y) and last_x is not None and last_y is not None:
|
|
d = math.dist((x, y), (last_x, last_y))
|
|
t = 0.2 + d / speed_mm_per_min * 60
|
|
time.sleep(t)
|
|
self.current_pos = (x, y, z)
|
|
|
|
|
|
class Servo:
|
|
def __init__(self, ip):
|
|
self.last_traceback = None
|
|
self.ip = ip
|
|
self.last_angle = None
|
|
|
|
def set_angle(self, angle, speed=65534, acc=255):
|
|
pos = round(angle/360 * 4096) % 4096
|
|
for _ in range(100):
|
|
try:
|
|
requests.get(f'http://{self.ip}/pos?pos={pos}l&speed={speed}&acc={acc}')
|
|
if self.last_angle != pos:
|
|
if self.last_angle is None:
|
|
time.sleep(2)
|
|
else:
|
|
time.sleep(abs(self.last_angle - pos)/4096 * 2)
|
|
self.last_angle = pos
|
|
return
|
|
|
|
except Exception as e:
|
|
tb = traceback.format_exception(e)
|
|
if tb != self.last_traceback:
|
|
self.last_traceback = tb
|
|
print('Error sending command to servo:')
|
|
print(tb)
|
|
else:
|
|
print('Error sending command to servo: (One more instance of the last exception)')
|
|
time.sleep(2)
|
|
|
|
|
|
@click.command()
|
|
@click.option('--scan', is_flag=True, callback=print_scan, expose_value=False, is_eager=True)
|
|
@click.option('-x', type=float, default=360, help='Tile zero X coordinate (mm)')
|
|
@click.option('-y', type=float, default=360, help='Tile zero Y coordinate (mm)')
|
|
@click.option('--zero-angle', type=float, default=343.0, help='Servo zero angle in degree')
|
|
@click.option('--zero-height', type=float, default=0.0, help='Servo zero height offset in mm. Positive values increase distance.')
|
|
@click.option('--safety-height', type=float, default=0.0, help='Height to move longer distances at.')
|
|
@click.option('--comment', help='Add comment to measurement run')
|
|
@click.option('--database', type=click.Path(dir_okay=False, path_type=Path), default='tile_measurements.sqlite3')
|
|
@click.option('--home/--no-home', default=True, help='Wheter to home the printer before starting')
|
|
@click.option('--adjust/--no-adjust', default=True, help='Prompt for interactive XYZ/R adjustment before starting')
|
|
@click.option('--tx-tile', type=int, required=True, help='Tile number of transmit (top) tile of current measurement')
|
|
@click.option('--rx-tile', type=int, required=True, help='Tile number of receive (bottom) tile of current measurement')
|
|
@click.option('--input-voltage', required=True, help='IP of input voltage measurement multimeter')
|
|
@click.option('--output-voltage', required=True, help='IP of output voltage measurement multimeter')
|
|
@click.option('--load', required=True, help='Value of load resistor in Ohms')
|
|
@click.option('--octoprint', required=True, help='IP of octoprint instance')
|
|
@click.option('--schedule', default='grid', type=click.Choice(['grid', 'radial', 'fine', 'rotation', 'displaced_rotation']), help='Schedule type to use')
|
|
@click.option('--octoprint-api-key', help='API key of octoprint instance. Optional.')
|
|
@click.option('--resume', help='Resume one or more previous runs')
|
|
@click.option('--servo', required=True, help='IP of servo control board')
|
|
def cli(input_voltage, output_voltage, octoprint, octoprint_api_key, servo, rx_tile, tx_tile, x, y, zero_angle, zero_height, home, adjust, load, comment, database, resume, schedule, safety_height):
|
|
db = sqlite3.connect(database)
|
|
db.execute('CREATE TABLE IF NOT EXISTS runs (start_time TEXT DEFAULT CURRENT_TIMESTAMP, end_time TEXT DEFAULT NULL, run_id INTEGER PRIMARY KEY, rx_tile INTEGER, tx_tile INTEGER, x REAL, y REAL, load_resistor_ohms REAL, frequency_khz REAL, comment TEXT, schedule_type TEXT)')
|
|
db.execute('CREATE TABLE IF NOT EXISTS measurements (timestamp TEXT DEFAULT CURRENT_TIMESTAMP, run_id INTEGER, x REAL, y REAL, r REAL, h REAL, input_voltage REAL, output_voltage REAL, FOREIGN KEY (run_id) REFERENCES runs)')
|
|
|
|
|
|
octoprint = Octoprint(octoprint, octoprint_api_key, safety_height)
|
|
servo = Servo(servo)
|
|
|
|
print('Homing printer...')
|
|
if home:
|
|
octoprint.home()
|
|
octoprint.move(x=x, y=y, z=10)
|
|
print('Done.')
|
|
print()
|
|
|
|
step_sizes = [0.25, 1, 5, 10, 25]
|
|
step_size_index = len(step_sizes)-1
|
|
|
|
h = 10 # height for zero position adjustment
|
|
print('Homing servo...')
|
|
servo.set_angle(zero_angle)
|
|
print('done.')
|
|
print()
|
|
|
|
if adjust:
|
|
print('Please move the printer to its home position using the arrow keys. Cycle step sizes with [w] and [s]. Raise/lower bed with [r] (raise) / [f] (lower). Adjust zero angle with [t] and [g]. Finish with [q].')
|
|
while True:
|
|
print('Command: ', end='')
|
|
match click.getchar():
|
|
case '\x1b[D': # left arrow
|
|
x -= step_sizes[step_size_index]
|
|
print(f'At ({x:6.2f}, {y:6.2f})')
|
|
octoprint.move(x=x, delay=False)
|
|
case '\x1b[C': # right arrow
|
|
x += step_sizes[step_size_index]
|
|
print(f'At ({x:6.2f}, {y:6.2f})')
|
|
octoprint.move(x=x, delay=False)
|
|
case '\x1b[B': # down arrow
|
|
y -= step_sizes[step_size_index]
|
|
print(f'At ({x:6.2f}, {y:6.2f})')
|
|
octoprint.move(y=y, delay=False)
|
|
case '\x1b[A': # up arrow
|
|
y += step_sizes[step_size_index]
|
|
print(f'At ({x:6.2f}, {y:6.2f})')
|
|
octoprint.move(y=y, delay=False)
|
|
case 'r':
|
|
h -= step_sizes[step_size_index]
|
|
print(f'At {h=:6.2f}')
|
|
octoprint.move(z=h, delay=False)
|
|
case 'f':
|
|
h += step_sizes[step_size_index]
|
|
print(f'At {h=:6.2f}')
|
|
octoprint.move(z=h, delay=False)
|
|
case 't':
|
|
zero_angle += step_sizes[step_size_index]
|
|
zero_angle %= 360.0
|
|
print(f'At {zero_angle=:6.2f}')
|
|
servo.set_angle(zero_angle)
|
|
case 'g':
|
|
zero_angle -= step_sizes[step_size_index]
|
|
zero_angle %= 360.0
|
|
print(f'At {zero_angle=:6.2f}')
|
|
servo.set_angle(zero_angle)
|
|
case 'w':
|
|
step_size_index = (step_size_index+1) % len(step_sizes)
|
|
print(f'Step size: {step_sizes[step_size_index]}')
|
|
case 's':
|
|
step_size_index = (step_size_index-1) % len(step_sizes)
|
|
print(f'Step size: {step_sizes[step_size_index]}')
|
|
case 'q':
|
|
break
|
|
print()
|
|
|
|
resume_tuples = []
|
|
if resume:
|
|
for run_id in resume.split(','):
|
|
resume_tuples += db.execute('SELECT x, y, r, h FROM measurements WHERE run_id=?', (int(run_id),)).fetchall()
|
|
print(f'Resuming previous run after {len(resume_tuples)} measurements')
|
|
resume_tuples = set(resume_tuples)
|
|
|
|
mm_v_in = LXIWrapper(input_voltage)
|
|
mm_v_out = LXIWrapper(output_voltage)
|
|
frequency = mm_v_in.q_measure_frequency() / 1e3
|
|
print(f'Input frequency is {frequency:.3f} kHz')
|
|
|
|
cur = db.cursor()
|
|
cur.execute('INSERT INTO runs (x, y, tx_tile, rx_tile, load_resistor_ohms, frequency_khz, schedule_type, comment) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', (x, y, tx_tile, rx_tile, load, frequency, schedule, comment))
|
|
db.commit()
|
|
run_id = cur.lastrowid
|
|
|
|
print(f'Multimeter for input voltage connected, system version: {mm_v_in.q_system_version()}')
|
|
print(f'Multimeter for output voltage connected, system version: {mm_v_out.q_system_version()}')
|
|
print('Running measurements...')
|
|
|
|
if schedule == 'grid':
|
|
schedule = create_schedule()
|
|
elif schedule == 'radial':
|
|
schedule = create_radial_schedule()
|
|
elif schedule == 'fine':
|
|
schedule = create_fine_schedule()
|
|
elif schedule == 'rotation':
|
|
schedule = create_rotation_schedule()
|
|
elif schedule == 'displaced_rotation':
|
|
schedule = create_displaced_rotation_schedule()
|
|
else:
|
|
raise ValueError()
|
|
|
|
schedule = [foo for foo in schedule if foo not in resume_tuples]
|
|
off_x, off_y = x, y
|
|
for x, y, a, h in tqdm.tqdm(list(schedule), desc=f'Run ID {run_id}'):
|
|
servo.set_angle(a+zero_angle)
|
|
octoprint.move(off_x+x, off_y+y, h+zero_height)
|
|
v_in = mm_v_in.q_measure_voltage_ac(mm_v_in.auto, mm_v_in.max)
|
|
v_out = mm_v_out.q_measure_voltage_ac(mm_v_out.auto, mm_v_out.max)
|
|
tqdm.tqdm.write(f'[{x=:6.2f} {y=:6.2f} {h=:6.2f} {a=:6.2f}]: {v_in=:12.9f} V / {v_out=:12.9f} V')
|
|
cur.execute('INSERT INTO measurements (run_id, x, y, r, h, input_voltage, output_voltage) VALUES (?, ?, ?, ?, ?, ?, ?)', (run_id, x, y, a, h, v_in, v_out))
|
|
db.commit()
|
|
|
|
cur = db.cursor()
|
|
cur.execute('UPDATE runs SET end_time=DATETIME("now") WHERE run_id=?', (run_id,))
|
|
db.commit()
|
|
|
|
rm = pyvisa.ResourceManager()
|
|
|
|
rm = pyvisa.ResourceManager()
|
|
|
|
if __name__ == '__main__':
|
|
cli()
|
|
|