dr-sandbox/scripts/backfill_hashes.py
Michael Pilosov e94d28b8fc filenames + run names: J in sci notation (5E-3 not 0.005)
Periods in filenames are avoidable and the Prefect UI dislikes them in
run names. Uses a shared sci_notation helper in main.py mirrored in the
flow. Stem regex (main + parser) now matches J<digits.Ee+-> to accept
both old decimal-J and new sci-J filenames so the two transition
together. J tag in Prefect tag list also uses the sci form, so chip
filters stay consistent.

Backfill script extended to find pre-transition (decimal-J) files on
disk via a second base-stem variant, then rename them to the sci form.
backfill_tags re-patches existing runs so their J tag matches the new
canonical form.

All 13 existing figs + runs renamed / retagged in-place.
2026-04-22 17:54:46 -06:00

195 lines
6.9 KiB
Python

"""Rename embedder figs to the current hash scheme (embed_args + generator_kwargs).
Two waves of runs may exist on disk:
(1) pre-hash — `<stem>.html`
(2) intermediate — `<stem>_<sha1(embed_args)>.html` (from the first hash rollout)
(3) current — `<stem>_<sha1(embed_args, gen_kwargs)>.html` when gen_kwargs is truthy;
identical to (2) when gen_kwargs is empty.
This script queries Prefect for each recent run's full params (so it knows
generator_kwargs — which the metrics.json sidecar didn't persist before), finds
the matching fig on disk, renames to the current stem, and injects
`meta.generator_kwargs` into the metrics.json so the web server's label
enrichment disambiguates swiss_roll vs swiss_roll_hole etc.
Dry-run by default. Pass --apply to rename.
Usage:
.venv/bin/python scripts/backfill_hashes.py [--apply] [--figs-dir PATH]
"""
from __future__ import annotations
import argparse
import asyncio
import hashlib
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_ROOT))
from app.web.main import PREFECT, run_args_hash, sci_notation # noqa: E402
def _legacy_hash(ea: Optional[Dict[str, Any]]) -> str:
s = json.dumps(ea or {}, sort_keys=True, default=str)
return hashlib.sha1(s.encode()).hexdigest()[:8]
def _base_stems(params: Dict[str, Any]) -> List[str]:
"""Return the stem prefix(es) for this run's params: both the current
sci-J form and the legacy decimal-J form, so we can find pre-transition
files on disk too."""
try:
gen = (params.get("generator_path") or "").rsplit(".", 1)[-1]
emb = (params.get("embedder") or "").rsplit(".", 1)[-1]
N = int(params["num_points"])
T = int(params.get("num_timesteps", params.get("num_snapshots")))
Jf = float(params["jitter_scale"])
s = int(params["seed"])
except (KeyError, TypeError, ValueError):
return []
if not gen or not emb:
return []
out = [f"{gen}_{emb}_N{N}_T{T}_J{sci_notation(Jf)}_s{s}"]
legacy = f"{gen}_{emb}_N{N}_T{T}_J{Jf}_s{s}"
if legacy not in out:
out.append(legacy)
return out
def _candidate_names(bases: List[str], ea: Dict[str, Any], gk: Dict[str, Any]) -> List[str]:
# Target = current sci-J base + new-scheme hash.
if not bases:
return []
target_base = bases[0]
target = f"{target_base}_{run_args_hash(ea, gk)}.html"
out = [target]
# Fall back to every (base, hash) combination we might find on disk.
hashes = [run_args_hash(ea, gk), _legacy_hash(ea)]
for b in bases:
for h in hashes:
x = f"{b}_{h}.html"
if x not in out:
out.append(x)
no_hash = f"{b}.html"
if no_hash not in out:
out.append(no_hash)
return out
def _patch_metrics(path: Path, gk: Dict[str, Any]) -> bool:
if not path.is_file():
return False
try:
d = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return False
meta = d.setdefault("meta", {})
if meta.get("generator_kwargs") == gk:
return False
meta["generator_kwargs"] = gk
path.write_text(json.dumps(d, indent=2), encoding="utf-8")
return True
def _rename_bundle(figs_dir: Path, old_stem: str, new_stem: str) -> List[str]:
moved = []
for suffix in (".html", ".metrics.json", ".frames.json"):
src = figs_dir / f"{old_stem}{suffix}"
if not src.exists():
continue
dst = figs_dir / f"{new_stem}{suffix}"
if dst.exists():
moved.append(f"SKIP (target exists) {src.name}")
continue
src.rename(dst)
moved.append(f"{src.name} -> {dst.name}")
return moved
async def _fetch_runs(limit: int = 200) -> List[Dict[str, Any]]:
import httpx
async with httpx.AsyncClient(timeout=10.0) as c:
return await PREFECT.recent_runs(c, limit=limit)
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--apply", action="store_true", help="actually rename + patch (default: dry-run)")
ap.add_argument("--figs-dir", default=str(_ROOT / "figs"), help="path to figs/ directory")
ap.add_argument("--limit", type=int, default=200, help="Prefect runs to scan")
args = ap.parse_args()
figs_dir = Path(args.figs_dir).resolve()
if not figs_dir.is_dir():
print(f"no such directory: {figs_dir}", file=sys.stderr)
return 2
try:
runs = asyncio.run(_fetch_runs(limit=args.limit))
except Exception as e:
print(f"could not reach Prefect at {PREFECT.base} ({e})", file=sys.stderr)
return 3
plans = [] # (old_stem, new_stem, gk, found_name)
seen_targets = set()
for r in runs:
params = r.get("parameters") or {}
ea = params.get("embed_args") or {}
gk = params.get("generator_kwargs") or {}
bases = _base_stems(params)
if not bases:
continue
target = f"{bases[0]}_{run_args_hash(ea, gk)}.html"
if target in seen_targets:
continue # later duplicate — the stale-marking logic will handle it
for candidate in _candidate_names(bases, ea, gk):
if (figs_dir / candidate).exists():
if candidate == target:
# Already at target; just ensure metrics.json carries gk.
plans.append((Path(candidate).stem, Path(target).stem, gk, candidate, True))
else:
plans.append((Path(candidate).stem, Path(target).stem, gk, candidate, False))
seen_targets.add(target)
break
print(f"scanning {figs_dir} (Prefect runs seen: {len(runs)})")
renames = [p for p in plans if not p[4]]
already = [p for p in plans if p[4]]
print(f" {len(renames)} to rename, {len(already)} already at target\n")
for old, new, gk, _, _ in renames:
gk_str = json.dumps(gk) if gk else "{}"
print(f" rename {old} -> {new} gen_kwargs={gk_str}")
if already:
print(f"\n at-target (will only patch metrics.json if missing gen_kwargs):")
for old, _, gk, name, _ in already:
print(f" {name} gen_kwargs={json.dumps(gk) if gk else '{}'}")
if not renames and not already:
print("nothing to do")
return 0
if not args.apply:
print("\n(dry run — pass --apply to rename + patch)")
return 0
print("\napplying...")
for old, new, gk, _, at_target in plans:
if not at_target:
for line in _rename_bundle(figs_dir, old, new):
print(f" {line}")
patched = _patch_metrics(figs_dir / f"{new}.metrics.json", gk)
if patched:
print(f" patched {new}.metrics.json (generator_kwargs)")
print(f"done — renamed {len(renames)}, patched metrics for {len(plans)} run(s)")
return 0
if __name__ == "__main__":
sys.exit(main())