dr-sandbox/scripts/backfill_hashes.py
Michael Pilosov b744c48348 stems: fold generator_kwargs into the hash; fix swiss_roll vs hole ambiguity
- run_args_hash now covers (embed_args, generator_kwargs). When gen_kwargs
  is empty we still hash embed_args alone — so plain generators (s_curve,
  plain swiss_roll) keep their stems and no existing plain-gen figs need
  renaming. Kwargs-bearing variants (swiss_roll_hole, blobs,
  gaussian_quantiles, classification) now disambiguate properly.
- Flow persists generator_kwargs into metrics.json meta AND into the
  frames.json sidecar meta, so the label-enrichment path can find it
  without another lookup.
- _enrich_with_labels discovers gen_kwargs in priority: payload meta -->
  sibling metrics.json --> DATASET_META first-match. It matches the
  DATASET_META entry by (path, kwargs) so swiss_roll_hole is no longer
  confused for plain swiss_roll.
- _cached_frames overrides meta.stem with the URL-requested stem before
  enrichment — after a backfill rename the sidecar's baked-in stem is
  stale, and we were then failing to find the sibling metrics.json.
- Submit duplicate-check uses the new hash and keeps the hashless-legacy
  check as a safety net.
- backfill_hashes.py rewritten: queries Prefect for each recent run's
  full params, finds the matching fig under any of (current, legacy,
  hashless) names, renames to the current scheme and patches
  generator_kwargs into metrics.json.
2026-04-22 16:30:42 -06:00

180 lines
6.4 KiB
Python

"""Rename embedder figs to the current hash scheme (embed_args + generator_kwargs).
Two waves of runs may exist on disk:
(1) pre-hash — `<stem>.html`
(2) intermediate — `<stem>_<sha1(embed_args)>.html` (from the first hash rollout)
(3) current — `<stem>_<sha1(embed_args, gen_kwargs)>.html` when gen_kwargs is truthy;
identical to (2) when gen_kwargs is empty.
This script queries Prefect for each recent run's full params (so it knows
generator_kwargs — which the metrics.json sidecar didn't persist before), finds
the matching fig on disk, renames to the current stem, and injects
`meta.generator_kwargs` into the metrics.json so the web server's label
enrichment disambiguates swiss_roll vs swiss_roll_hole etc.
Dry-run by default. Pass --apply to rename.
Usage:
.venv/bin/python scripts/backfill_hashes.py [--apply] [--figs-dir PATH]
"""
from __future__ import annotations

import argparse
import asyncio
import hashlib
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

# Make the repo root importable so `app.web.main` resolves when this script
# is run directly (e.g. `.venv/bin/python scripts/backfill_hashes.py`).
_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_ROOT))

# Must come after the sys.path tweak above, hence the E402 suppression.
from app.web.main import PREFECT, run_args_hash  # noqa: E402
def _legacy_hash(ea: Optional[Dict[str, Any]]) -> str:
s = json.dumps(ea or {}, sort_keys=True, default=str)
return hashlib.sha1(s.encode()).hexdigest()[:8]
def _base_stem(params: Dict[str, Any]) -> Optional[str]:
try:
gen = (params.get("generator_path") or "").rsplit(".", 1)[-1]
emb = (params.get("embedder") or "").rsplit(".", 1)[-1]
N = int(params["num_points"])
T = int(params.get("num_timesteps", params.get("num_snapshots")))
J = float(params["jitter_scale"])
s = int(params["seed"])
except (KeyError, TypeError, ValueError):
return None
if not gen or not emb:
return None
return f"{gen}_{emb}_N{N}_T{T}_J{J}_s{s}"
def _candidate_names(base: str, ea: Dict[str, Any], gk: Dict[str, Any]) -> List[str]:
    """All on-disk names this run's fig may have, current scheme first.

    Order matters: the current name leads so callers short-circuit on
    figs that were already backfilled. Duplicates collapse (e.g. when
    gk is empty the current and legacy names coincide).
    """
    current = f"{base}_{run_args_hash(ea, gk)}.html"
    names = [current]
    for older in (f"{base}_{_legacy_hash(ea)}.html", f"{base}.html"):
        if older not in names:
            names.append(older)
    return names
def _patch_metrics(path: Path, gk: Dict[str, Any]) -> bool:
if not path.is_file():
return False
try:
d = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return False
meta = d.setdefault("meta", {})
if meta.get("generator_kwargs") == gk:
return False
meta["generator_kwargs"] = gk
path.write_text(json.dumps(d, indent=2), encoding="utf-8")
return True
def _rename_bundle(figs_dir: Path, old_stem: str, new_stem: str) -> List[str]:
moved = []
for suffix in (".html", ".metrics.json", ".frames.json"):
src = figs_dir / f"{old_stem}{suffix}"
if not src.exists():
continue
dst = figs_dir / f"{new_stem}{suffix}"
if dst.exists():
moved.append(f"SKIP (target exists) {src.name}")
continue
src.rename(dst)
moved.append(f"{src.name} -> {dst.name}")
return moved
async def _fetch_runs(limit: int = 200) -> List[Dict[str, Any]]:
    """Fetch up to `limit` recent runs (with full parameters) from Prefect."""
    # Local import: httpx is only needed when actually talking to Prefect,
    # so a dry inspection of this module doesn't require it installed.
    import httpx
    async with httpx.AsyncClient(timeout=10.0) as c:
        return await PREFECT.recent_runs(c, limit=limit)
def _build_plans(runs: List[Dict[str, Any]], figs_dir: Path) -> List[tuple]:
    """Map each Prefect run to an on-disk rename/patch plan.

    Each plan is a 5-tuple: (old_stem, new_stem, gen_kwargs, found_name,
    at_target). `at_target` is True when the fig already carries the
    current-scheme name and only its metrics.json may need patching.
    """
    plans: List[tuple] = []
    seen_targets: set = set()
    for r in runs:
        params = r.get("parameters") or {}
        ea = params.get("embed_args") or {}
        gk = params.get("generator_kwargs") or {}
        base = _base_stem(params)
        if not base:
            continue
        target = f"{base}_{run_args_hash(ea, gk)}.html"
        if target in seen_targets:
            continue  # later duplicate — the stale-marking logic will handle it
        # Candidates are ordered current-first, so an already-backfilled
        # fig is found before its legacy/hashless names.
        for candidate in _candidate_names(base, ea, gk):
            if (figs_dir / candidate).exists():
                at_target = candidate == target
                plans.append(
                    (Path(candidate).stem, Path(target).stem, gk, candidate, at_target)
                )
                seen_targets.add(target)
                break
    return plans


def main() -> int:
    """CLI entry point.

    Exit codes: 0 on success (including dry-run / nothing to do),
    2 when --figs-dir does not exist, 3 when Prefect is unreachable.
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--apply", action="store_true", help="actually rename + patch (default: dry-run)")
    ap.add_argument("--figs-dir", default=str(_ROOT / "figs"), help="path to figs/ directory")
    ap.add_argument("--limit", type=int, default=200, help="Prefect runs to scan")
    args = ap.parse_args()

    figs_dir = Path(args.figs_dir).resolve()
    if not figs_dir.is_dir():
        print(f"no such directory: {figs_dir}", file=sys.stderr)
        return 2

    try:
        runs = asyncio.run(_fetch_runs(limit=args.limit))
    except Exception as e:
        print(f"could not reach Prefect at {PREFECT.base} ({e})", file=sys.stderr)
        return 3

    plans = _build_plans(runs, figs_dir)

    print(f"scanning {figs_dir} (Prefect runs seen: {len(runs)})")
    renames = [p for p in plans if not p[4]]
    already = [p for p in plans if p[4]]
    print(f" {len(renames)} to rename, {len(already)} already at target\n")
    for old, new, gk, _, _ in renames:
        gk_str = json.dumps(gk) if gk else "{}"
        print(f" rename {old} -> {new} gen_kwargs={gk_str}")
    if already:
        # plain string — there is nothing to interpolate here
        print("\n at-target (will only patch metrics.json if missing gen_kwargs):")
        for old, _, gk, name, _ in already:
            print(f" {name} gen_kwargs={json.dumps(gk) if gk else '{}'}")
    if not renames and not already:
        print("nothing to do")
        return 0
    if not args.apply:
        print("\n(dry run — pass --apply to rename + patch)")
        return 0

    print("\napplying...")
    patched_count = 0  # count only files actually rewritten, for an honest summary
    for old, new, gk, _, at_target in plans:
        if not at_target:
            for line in _rename_bundle(figs_dir, old, new):
                print(f" {line}")
        if _patch_metrics(figs_dir / f"{new}.metrics.json", gk):
            patched_count += 1
            print(f" patched {new}.metrics.json (generator_kwargs)")
    print(f"done — renamed {len(renames)}, patched metrics for {patched_count} run(s)")
    return 0
if __name__ == "__main__":
sys.exit(main())