Initial commit
This commit is contained in:
commit
44b8924499
1 changed files with 210 additions and 0 deletions
210
run_measurements.py
Normal file
210
run_measurements.py
Normal file
|
|
@ -0,0 +1,210 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import sqlite3
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
import tqdm
|
||||
import click
|
||||
import pyvisa
|
||||
|
||||
rm = pyvisa.ResourceManager()
|
||||
|
||||
@dataclass
|
||||
class Token:
|
||||
val: str
|
||||
|
||||
def __str__(self):
|
||||
return self.val
|
||||
|
||||
class LXIWrapper:
|
||||
min = Token('MIN')
|
||||
max = Token('MAX')
|
||||
default = Token('DEF')
|
||||
auto = Token('AUTO')
|
||||
_inst = None
|
||||
|
||||
def __init__(self, ip):
|
||||
self._inst = rm.open_resource(f'TCPIP::{ip}::INSTR')
|
||||
|
||||
def __getattr__(self, key):
|
||||
if (val := self.__dict__.get(key)):
|
||||
return val
|
||||
elif (val := getattr(self._inst, key, None)):
|
||||
return val
|
||||
else:
|
||||
if key.startswith('q_'):
|
||||
return lambda *args: self._query(self._map_key(key[2:]), *args)
|
||||
else:
|
||||
return lambda *args: self._invoke(self._map_key(key), *args)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if hasattr(type(self), key):
|
||||
super().__setattr__(key, value)
|
||||
else:
|
||||
return self._invoke(self._map_key(key), value)
|
||||
|
||||
@classmethod
|
||||
def _mangle(kls, s):
|
||||
return s[:4].upper() + s[4:].lower()
|
||||
|
||||
@classmethod
|
||||
def _map_key(kls, key):
|
||||
return ':'.join(kls._mangle(item) if item.islower() else item for item in key.split('_'))
|
||||
|
||||
@classmethod
|
||||
def _format(kls, command, *args):
|
||||
if args:
|
||||
return command + ' ' + ','.join(f'"{x}"' if isinstance(x, str) else str(x) for x in args)
|
||||
else:
|
||||
return command
|
||||
|
||||
def _invoke(self, command, *args):
|
||||
self._inst.write(self._format(command, *args))
|
||||
|
||||
def _query(self, command, *args):
|
||||
res = self._inst.query(self._format(command + '?', *args))
|
||||
try:
|
||||
return float(res)
|
||||
except ValueError:
|
||||
return res.strip()
|
||||
|
||||
|
||||
def create_schedule(off_x, off_y):
|
||||
for h in [1.0, 1.5, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0]:
|
||||
for rn, angles in [(range(0, 20, 2), range(0, 360, 15)),
|
||||
(range(20, 40, 5), range(0, 360, 90)),
|
||||
(range(40, 60, 10), range(0, 360, 90))]:
|
||||
rn = list(rn)
|
||||
for p, q in [(1, 1), (1, -1), (-1, 1), (-1, -1)]:
|
||||
for dx in rn:
|
||||
for dy in rn:
|
||||
for a in angles:
|
||||
yield off_x+p*dx, off_y+q*dy, a, h
|
||||
|
||||
|
||||
def print_scan(ctx, param, value):
|
||||
if value:
|
||||
print('LXI target scan:')
|
||||
l = list(rm.list_resources())
|
||||
if l:
|
||||
for uri in l:
|
||||
print(f' {uri}')
|
||||
else:
|
||||
print('(Nothing found)')
|
||||
print()
|
||||
|
||||
|
||||
def comma_range(ctx, param, value):
|
||||
if isinstance(value, tuple):
|
||||
return value
|
||||
|
||||
try:
|
||||
a, _, b = value.partition(',')
|
||||
a, b = float(a), float(b)
|
||||
return (a, b)
|
||||
|
||||
except ValueError:
|
||||
raise click.BadParameter('range format must be "start,end", for example: "20,50".')
|
||||
|
||||
|
||||
class Octoprint:
|
||||
def __init__(self, ip):
|
||||
self.ip = ip
|
||||
|
||||
def home(self):
|
||||
pass
|
||||
|
||||
def move(self, x, y, z):
|
||||
pass
|
||||
|
||||
|
||||
class Servo:
|
||||
def __init__(self, ip):
|
||||
self.ip = ip
|
||||
|
||||
def set_angle(self, angle):
|
||||
pass
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option('--scan', is_flag=True, callback=print_scan, expose_value=False, is_eager=True)
|
||||
@click.option('--tile', type=int, help='Tile number of current measurement')
|
||||
@click.option('-x', type=float, default=0, help='Tile zero X coordinate (mm)')
|
||||
@click.option('-y', type=float, default=0, help='Tile zero Y coordinate (mm)')
|
||||
@click.option('--comment', help='Add comment to measurement run')
|
||||
@click.option('--database', type=click.Path(dir_okay=False, path_type=Path), default='tile_measurements.sqlite3')
|
||||
@click.option('--voltage', help='IP of voltage measurement multimeter')
|
||||
@click.option('--current', help='IP of current measurement multimeter')
|
||||
@click.option('--octoprint', help='IP of octoprint instance')
|
||||
@click.option('--servo', help='IP of servo control board')
|
||||
def cli(voltage, current, octoprint, servo, tile, x, y, comment, database):
|
||||
db = sqlite3.connect(database)
|
||||
db.execute('CREATE TABLE IF NOT EXISTS runs (start_time TEXT DEFAULT CURRENT_TIMESTAMP, end_time TEXT DEFAULT NULL, run_id INTEGER PRIMARY KEY, tile INTEGER, x REAL, y REAL, comment TEXT)')
|
||||
db.execute('CREATE TABLE IF NOT EXISTS measurements (timestamp TEXT DEFAULT CURRENT_TIMESTAMP, run_id INTEGER, x REAL, y REAL, r REAL, h REAL, voltage REAL, current REAL, FOREIGN KEY (run_id) REFERENCES runs)')
|
||||
|
||||
|
||||
octoprint = Octoprint(octoprint)
|
||||
servo = Servo(servo)
|
||||
|
||||
print('Homing printer...')
|
||||
octoprint.home()
|
||||
print('Done.')
|
||||
print()
|
||||
|
||||
print('Please move the printer to its home position using the arrow keys. Cycle step sizes with [w] and [s].')
|
||||
step_sizes = [0.25, 1, 5, 10, 25]
|
||||
step_size_index = len(step_sizes)-1
|
||||
|
||||
h = 3 # height for zero position adjustment
|
||||
while True:
|
||||
print('Command: ', end='')
|
||||
match click.getchar():
|
||||
case '\x1b[D': # left arrow
|
||||
x -= step_sizes[step_size_index]
|
||||
print('At ({x:6.2f}, {y:6.2f})')
|
||||
octoprint.move(x, y, h)
|
||||
case '\x1b[C': # right arrow
|
||||
x += step_sizes[step_size_index]
|
||||
print('At ({x:6.2f}, {y:6.2f})')
|
||||
octoprint.move(x, y, h)
|
||||
case '\x1b[B': # down arrow
|
||||
y -= step_sizes[step_size_index]
|
||||
print('At ({x:6.2f}, {y:6.2f})')
|
||||
octoprint.move(x, y, h)
|
||||
case '\x1b[A': # up arrow
|
||||
y += step_sizes[step_size_index]
|
||||
print('At ({x:6.2f}, {y:6.2f})')
|
||||
octoprint.move(x, y, h)
|
||||
case 'w':
|
||||
step_size_index = (step_size_index+1) % len(step_sizes)
|
||||
print('Step size: {step_sizes[step_size_index]}')
|
||||
case 's':
|
||||
step_size_index = (step_size_index-1) % len(step_sizes)
|
||||
print('Step size: {step_sizes[step_size_index]}')
|
||||
case '\n':
|
||||
break
|
||||
print()
|
||||
|
||||
cur = db.cursor()
|
||||
cur.execute('INSERT INTO runs (x, y, comment) VALUES (?, ?, ?)', (x, y, comment))
|
||||
run_id = cur.lastrowid
|
||||
|
||||
mm_v = LXIWrapper(voltage)
|
||||
mm_i = LXIWrapper(current)
|
||||
print(f'Multimeter connected, system version: {mm.q_system_version()}')
|
||||
print('Running measurements...')
|
||||
|
||||
for x, y, a, h in tqdm.tqdm(list(create_schedule(x, y)), desc=f'Run ID {run_id}'):
|
||||
servo.move(a)
|
||||
octoprint.move(x, y, h)
|
||||
voltage = mm_v.q_measure_voltage_ac(mm_v.auto, mm_v.max)
|
||||
current = mm_i.q_measure_current_ac(mm_i.auto, mm_i.max)
|
||||
tqdm.write(f'Last measurement: {voltage:12.9f} V / {current:12.9f} A')
|
||||
db.execute('INSERT INTO measurements (run_id, x, y, r, h, value) VALUES (?, ?, ?, ?, ?, ?, ?)', (run_id, x, y, a, h, voltage, current))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cli()
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue