Scheduling WIP

This commit is contained in:
jaseg 2024-01-31 18:36:53 +01:00
parent 10ec878664
commit 7f62fa7ff1

View file

@ -1,15 +1,18 @@
#!/usr/bin/env python3
import itertools
import sqlite3
import traceback
from dataclasses import dataclass
from pathlib import Path
import time
import math
import requests
import tqdm
import click
import pyvisa
rm = pyvisa.ResourceManager()
@dataclass
class Token:
@ -71,19 +74,41 @@ class LXIWrapper:
return res.strip()
def create_schedule(off_x, off_y):
for h in [1.0, 1.5, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0]:
for rn in [range(0, 20, 2), range(20, 40, 5), range(40, 60, 10)]:
rn = list(rn)
def create_schedule():
for h in [0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0]:
for a in [0, 90, 180]:
for p, q in [(1, 1), (1, -1), (-1, 1), (-1, -1)]:
for dx in rn:
for dy in rn:
for a in angles:
yield off_x+p*dx, off_y+q*dy, 0.0, h
for a in range(0, 360, 5):
for dy in rn:
yield off_x, off_y+dy, a, h
for rn, min_c in [(range(0, 20, 2), 0), (range(0, 40, 5), 20), (range(0, 60, 10), 40)]:
rn = list(rn)
for dx in rn:
for dy in rn:
if dx >= min_c or dy >= min_c:
yield p*dx, q*dy, a, h
#for a in range(0, 360, 5):
# for dy in rn:
# yield 0, dy, a, h
def optimize_schedule(schedule):
schedule = sorted((a, h, x, y) for x, y, a, h in schedule)
for (a, h), chunk in itertools.groupby(schedule, key=lambda x: x[:2]):
xy = {(x, y) for a, h, x, y in schedule}
cur_x, cur_y = sorted(xy, key=lambda coord: math.hypot(*coord))[-1]
while True:
yield cur_x, cur_y, a, h
xy.remove((cur_x, cur_y))
if not xy:
break
keyfunc = lambda coord: math.dist((cur_x*1.1, cur_y), (coord[0]*1.1, coord[1]))
nearest = sorted(xy, key=keyfunc)
out = []
for x in nearest:
if keyfunc(x) > 1.1*keyfunc(nearest[0]):
break
out.append(x)
cur_x, cur_y = sorted(out, key=lambda coord: math.hypot(*coord))[-1]
def print_scan(ctx, param, value):
@ -112,113 +137,207 @@ def comma_range(ctx, param, value):
class Octoprint:
def __init__(self, ip):
def __init__(self, ip, api_key=None):
self.ip = ip
if api_key is None:
self.auth_headers = {}
else:
self.auth_headers = {'X-Api-Key': api_key}
requests.post(f'http://{self.ip}/api/connection', json={
'command': 'connect',
})
}, headers=self.auth_headers)
self.current_pos = (None, None, None)
def home(self):
requests.post(f'http://{self.ip}/api/printer/printhead', json={
'command': 'home',
})
'speed': 2000,
'axes': ['x', 'y', 'z'],
}, headers=self.auth_headers)
self.current_pos = (None, None, None)
time.sleep(30)
def move(self, x, y, z):
requests.post(f'http://{self.ip}/api/printer/printhead', json={
def move(self, x=None, y=None, z=None, speed_mm_per_min=2000, vertical_speed_mm_per_min=300):
last_x, last_y, last_z = self.current_pos
d = {
'command': 'jog',
'x': x,
'y': y,
'z': z,
'absolute': True,
'speed': 20,
})
'speed': speed_mm_per_min, # mm / min
}
if x is not None:
d['x'] = x
else:
x = last_x
if y is not None:
d['y'] = y
else:
y = last_x
if z is not None:
d['z'] = z
else:
z = last_z
requests.post(f'http://{self.ip}/api/printer/printhead', json=d, headers=self.auth_headers)
if z != last_z and last_z is not None:
time.sleep(abs(z - last_z) / vertical_speed_mm_per_min * 60)
elif (x, y) != (last_x, last_y) and last_x is not None and last_y is not None:
time.sleep(math.dist((x, y), (last_x, last_y)) / speed_mm_per_min * 60)
self.current_pos = (x, y, z)
class Servo:
def __init__(self, ip):
self.last_traceback = None
self.ip = ip
def set_angle(self, angle):
pass
def set_angle(self, angle, speed=65534, acc=255):
pos = round(angle/360 * 4096)
for _ in range(100):
try:
requests.get(f'http://{self.ip}/pos?pos={pos}l&speed={speed}&acc={acc}')
return None
except Exception as e:
tb = traceback.format_exception(e)
if tb != self.last_traceback:
self.last_traceback = tb
print('Error sending command to servo:')
print(tb)
else:
print('Error sending command to servo: (One more instance of the last exception)')
time.sleep(2)
@click.command()
@click.option('--scan', is_flag=True, callback=print_scan, expose_value=False, is_eager=True)
@click.option('-x', type=float, default=0, help='Tile zero X coordinate (mm)')
@click.option('-y', type=float, default=0, help='Tile zero Y coordinate (mm)')
@click.option('-x', type=float, default=360, help='Tile zero X coordinate (mm)')
@click.option('-y', type=float, default=360, help='Tile zero Y coordinate (mm)')
@click.option('--zero-angle', type=float, default=343.0, help='Servo zero angle in degree')
@click.option('--comment', help='Add comment to measurement run')
@click.option('--database', type=click.Path(dir_okay=False, path_type=Path), default='tile_measurements.sqlite3')
@click.option('--tile', type=int, required=True, help='Tile number of current measurement')
@click.option('--voltage', required=True, help='IP of voltage measurement multimeter')
@click.option('--current', required=True, help='IP of current measurement multimeter')
@click.option('--home/--no-home', default=True, help='Wheter to home the printer before starting')
@click.option('--adjust/--no-adjust', default=True, help='Prompt for interactive XYZ/R adjustment before starting')
@click.option('--tx-tile', type=int, required=True, help='Tile number of transmit (top) tile of current measurement')
@click.option('--rx-tile', type=int, required=True, help='Tile number of receive (bottom) tile of current measurement')
@click.option('--input-voltage', required=True, help='IP of input voltage measurement multimeter')
@click.option('--output-voltage', required=True, help='IP of output voltage measurement multimeter')
@click.option('--load', required=True, help='Value of load resistor in Ohms')
@click.option('--frequency', required=True, help='Stimulation frequency in Kilohertz')
@click.option('--octoprint', required=True, help='IP of octoprint instance')
@click.option('--octoprint-api-key', help='API key of octoprint instance. Optional.')
@click.option('--resume', help='Resume one or more previous runs')
@click.option('--servo', required=True, help='IP of servo control board')
def cli(voltage, current, octoprint, servo, tile, x, y, comment, database):
def cli(input_voltage, output_voltage, octoprint, octoprint_api_key, servo, rx_tile, tx_tile, x, y, zero_angle, home, adjust, load, frequency, comment, database, resume):
db = sqlite3.connect(database)
db.execute('CREATE TABLE IF NOT EXISTS runs (start_time TEXT DEFAULT CURRENT_TIMESTAMP, end_time TEXT DEFAULT NULL, run_id INTEGER PRIMARY KEY, tile INTEGER, x REAL, y REAL, comment TEXT)')
db.execute('CREATE TABLE IF NOT EXISTS measurements (timestamp TEXT DEFAULT CURRENT_TIMESTAMP, run_id INTEGER, x REAL, y REAL, r REAL, h REAL, voltage REAL, current REAL, FOREIGN KEY (run_id) REFERENCES runs)')
db.execute('CREATE TABLE IF NOT EXISTS runs (start_time TEXT DEFAULT CURRENT_TIMESTAMP, end_time TEXT DEFAULT NULL, run_id INTEGER PRIMARY KEY, rx_tile INTEGER, tx_tile INTEGER, x REAL, y REAL, load_resistor_ohms REAL, frequency_khz REAL, comment TEXT)')
db.execute('CREATE TABLE IF NOT EXISTS measurements (timestamp TEXT DEFAULT CURRENT_TIMESTAMP, run_id INTEGER, x REAL, y REAL, r REAL, h REAL, input_voltage REAL, output_voltage REAL, FOREIGN KEY (run_id) REFERENCES runs)')
octoprint = Octoprint(octoprint)
octoprint = Octoprint(octoprint, octoprint_api_key)
servo = Servo(servo)
print('Homing printer...')
octoprint.home()
if home:
octoprint.home()
octoprint.move(x=x, y=y, z=10)
print('Done.')
print()
print('Please move the printer to its home position using the arrow keys. Cycle step sizes with [w] and [s].')
step_sizes = [0.25, 1, 5, 10, 25]
step_size_index = len(step_sizes)-1
h = 3 # height for zero position adjustment
while True:
print('Command: ', end='')
match click.getchar():
case '\x1b[D': # left arrow
x -= step_sizes[step_size_index]
print('At ({x:6.2f}, {y:6.2f})')
octoprint.move(x, y, h)
case '\x1b[C': # right arrow
x += step_sizes[step_size_index]
print('At ({x:6.2f}, {y:6.2f})')
octoprint.move(x, y, h)
case '\x1b[B': # down arrow
y -= step_sizes[step_size_index]
print('At ({x:6.2f}, {y:6.2f})')
octoprint.move(x, y, h)
case '\x1b[A': # up arrow
y += step_sizes[step_size_index]
print('At ({x:6.2f}, {y:6.2f})')
octoprint.move(x, y, h)
case 'w':
step_size_index = (step_size_index+1) % len(step_sizes)
print('Step size: {step_sizes[step_size_index]}')
case 's':
step_size_index = (step_size_index-1) % len(step_sizes)
print('Step size: {step_sizes[step_size_index]}')
case '\n':
break
h = 10 # height for zero position adjustment
print('Homing servo...')
servo.set_angle(zero_angle)
print('done.')
print()
if adjust:
print('Please move the printer to its home position using the arrow keys. Cycle step sizes with [w] and [s]. Raise/lower bed with [r] (raise) / [f] (lower). Adjust zero angle with [t] and [g]. Finish with [q].')
while True:
print('Command: ', end='')
match click.getchar():
case '\x1b[D': # left arrow
x -= step_sizes[step_size_index]
print(f'At ({x:6.2f}, {y:6.2f})')
octoprint.move(x=x)
case '\x1b[C': # right arrow
x += step_sizes[step_size_index]
print(f'At ({x:6.2f}, {y:6.2f})')
octoprint.move(x=x)
case '\x1b[B': # down arrow
y -= step_sizes[step_size_index]
print(f'At ({x:6.2f}, {y:6.2f})')
octoprint.move(y=y)
case '\x1b[A': # up arrow
y += step_sizes[step_size_index]
print(f'At ({x:6.2f}, {y:6.2f})')
octoprint.move(y=y)
case 'r':
h -= step_sizes[step_size_index]
print(f'At {h=:6.2f}')
octoprint.move(z=h)
case 'f':
h += step_sizes[step_size_index]
print(f'At {h=:6.2f}')
octoprint.move(z=h)
case 't':
zero_angle += step_sizes[step_size_index]
zero_angle %= 360.0
print(f'At {zero_angle=:6.2f}')
servo.set_angle(zero_angle)
case 'g':
zero_angle -= step_sizes[step_size_index]
zero_angle %= 360.0
print(f'At {zero_angle=:6.2f}')
servo.set_angle(zero_angle)
case 'w':
step_size_index = (step_size_index+1) % len(step_sizes)
print(f'Step size: {step_sizes[step_size_index]}')
case 's':
step_size_index = (step_size_index-1) % len(step_sizes)
print(f'Step size: {step_sizes[step_size_index]}')
case 'q':
break
print()
resume_tuples = []
if resume:
for run_id in resume.split(','):
resume_tuples += db.execute('SELECT x, y, r, h FROM measurements WHERE run_id=?', (int(run_id),)).fetchall()
print(f'Resuming previous run after {len(resume_tuples)} measurements')
resume_tuples = set(resume_tuples)
cur = db.cursor()
cur.execute('INSERT INTO runs (x, y, comment) VALUES (?, ?, ?)', (x, y, comment))
cur.execute('INSERT INTO runs (x, y, tx_tile, rx_tile, load_resistor_ohms, frequency_khz, comment) VALUES (?, ?, ?, ?, ?, ?, ?)', (x, y, tx_tile, rx_tile, load, frequency, comment))
db.commit()
run_id = cur.lastrowid
mm_v = LXIWrapper(voltage)
mm_i = LXIWrapper(current)
print(f'Multimeter connected, system version: {mm.q_system_version()}')
mm_v_in = LXIWrapper(input_voltage)
mm_v_out = LXIWrapper(output_voltage)
print(f'Multimeter for input voltage connected, system version: {mm_v_in.q_system_version()}')
print(f'Multimeter for output voltage connected, system version: {mm_v_out.q_system_version()}')
print('Running measurements...')
for x, y, a, h in tqdm.tqdm(list(create_schedule(x, y)), desc=f'Run ID {run_id}'):
servo.move(a)
octoprint.move(x, y, h)
voltage = mm_v.q_measure_voltage_ac(mm_v.auto, mm_v.max)
current = mm_i.q_measure_current_ac(mm_i.auto, mm_i.max)
tqdm.write(f'Last measurement: {voltage:12.9f} V / {current:12.9f} A')
db.execute('INSERT INTO measurements (run_id, x, y, r, h, value) VALUES (?, ?, ?, ?, ?, ?, ?)', (run_id, x, y, a, h, voltage, current))
off_x, off_y = x, y
for x, y, a, h in tqdm.tqdm(optimize_schedule(create_schedule()), desc=f'Run ID {run_id}'):
if (x, y, a, h) in resume_tuples:
continue
servo.set_angle(a+zero_angle)
octoprint.move(off_x+x, off_y+y, h)
v_in = mm_v_in.q_measure_voltage_ac(mm_v_in.auto, mm_v_in.max)
v_out = mm_v_out.q_measure_voltage_ac(mm_v_out.auto, mm_v_out.max)
tqdm.tqdm.write(f'[{x=:6.2f} {y=:6.2f} {h=:6.2f} {a=:6.2f}]: {v_in=:12.9f} V / {v_out=:12.9f} V')
cur.execute('INSERT INTO measurements (run_id, x, y, r, h, input_voltage, output_voltage) VALUES (?, ?, ?, ?, ?, ?, ?)', (run_id, x, y, a, h, v_in, v_out))
db.commit()
rm = pyvisa.ResourceManager()
if __name__ == '__main__':
cli()