diff --git a/scripts/clean_stale_artifacts.py b/scripts/clean_stale_artifacts.py new file mode 100644 index 0000000..57acd52 --- /dev/null +++ b/scripts/clean_stale_artifacts.py @@ -0,0 +1,245 @@ +"""Find and delete figs/ artifacts whose Prefect flow run no longer exists. + +The web app's /metrics page scans figs/ directly on disk, independent of +Prefect. Deleting a run from Prefect's UI leaves its artifacts (.html, +.metrics.json, .frames.json) orphaned, so /metrics keeps showing a result +for a run that /'s "recent runs" can no longer find. + +This script paginates Prefect's flow-run list to build the authoritative +set of live run names, then diffs that against artifact stems on disk. + +Reference animations (`*_Reference_*.html`) are companion ground-truth +files that are generated alongside embedder runs but do not have their +own Prefect flow — they're always preserved. + +Dry-run by default. Pass --apply to delete. + +Usage: + .venv/bin/python scripts/clean_stale_artifacts.py [--apply] [--figs-dir PATH] +""" + +from __future__ import annotations + +import argparse +import asyncio +import hashlib +import json +import sys +from collections import defaultdict +from pathlib import Path +from typing import Any, Dict, List, Set, Tuple + +_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(_ROOT)) +from app.web.main import PREFECT, run_args_hash, sci_notation # noqa: E402 + + +BUNDLE_SUFFIXES = (".html", ".metrics.json", ".frames.json") + + +def _stem_of(path: Path) -> str: + name = path.name + for suf in BUNDLE_SUFFIXES: + if name.endswith(suf): + return name[: -len(suf)] + return path.stem + + +def _is_reference(stem: str) -> bool: + return "_Reference_" in stem + + +def _candidate_stems(params: Dict[str, Any]) -> List[str]: + """Every artifact stem a given run's parameters could map to on disk. + + A run's Prefect `name` field is unreliable for matching: pre-custom- + naming flows wear auto-generated adjective-animal names, and the + `J` segment was historically written as a raw float before + `sci_notation` was introduced. This regenerates the stem from the + authoritative `parameters` dict, across all schemes `backfill_hashes.py` + understands: + - base: sci-J (current) and decimal-J (legacy) + - hash: sha1(embed_args + generator_kwargs) (current), + sha1(embed_args) (intermediate), + no-hash (pre-hash era) + """ + try: + gen = (params.get("generator_path") or "").rsplit(".", 1)[-1] + emb = (params.get("embedder") or "").rsplit(".", 1)[-1] + N = int(params["num_points"]) + T = int(params.get("num_timesteps", params.get("num_snapshots"))) + Jf = float(params["jitter_scale"]) + s = int(params["seed"]) + except (KeyError, TypeError, ValueError): + return [] + if not gen or not emb: + return [] + + ea = params.get("embed_args") or {} + gk = params.get("generator_kwargs") or {} + + bases = [f"{gen}_{emb}_N{N}_T{T}_J{sci_notation(Jf)}_s{s}"] + legacy_base = f"{gen}_{emb}_N{N}_T{T}_J{Jf}_s{s}" + if legacy_base not in bases: + bases.append(legacy_base) + + current_hash = run_args_hash(ea, gk) + legacy_hash = hashlib.sha1( + json.dumps(ea, sort_keys=True, default=str).encode() + ).hexdigest()[:8] + + out: List[str] = [] + for b in bases: + out.append(f"{b}_{current_hash}") + if legacy_hash != current_hash: + out.append(f"{b}_{legacy_hash}") + out.append(b) # pre-hash + return out + + +async def _fetch_all_live_stems() -> Tuple[Set[str], int]: + """Return (set of all candidate stems claimed by a live Prefect run, + count of live runs seen).""" + import httpx + + stems: Set[str] = set() + n_runs = 0 + PAGE = 200 # Prefect caps /flow_runs/filter at limit=200 + offset = 0 + async with httpx.AsyncClient(timeout=15.0) as c: + dep = await PREFECT.deployment_id(c) + if not dep: + raise RuntimeError( + f"could not resolve deployment id from {PREFECT.base}" + ) + while True: + r = await c.post( + f"{PREFECT.base}/flow_runs/filter", + json={ + "sort": "START_TIME_DESC", + "limit": PAGE, + "offset": offset, + "flow_runs": {"deployment_id": {"any_": [dep]}}, + }, + ) + r.raise_for_status() + batch = r.json() or [] + n_runs += len(batch) + for run in batch: + for s in _candidate_stems(run.get("parameters") or {}): + stems.add(s) + if len(batch) < PAGE: + break + offset += PAGE + return stems, n_runs + + +def _group_by_stem(figs_dir: Path) -> Dict[str, List[Path]]: + groups: Dict[str, List[Path]] = defaultdict(list) + for p in figs_dir.iterdir(): + if not p.is_file(): + continue + if not any(p.name.endswith(s) for s in BUNDLE_SUFFIXES): + continue + groups[_stem_of(p)].append(p) + return groups + + +def _fmt_size(n: float) -> str: + for u in ("B", "KB", "MB", "GB"): + if n < 1024 or u == "GB": + return f"{int(n)}{u}" if u == "B" else f"{n:.1f}{u}" + n /= 1024 + return f"{n:.1f}GB" + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument( + "--apply", action="store_true", + help="actually delete files (default: dry-run)", + ) + ap.add_argument( + "--figs-dir", default=str(_ROOT / "figs"), + help="path to figs/ directory", + ) + ap.add_argument( + "--allow-empty", action="store_true", + help="proceed even if Prefect reports zero runs (otherwise we bail " + "to avoid nuking everything when Prefect is unreachable)", + ) + args = ap.parse_args() + + figs_dir = Path(args.figs_dir).resolve() + if not figs_dir.is_dir(): + print(f"no such directory: {figs_dir}", file=sys.stderr) + return 2 + + try: + live_stems, n_runs = asyncio.run(_fetch_all_live_stems()) + except Exception as e: + print(f"could not list Prefect runs ({e})", file=sys.stderr) + return 3 + + groups = _group_by_stem(figs_dir) + + stale: List[Tuple[str, List[Path], int]] = [] + reference_kept = 0 + matched_kept = 0 + for stem, files in sorted(groups.items()): + if _is_reference(stem): + reference_kept += 1 + continue + if stem in live_stems: + matched_kept += 1 + continue + total = sum(f.stat().st_size for f in files) + stale.append((stem, sorted(files, key=lambda p: p.name), total)) + + print(f"scanning {figs_dir}") + print(f" {n_runs} flow run(s) live in Prefect") + print(f" {matched_kept} bundle(s) matched to live runs") + print(f" {reference_kept} reference bundle(s) preserved") + print(f" {len(stale)} stale bundle(s) to remove\n") + + if not n_runs and stale and not args.allow_empty: + print( + "refusing to proceed: Prefect reported 0 runs. If that's actually " + "correct (e.g. you cleared the whole DB), re-run with --allow-empty.", + file=sys.stderr, + ) + return 4 + + if not stale: + return 0 + + total_bytes = 0 + file_count = 0 + for stem, files, size in stale: + total_bytes += size + file_count += len(files) + print(f" {_fmt_size(size):>9} {stem}") + for f in files: + print(f" + {f.name}") + + print(f"\n total: {_fmt_size(total_bytes)} across {file_count} file(s)") + + if not args.apply: + print("\n(dry run — pass --apply to delete)") + return 0 + + print("\ndeleting...") + removed = 0 + for _, files, _ in stale: + for f in files: + try: + f.unlink() + removed += 1 + except OSError as e: + print(f" FAILED {f.name}: {e}") + print(f"done — removed {removed} file(s), freed ~{_fmt_size(total_bytes)}") + return 0 + + +if __name__ == "__main__": + sys.exit(main())