"""Rename embedder figs to the current hash scheme (embed_args + generator_kwargs). Two waves of runs may exist on disk: (1) pre-hash — `.html` (2) intermediate — `_.html` (from the first hash rollout) (3) current — `_.html` when gen_kwargs is truthy; identical to (2) when gen_kwargs is empty. This script queries Prefect for each recent run's full params (so it knows generator_kwargs — which the metrics.json sidecar didn't persist before), finds the matching fig on disk, renames to the current stem, and injects `meta.generator_kwargs` into the metrics.json so the web server's label enrichment disambiguates swiss_roll vs swiss_roll_hole etc. Dry-run by default. Pass --apply to rename. Usage: .venv/bin/python scripts/backfill_hashes.py [--apply] [--figs-dir PATH] """ from __future__ import annotations import argparse import asyncio import hashlib import json import sys from pathlib import Path from typing import Any, Dict, List, Optional _ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(_ROOT)) from app.web.main import PREFECT, run_args_hash, sci_notation # noqa: E402 def _legacy_hash(ea: Optional[Dict[str, Any]]) -> str: s = json.dumps(ea or {}, sort_keys=True, default=str) return hashlib.sha1(s.encode()).hexdigest()[:8] def _base_stems(params: Dict[str, Any]) -> List[str]: """Return the stem prefix(es) for this run's params: both the current sci-J form and the legacy decimal-J form, so we can find pre-transition files on disk too.""" try: gen = (params.get("generator_path") or "").rsplit(".", 1)[-1] emb = (params.get("embedder") or "").rsplit(".", 1)[-1] N = int(params["num_points"]) T = int(params.get("num_timesteps", params.get("num_snapshots"))) Jf = float(params["jitter_scale"]) s = int(params["seed"]) except (KeyError, TypeError, ValueError): return [] if not gen or not emb: return [] out = [f"{gen}_{emb}_N{N}_T{T}_J{sci_notation(Jf)}_s{s}"] legacy = f"{gen}_{emb}_N{N}_T{T}_J{Jf}_s{s}" if legacy not in out: out.append(legacy) return out def _candidate_names(bases: List[str], ea: Dict[str, Any], gk: Dict[str, Any]) -> List[str]: # Target = current sci-J base + new-scheme hash. if not bases: return [] target_base = bases[0] target = f"{target_base}_{run_args_hash(ea, gk)}.html" out = [target] # Fall back to every (base, hash) combination we might find on disk. hashes = [run_args_hash(ea, gk), _legacy_hash(ea)] for b in bases: for h in hashes: x = f"{b}_{h}.html" if x not in out: out.append(x) no_hash = f"{b}.html" if no_hash not in out: out.append(no_hash) return out def _patch_metrics(path: Path, gk: Dict[str, Any]) -> bool: if not path.is_file(): return False try: d = json.loads(path.read_text(encoding="utf-8")) except Exception: return False meta = d.setdefault("meta", {}) if meta.get("generator_kwargs") == gk: return False meta["generator_kwargs"] = gk path.write_text(json.dumps(d, indent=2), encoding="utf-8") return True def _rename_bundle(figs_dir: Path, old_stem: str, new_stem: str) -> List[str]: moved = [] for suffix in (".html", ".metrics.json", ".frames.json"): src = figs_dir / f"{old_stem}{suffix}" if not src.exists(): continue dst = figs_dir / f"{new_stem}{suffix}" if dst.exists(): moved.append(f"SKIP (target exists) {src.name}") continue src.rename(dst) moved.append(f"{src.name} -> {dst.name}") return moved async def _fetch_runs(limit: int = 200) -> List[Dict[str, Any]]: import httpx async with httpx.AsyncClient(timeout=10.0) as c: return await PREFECT.recent_runs(c, limit=limit) def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("--apply", action="store_true", help="actually rename + patch (default: dry-run)") ap.add_argument("--figs-dir", default=str(_ROOT / "figs"), help="path to figs/ directory") ap.add_argument("--limit", type=int, default=200, help="Prefect runs to scan") args = ap.parse_args() figs_dir = Path(args.figs_dir).resolve() if not figs_dir.is_dir(): print(f"no such directory: {figs_dir}", file=sys.stderr) return 2 try: runs = asyncio.run(_fetch_runs(limit=args.limit)) except Exception as e: print(f"could not reach Prefect at {PREFECT.base} ({e})", file=sys.stderr) return 3 plans = [] # (old_stem, new_stem, gk, found_name) seen_targets = set() for r in runs: params = r.get("parameters") or {} ea = params.get("embed_args") or {} gk = params.get("generator_kwargs") or {} bases = _base_stems(params) if not bases: continue target = f"{bases[0]}_{run_args_hash(ea, gk)}.html" if target in seen_targets: continue # later duplicate — the stale-marking logic will handle it for candidate in _candidate_names(bases, ea, gk): if (figs_dir / candidate).exists(): if candidate == target: # Already at target; just ensure metrics.json carries gk. plans.append((Path(candidate).stem, Path(target).stem, gk, candidate, True)) else: plans.append((Path(candidate).stem, Path(target).stem, gk, candidate, False)) seen_targets.add(target) break print(f"scanning {figs_dir} (Prefect runs seen: {len(runs)})") renames = [p for p in plans if not p[4]] already = [p for p in plans if p[4]] print(f" {len(renames)} to rename, {len(already)} already at target\n") for old, new, gk, _, _ in renames: gk_str = json.dumps(gk) if gk else "{}" print(f" rename {old} -> {new} gen_kwargs={gk_str}") if already: print(f"\n at-target (will only patch metrics.json if missing gen_kwargs):") for old, _, gk, name, _ in already: print(f" {name} gen_kwargs={json.dumps(gk) if gk else '{}'}") if not renames and not already: print("nothing to do") return 0 if not args.apply: print("\n(dry run — pass --apply to rename + patch)") return 0 print("\napplying...") for old, new, gk, _, at_target in plans: if not at_target: for line in _rename_bundle(figs_dir, old, new): print(f" {line}") patched = _patch_metrics(figs_dir / f"{new}.metrics.json", gk) if patched: print(f" patched {new}.metrics.json (generator_kwargs)") print(f"done — renamed {len(renames)}, patched metrics for {len(plans)} run(s)") return 0 if __name__ == "__main__": sys.exit(main())