From 44b8924499763853d79cca54f264db5b439afa5e Mon Sep 17 00:00:00 2001 From: jaseg Date: Fri, 5 Jan 2024 19:15:40 +0100 Subject: [PATCH] Initial commit --- run_measurements.py | 210 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 run_measurements.py diff --git a/run_measurements.py b/run_measurements.py new file mode 100644 index 0000000..b516b3c --- /dev/null +++ b/run_measurements.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 + +import sqlite3 +from dataclasses import dataclass +from pathlib import Path + +import requests +import tqdm +import click +import pyvisa + +rm = pyvisa.ResourceManager() + +@dataclass +class Token: + val: str + + def __str__(self): + return self.val + +class LXIWrapper: + min = Token('MIN') + max = Token('MAX') + default = Token('DEF') + auto = Token('AUTO') + _inst = None + + def __init__(self, ip): + self._inst = rm.open_resource(f'TCPIP::{ip}::INSTR') + + def __getattr__(self, key): + if (val := self.__dict__.get(key)): + return val + elif (val := getattr(self._inst, key, None)): + return val + else: + if key.startswith('q_'): + return lambda *args: self._query(self._map_key(key[2:]), *args) + else: + return lambda *args: self._invoke(self._map_key(key), *args) + + def __setattr__(self, key, value): + if hasattr(type(self), key): + super().__setattr__(key, value) + else: + return self._invoke(self._map_key(key), value) + + @classmethod + def _mangle(kls, s): + return s[:4].upper() + s[4:].lower() + + @classmethod + def _map_key(kls, key): + return ':'.join(kls._mangle(item) if item.islower() else item for item in key.split('_')) + + @classmethod + def _format(kls, command, *args): + if args: + return command + ' ' + ','.join(f'"{x}"' if isinstance(x, str) else str(x) for x in args) + else: + return command + + def _invoke(self, command, *args): + self._inst.write(self._format(command, *args)) + + def _query(self, command, *args): + res = self._inst.query(self._format(command + '?', *args)) + try: + return float(res) + except ValueError: + return res.strip() + + +def create_schedule(off_x, off_y): + for h in [1.0, 1.5, 2.0, 3.0, 5.0, 10.0, 15.0, 20.0, 30.0]: + for rn, angles in [(range(0, 20, 2), range(0, 360, 15)), + (range(20, 40, 5), range(0, 360, 90)), + (range(40, 60, 10), range(0, 360, 90))]: + rn = list(rn) + for p, q in [(1, 1), (1, -1), (-1, 1), (-1, -1)]: + for dx in rn: + for dy in rn: + for a in angles: + yield off_x+p*dx, off_y+q*dy, a, h + + +def print_scan(ctx, param, value): + if value: + print('LXI target scan:') + l = list(rm.list_resources()) + if l: + for uri in l: + print(f' {uri}') + else: + print('(Nothing found)') + print() + + +def comma_range(ctx, param, value): + if isinstance(value, tuple): + return value + + try: + a, _, b = value.partition(',') + a, b = float(a), float(b) + return (a, b) + + except ValueError: + raise click.BadParameter('range format must be "start,end", for example: "20,50".') + + +class Octoprint: + def __init__(self, ip): + self.ip = ip + + def home(self): + pass + + def move(self, x, y, z): + pass + + +class Servo: + def __init__(self, ip): + self.ip = ip + + def set_angle(self, angle): + pass + + +@click.command() +@click.option('--scan', is_flag=True, callback=print_scan, expose_value=False, is_eager=True) +@click.option('--tile', type=int, help='Tile number of current measurement') +@click.option('-x', type=float, default=0, help='Tile zero X coordinate (mm)') +@click.option('-y', type=float, default=0, help='Tile zero Y coordinate (mm)') +@click.option('--comment', help='Add comment to measurement run') +@click.option('--database', type=click.Path(dir_okay=False, path_type=Path), default='tile_measurements.sqlite3') +@click.option('--voltage', help='IP of voltage measurement multimeter') +@click.option('--current', help='IP of current measurement multimeter') +@click.option('--octoprint', help='IP of octoprint instance') +@click.option('--servo', help='IP of servo control board') +def cli(voltage, current, octoprint, servo, tile, x, y, comment, database): + db = sqlite3.connect(database) + db.execute('CREATE TABLE IF NOT EXISTS runs (start_time TEXT DEFAULT CURRENT_TIMESTAMP, end_time TEXT DEFAULT NULL, run_id INTEGER PRIMARY KEY, tile INTEGER, x REAL, y REAL, comment TEXT)') + db.execute('CREATE TABLE IF NOT EXISTS measurements (timestamp TEXT DEFAULT CURRENT_TIMESTAMP, run_id INTEGER, x REAL, y REAL, r REAL, h REAL, voltage REAL, current REAL, FOREIGN KEY (run_id) REFERENCES runs)') + + + octoprint = Octoprint(octoprint) + servo = Servo(servo) + + print('Homing printer...') + octoprint.home() + print('Done.') + print() + + print('Please move the printer to its home position using the arrow keys. Cycle step sizes with [w] and [s].') + step_sizes = [0.25, 1, 5, 10, 25] + step_size_index = len(step_sizes)-1 + + h = 3 # height for zero position adjustment + while True: + print('Command: ', end='') + match click.getchar(): + case '\x1b[D': # left arrow + x -= step_sizes[step_size_index] + print('At ({x:6.2f}, {y:6.2f})') + octoprint.move(x, y, h) + case '\x1b[C': # right arrow + x += step_sizes[step_size_index] + print('At ({x:6.2f}, {y:6.2f})') + octoprint.move(x, y, h) + case '\x1b[B': # down arrow + y -= step_sizes[step_size_index] + print('At ({x:6.2f}, {y:6.2f})') + octoprint.move(x, y, h) + case '\x1b[A': # up arrow + y += step_sizes[step_size_index] + print('At ({x:6.2f}, {y:6.2f})') + octoprint.move(x, y, h) + case 'w': + step_size_index = (step_size_index+1) % len(step_sizes) + print('Step size: {step_sizes[step_size_index]}') + case 's': + step_size_index = (step_size_index-1) % len(step_sizes) + print('Step size: {step_sizes[step_size_index]}') + case '\n': + break + print() + + cur = db.cursor() + cur.execute('INSERT INTO runs (x, y, comment) VALUES (?, ?, ?)', (x, y, comment)) + run_id = cur.lastrowid + + mm_v = LXIWrapper(voltage) + mm_i = LXIWrapper(current) + print(f'Multimeter connected, system version: {mm.q_system_version()}') + print('Running measurements...') + + for x, y, a, h in tqdm.tqdm(list(create_schedule(x, y)), desc=f'Run ID {run_id}'): + servo.move(a) + octoprint.move(x, y, h) + voltage = mm_v.q_measure_voltage_ac(mm_v.auto, mm_v.max) + current = mm_i.q_measure_current_ac(mm_i.auto, mm_i.max) + tqdm.write(f'Last measurement: {voltage:12.9f} V / {current:12.9f} A') + db.execute('INSERT INTO measurements (run_id, x, y, r, h, value) VALUES (?, ?, ?, ?, ?, ?, ?)', (run_id, x, y, a, h, voltage, current)) + + +if __name__ == '__main__': + cli() +