#!/usr/bin/env python3
"""Batch driver for coil mesh generation and parasitics simulations.

Coil parameters are read from an SQLite database (tables ``runs``, ``coils``,
``results`` and ``meshes``). Meshes are produced by invoking the
``twisted_coil_gen_twolayer`` module and simulations by invoking the
``coil_parasitics`` module, both as subprocesses whose output is captured into
log files.
"""

import threading
import datetime
import queue
import itertools
import pathlib
import tempfile
import secrets
import sys
import uuid
import sqlite3
import time
import math
import json
import subprocess
import re
import contextlib
import traceback

import tqdm
import click
from tabulate import tabulate


# Maps the user-facing mesh type to the generator's output option.
_MESH_TYPE_FLAGS = {
    'split': '--mesh-split-out',
    'normal': '--mesh-out',
    'mutual': '--mesh-mutual-out',
}

# Multipliers for the SI prefixes that are recognised in simulation logs.
_SI_PREFIX = {'': 1, 'm': 1e-3, 'µ': 1e-6, 'n': 1e-9, 'p': 1e-12}

# These patterns are matched against the simulation log text, so they must
# stay byte-identical to what the simulator prints (including "calucated").
_RESISTANCE_RE = re.compile(r'Coil resistance calculated by solver: ([0-9.e+-]*) (\w?)Ω')
_INDUCTANCE_RE = re.compile(r'Inductance calucated from field: ([0-9.e+-]*) (\w?)H')

# Design parameters shown by list_coils: key -> (display scale, format spec).
_DESIGN_FORMATS = {
    'calculated_approximate_inductance': (1e6, '.02f'),
    'calculated_trace_length': (1e3, '.03f'),
    'calculated_approximate_resistance': (1e3, '.03f'),
    'calculated_fill_factor': (1, '.03f'),
}

# Directory on the remote host that is bind-mounted to /mesh in the container.
_REMOTE_MESH_DIR = '/var/tmp/mesh'


def _ensure_meshes_table(db):
    """Create the ``meshes`` bookkeeping table if it does not exist yet."""
    db.execute('CREATE TABLE IF NOT EXISTS meshes(coil_id INTEGER, mesh_type TEXT, error INTEGER, filename TEXT, timestamp TEXT DEFAULT current_timestamp, FOREIGN KEY (coil_id) REFERENCES coils(coil_id))')


def _latest_run_id(db):
    """Return the run_id of the most recent run.

    Raises:
        click.ClickException: if the ``runs`` table has no rows.
    """
    row = db.execute('SELECT run_id FROM runs ORDER BY timestamp DESC LIMIT 1').fetchone()
    if row is None:
        raise click.ClickException('No runs found in the database.')
    return row[0]


def _run_and_log(cmd, logfile):
    """Run ``cmd`` and write its stdout followed by its stderr to ``logfile``.

    The log is written on success and on failure alike.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    try:
        res = subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        logfile.write_text((e.stdout or '') + (e.stderr or ''))
        raise
    logfile.write_text(res.stdout + res.stderr)
    return res


def _process_queue(tasks, handler, workers):
    """Process ``tasks`` on one thread per entry of ``workers``.

    ``handler(worker, task)`` is called once for each task, from whichever
    worker thread picks the task up. An exception escaping ``handler`` is
    reported and the worker carries on; ``task_done()`` is always called, so
    the final ``join()`` cannot wait on a task that no thread will finish.
    """
    tasks = list(tasks)
    workers = list(workers)
    if tasks and not workers:
        raise ValueError('At least one worker is required to process tasks.')

    q = queue.Queue()
    for task in tasks:
        q.put(task)

    with tqdm.tqdm(total=len(tasks)) as progress:
        def queue_worker(worker):
            while True:
                try:
                    task = q.get_nowait()
                except queue.Empty:
                    return
                try:
                    handler(worker, task)
                except Exception:
                    # Thread boundary: report and keep draining the queue.
                    tqdm.tqdm.write(traceback.format_exc(), file=sys.stderr)
                finally:
                    progress.update(1)
                    q.task_done()

        for worker in workers:
            # Pass the worker through ``args`` so that each thread is bound to
            # its own value rather than to the loop variable.
            threading.Thread(target=queue_worker, args=(worker,), daemon=True).start()
        q.join()


def mesh_args(db, coil_id, mesh_type, mesh_file, outfile, **kwargs):
    """Build the generator command line for one coil.

    Rows of ``results`` for ``coil_id`` whose key starts with ``gen.`` are
    turned into ``--option value`` pairs. A value of ``True`` yields a bare
    flag and a value of ``False`` omits the option. ``kwargs`` override the
    stored values and use the same ``gen.``-prefixed keys.

    Args:
        db: open database connection.
        coil_id: coil whose parameters are looked up.
        mesh_type: one of ``split``, ``normal`` or ``mutual``.
        mesh_file: path of the mesh file the generator should write.
        outfile: path of the PCB file, passed as the last argument.

    Returns:
        The argument list, starting with ``python -m twisted_coil_gen_twolayer``.

    Raises:
        KeyError: if ``mesh_type`` is not a known mesh type.
    """
    mesh_flag = _MESH_TYPE_FLAGS[mesh_type]
    params = dict(db.execute('SELECT key, value FROM results WHERE coil_id=?', (coil_id,)).fetchall())
    params.update(kwargs)

    args = ['python', '-m', 'twisted_coil_gen_twolayer', mesh_flag, mesh_file, '--pcb']
    for key, value in params.items():
        prefix, _, name = key.partition('.')
        if prefix != 'gen':
            continue
        # Compare the string form so that real booleans (e.g. from a JSON job
        # file) are treated like the 'True'/'False' strings in the database.
        value = str(value)
        if value == 'False':
            continue
        args.append('--' + name.replace('_', '-'))
        if value != 'True':
            args.append(value)
    args.append(outfile)
    return args


def get_mesh_file(db, mesh_dir, run_id, coil_id, mesh_type):
    """Return the mesh file path for a coil and mesh type.

    If the newest ``meshes`` row for this coil and type names a file that
    exists in ``mesh_dir``, that path is returned. Otherwise a new, timestamped
    path inside ``mesh_dir`` is returned; that file does not exist yet.
    """
    _ensure_meshes_table(db)
    row = db.execute('SELECT * FROM meshes WHERE coil_id=? AND mesh_type=? ORDER BY timestamp DESC LIMIT 1', (coil_id, mesh_type)).fetchone()
    if row is not None:
        mesh_file = mesh_dir / row['filename']
        if mesh_file.is_file():
            return mesh_file

    timestamp = time.strftime('%Y-%m-%d_%H-%M-%S')
    return mesh_dir / f'mesh-{run_id}-{coil_id}-{mesh_type}-{timestamp}.msh'


def ensure_mesh(db, mesh_dir, log_dir, run_id, coil_id, mesh_type):
    """Generate the mesh for a coil unless its mesh file already exists.

    A ``meshes`` row is recorded before the generator runs. If the generator
    fails, that row's ``error`` column is set to 1, the output is logged and
    printed, and the exception is re-raised.

    Returns:
        Path of the mesh file.

    Raises:
        subprocess.CalledProcessError: if the generator exits non-zero.
    """
    mesh_file = get_mesh_file(db, mesh_dir, run_id, coil_id, mesh_type)
    if mesh_file.is_file():
        return mesh_file

    cursor = db.execute('INSERT INTO meshes(coil_id, mesh_type, error, filename) VALUES (?, ?, 0, ?)', (coil_id, mesh_type, mesh_file.name))
    mesh_rowid = cursor.lastrowid
    db.commit()

    mesh_file.parent.mkdir(exist_ok=True)
    logfile = log_dir / mesh_file.with_suffix('.log').name
    logfile.parent.mkdir(exist_ok=True)

    with tempfile.NamedTemporaryFile(suffix='.kicad_pcb') as f:
        args = mesh_args(db, coil_id, mesh_type, mesh_file, f.name)
        tqdm.tqdm.write(' '.join(map(str, args)))
        try:
            _run_and_log(args, logfile)
        except subprocess.CalledProcessError as e:
            # Record the failure so that list_coils can report it as ERROR.
            db.execute('UPDATE meshes SET error=1 WHERE rowid=?', (mesh_rowid,))
            db.commit()
            tqdm.tqdm.write(f'Mesh generation failed with exit code {e.returncode}', file=sys.stderr)
            tqdm.tqdm.write((e.stdout or '') + (e.stderr or ''))
            raise

    return mesh_file


@click.group()
@click.option('-d', '--database', default='coil_parameters.sqlite3')
@click.pass_context
def cli(ctx, database):
    """Manage coil mesh generation and simulation runs."""
    ctx.ensure_object(dict)

    def connect():
        db = sqlite3.connect(database)
        db.row_factory = sqlite3.Row
        return db

    # Store a factory rather than a connection: a sqlite3 connection may by
    # default only be used from the thread that created it.
    ctx.obj['db_connect'] = connect


@cli.command()
@click.pass_context
def list_runs(ctx):
    """List all runs in the database, oldest first."""
    with contextlib.closing(ctx.obj['db_connect']()) as db:
        for row in db.execute('SELECT * FROM runs ORDER BY timestamp').fetchall():
            print(row['run_id'], row['timestamp'], row['version'])


def _mesh_state(row, mesh_dir):
    """Classify one coils/meshes join row as NOT RUN, ERROR, NOT FOUND or SUCCESS."""
    if not row['timestamp']:
        return 'NOT RUN'
    if row['error']:
        return 'ERROR'
    if not (mesh_dir / row['filename']).is_file():
        return 'NOT FOUND'
    return 'SUCCESS'


def _parse_sim_value(pattern, log):
    """Extract a value in base SI units from ``log``.

    Returns ``None`` when the pattern does not match, the number cannot be
    parsed, or the unit prefix is not a known one.
    """
    match = pattern.search(log)
    if match is None:
        return None
    try:
        return float(match.group(1)) * _SI_PREFIX[match.group(2)]
    except (KeyError, ValueError):
        return None


@cli.command()
@click.option('-r', '--run-id')
@click.option('-l', '--log-dir', default='logs', type=click.Path(dir_okay=True, file_okay=False, path_type=pathlib.Path))
@click.option('-m', '--mesh-dir', default='meshes')
@click.pass_context
def list_coils(ctx, run_id, log_dir, mesh_dir):
    """Print a table of a run's coils with mesh state and parameters."""
    mesh_dir = pathlib.Path(mesh_dir)
    keys = {
        'gen.turns': 'N',
        'gen.twists': 'T',
        'gen.single_layer': '1L',
        'gen.inner_diameter': 'ID[mm]',
        'gen.outer_diameter': 'OD[mm]',
        'calculated_fill_factor': 'Fill factor',
        'calculated_approximate_inductance': 'L [µH] (design)',
        'calculated_trace_length': 'track len [mm]',
        'calculated_approximate_resistance': 'R [mΩ] (design)',
        'sim_inductance': 'L [µH] (sim)',
        'sim_resistance': 'R [mΩ] (sim)',
    }

    with contextlib.closing(ctx.obj['db_connect']()) as db:
        # The query below joins against meshes, which is otherwise only
        # created the first time a mesh is looked up.
        _ensure_meshes_table(db)

        if run_id is None:
            run_id = _latest_run_id(db)
        run_row = db.execute('SELECT timestamp FROM runs WHERE run_id=?', (run_id,)).fetchone()
        if run_row is None:
            raise click.ClickException(f'Run {run_id} not found in the database.')
        timestamp, = run_row

        print(f'Listing meshes for run {run_id} at {timestamp}')
        print()

        out = []
        for row in db.execute('SELECT *, MAX(meshes.timestamp) FROM coils LEFT JOIN meshes ON coils.coil_id=meshes.coil_id WHERE run_id=? GROUP BY coils.coil_id, mesh_type ORDER BY meshes.timestamp', (run_id,)).fetchall():
            state = _mesh_state(row, mesh_dir)

            params = dict(db.execute('SELECT key, value FROM results WHERE coil_id=?', (row['coil_id'],)).fetchall())
            for key, (scale, spec) in _DESIGN_FORMATS.items():
                if key in params:
                    params[key] = format(float(params[key]) * scale, spec)

            # filename is NULL for coils that have no mesh row (LEFT JOIN).
            if row['filename'] is not None:
                log_file = log_dir / (pathlib.Path(row['filename']).stem + '_elmer_self_inductance.log')
                if log_file.is_file():
                    log = log_file.read_text()
                    resistance = _parse_sim_value(_RESISTANCE_RE, log)
                    if resistance is not None:
                        params['sim_resistance'] = format(resistance * 1e3, '.3f')
                    inductance = _parse_sim_value(_INDUCTANCE_RE, log)
                    if inductance is not None:
                        params['sim_inductance'] = format(inductance * 1e6, '.3f')

            out.append([row['coil_id'], row['mesh_type'], state, row['timestamp']] + [params.get(key, '-') for key in keys])

    print(tabulate(out, headers=['coil', 'mesh', 'state', 'time'] + list(keys.values()), disable_numparse=True, stralign='right'))


@cli.command()
@click.argument('coil_id', type=int)
@click.argument('mesh_type', type=click.Choice(['normal', 'split', 'mutual']))
@click.option('--mesh-file', default='/tmp/test.msh')
@click.option('--pcb-file', default='/tmp/test.kicad_pcb')
@click.pass_context
def cmdline(ctx, coil_id, mesh_type, mesh_file, pcb_file):
    """Print the mesh generator command line for a single coil."""
    with contextlib.closing(ctx.obj['db_connect']()) as db:
        print(' '.join(map(str, mesh_args(db, coil_id, mesh_type, mesh_file, pcb_file))))


@cli.group()
@click.option('-r', '--run-id')
@click.option('-l', '--log-dir', default='logs')
@click.option('-m', '--mesh-dir', default='meshes')
@click.pass_context
def run(ctx, run_id, log_dir, mesh_dir):
    """Commands that operate on one run (the most recent one by default)."""
    if run_id is None:
        with contextlib.closing(ctx.obj['db_connect']()) as db:
            run_id = _latest_run_id(db)
    ctx.obj['run_id'] = run_id
    log_dir = ctx.obj['log_dir'] = pathlib.Path(log_dir)
    log_dir.mkdir(exist_ok=True)
    ctx.obj['mesh_dir'] = pathlib.Path(mesh_dir)


@run.command()
@click.option('-j', '--num-jobs', type=click.IntRange(min=1), default=1, help='Number of jobs to run in parallel')
@click.option('-t', '--mesh-type', type=click.Choice(['split', 'normal', 'mutual']), default=['split', 'normal', 'mutual'], multiple=True)
@click.pass_context
def generate_meshes(ctx, num_jobs, mesh_type):
    """Generate all missing meshes of the run."""
    run_id = ctx.obj['run_id']
    mesh_dir = ctx.obj['mesh_dir']
    log_dir = ctx.obj['log_dir']
    # Drop repeated -t options so that no mesh is queued twice.
    mesh_types = list(dict.fromkeys(mesh_type))

    with contextlib.closing(ctx.obj['db_connect']()) as db:
        coil_ids = [row['coil_id'] for row in db.execute('SELECT coil_id FROM coils WHERE run_id=?', (run_id,)).fetchall()]
        params = list(itertools.product(coil_ids, mesh_types))
        todo = [(coil_id, kind) for coil_id, kind in params
                if not get_mesh_file(db, mesh_dir, run_id, coil_id, kind).is_file()]

    def handle(_worker, task):
        coil_id, kind = task
        # Each task opens its connection in the worker thread that uses it.
        with contextlib.closing(ctx.obj['db_connect']()) as worker_db:
            try:
                ensure_mesh(worker_db, mesh_dir, log_dir, run_id, coil_id, kind)
            except subprocess.CalledProcessError:
                tqdm.tqdm.write(f'Error generating {kind} mesh for {coil_id=}')

    tqdm.tqdm.write(f'Found {len(params)-len(todo)} meshes out of a total of {len(params)}.')
    tqdm.tqdm.write(f'Processing the remaining {len(todo)} meshes on {num_jobs} workers in parallel.')
    _process_queue(todo, handle, range(num_jobs))


@run.command()
@click.option('-j', '--num-jobs', type=click.IntRange(min=1), default=1, help='Number of jobs to run in parallel')
@click.pass_context
def self_inductance(ctx, num_jobs):
    """Run the self-inductance simulation for every mesh without a log."""
    tasks = []
    num_meshes, num_params, num_completed = 0, 0, 0
    with contextlib.closing(ctx.obj['db_connect']()) as db:
        for coil_id, in db.execute('SELECT coil_id FROM coils WHERE run_id=?', (ctx.obj['run_id'],)).fetchall():
            num_params += 1
            mesh_file = get_mesh_file(db, ctx.obj['mesh_dir'], ctx.obj['run_id'], coil_id, 'normal')
            if not mesh_file.is_file():
                continue
            num_meshes += 1
            logfile = ctx.obj['log_dir'] / (mesh_file.stem + '_elmer_self_inductance.log')
            if logfile.is_file():
                num_completed += 1
            else:
                tasks.append((mesh_file, logfile))

    def handle(_worker, task):
        mesh_file, logfile = task
        with tempfile.TemporaryDirectory() as tmpdir:
            tqdm.tqdm.write(f'Processing {mesh_file}')
            try:
                _run_and_log(['python', '-m', 'coil_parasitics', 'inductance', '--sim-dir', tmpdir, mesh_file], logfile)
            except subprocess.CalledProcessError as e:
                tqdm.tqdm.write(f'Error running simulation, rc={e.returncode}')

    tqdm.tqdm.write(f'Found {num_meshes} meshes out of a total of {num_params} with {num_completed} completed simulations.')
    tqdm.tqdm.write(f'Processing the remaining {num_meshes-num_completed} simulations on {num_jobs} workers in parallel.')
    _process_queue(tasks, handle, range(num_jobs))


def _podman_cmd(host, *args):
    """Return the ssh command that runs ``args`` in the runner container on ``host``."""
    return ['ssh', host, 'podman', 'run', '-v', f'{_REMOTE_MESH_DIR}:/mesh:rw', 'nice-coils-runner', *args]


def _run_mutual_job(db, host, log_dir, case_name, case_id, coil_id, kwargs):
    """Generate the mutual mesh and run the mutual-inductance simulation on ``host``.

    Each stage writes its own log file into ``log_dir``. A failing stage is
    reported on stderr and ends the job without raising.
    """
    workdir = pathlib.Path('/mesh') / f'{case_name}-{case_id}'  # container path
    mesh_file = workdir / 'mesh.msh'  # container path

    def host_log(log_name):
        return log_dir / f'{case_name}-{log_name}-{case_id}.log'  # host path

    mesh_log = host_log('mesh')
    sim_log = host_log('sim')

    gen_args = mesh_args(db, coil_id, 'mutual', mesh_file, str(workdir / 'tmp_pcb.kicad_pcb'), **kwargs)
    try:
        _run_and_log(_podman_cmd(host, *gen_args), mesh_log)
    except subprocess.CalledProcessError as e:
        tqdm.tqdm.write(f'Mesh generation failed with exit code {e.returncode}', file=sys.stderr)
        return

    tqdm.tqdm.write(f'Processing {mesh_file} on {host}')
    try:
        _run_and_log(_podman_cmd(host, 'python', '-m', 'coil_parasitics', 'mutual-inductance', '--sim-dir', workdir, mesh_file), sim_log)
    except subprocess.CalledProcessError as e:
        tqdm.tqdm.write(f'Error running simulation, rc={e.returncode}', file=sys.stderr)
        return

    try:
        for fn in ['ElmerGrid_stdout.log', 'ElmerGrid_stderr.log', 'ElmerSolver_stdout.log', 'ElmerSolver_stderr.log']:
            subprocess.run(['scp', f'{host}:{_REMOTE_MESH_DIR}/{workdir.name}/{fn}', str(host_log(pathlib.Path(fn).stem))], check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        tqdm.tqdm.write(f'Error copying simulation logs, rc={e.returncode}', file=sys.stderr)
        # Append, so that the simulation output already in the log is kept.
        with sim_log.open('a') as f:
            f.write((e.stdout or '') + (e.stderr or ''))


@run.command()
@click.argument('target_hosts', type=click.Path(exists=True, dir_okay=False, path_type=pathlib.Path))
@click.argument('job_file', type=click.Path(exists=True, dir_okay=False, path_type=pathlib.Path))
@click.pass_context
def run_mutual_inductance(ctx, target_hosts, job_file):
    """Run mutual-inductance jobs from JOB_FILE on the hosts in TARGET_HOSTS.

    TARGET_HOSTS is a text file with one ssh host per line. JOB_FILE is a JSON
    object that maps a job index to a [job_name, parameters] pair.
    """
    # NOTE(review): the previous code used an undefined ``coil_id`` here. This
    # version reads it from a "coil_id" entry in each job's parameters; confirm
    # that this matches the intended job file format.
    log_dir = ctx.obj['log_dir']
    log_dir.mkdir(exist_ok=True)

    hosts = [line.strip() for line in target_hosts.read_text().splitlines() if line.strip()]
    if not hosts:
        raise click.ClickException(f'No hosts listed in {target_hosts}.')

    jobs = json.loads(job_file.read_text())
    batch_id = f'{datetime.datetime.now():%Y%m%d-%H%M%S}-{secrets.token_urlsafe(6)}'

    tasks = []
    for i, (job_name, kwargs) in jobs.items():
        overrides = dict(kwargs)
        if 'coil_id' not in overrides:
            raise click.ClickException(f'Job {i} ({job_name}) has no "coil_id" parameter.')
        coil_id = overrides.pop('coil_id')
        tasks.append((job_name, f'{batch_id}-{i}', coil_id, overrides))

    def handle(host, task):
        case_name, case_id, coil_id, overrides = task
        # Each task opens its connection in the worker thread that uses it.
        with contextlib.closing(ctx.obj['db_connect']()) as db:
            _run_mutual_job(db, host, log_dir, case_name, case_id, coil_id, overrides)

    _process_queue(tasks, handle, hosts)


@run.command()
@click.pass_context
def self_capacitance(ctx):
    """Run the self-capacitance simulation for every existing normal mesh."""
    with contextlib.closing(ctx.obj['db_connect']()) as db:
        coil_rows = db.execute('SELECT coil_id FROM coils WHERE run_id=?', (ctx.obj['run_id'],)).fetchall()
        for coil_id, in tqdm.tqdm(coil_rows):
            mesh_file = get_mesh_file(db, ctx.obj['mesh_dir'], ctx.obj['run_id'], coil_id, 'normal')
            if not mesh_file.is_file():
                continue
            logfile = ctx.obj['log_dir'] / (mesh_file.stem + '_elmer_self_capacitance.log')
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    # _run_and_log captures output as text, which is what
                    # write_text() requires.
                    _run_and_log(['python', '-m', 'coil_parasitics', 'self-capacitance', '--sim-dir', tmpdir, mesh_file], logfile)
                except subprocess.CalledProcessError as e:
                    tqdm.tqdm.write(f'Error running simulation, rc={e.returncode}')


if __name__ == '__main__':
    cli()