"""Find and delete figs/ artifacts whose Prefect flow run no longer exists.

The web app's /metrics page scans figs/ directly on disk, independent of
Prefect. Deleting a run from Prefect's UI leaves its artifacts (.html,
.metrics.json, .frames.json) orphaned, so /metrics keeps showing a result
for a run that /'s "recent runs" can no longer find.

This script paginates Prefect's flow-run list to build the authoritative
set of live run names, then diffs that against artifact stems on disk.

Reference animations (`*_Reference_*.html`) are companion ground-truth
files that are generated alongside embedder runs but do not have their own
Prefect flow — they're always preserved.

Dry-run by default. Pass --apply to delete.

Usage:
    .venv/bin/python scripts/clean_stale_artifacts.py [--apply] [--figs-dir PATH]
"""

from __future__ import annotations

import argparse
import asyncio
import hashlib
import json
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple

# Make the repository root importable so `app.web.main` resolves when this
# file is executed directly as a script.
_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_ROOT))

from app.web.main import PREFECT, run_args_hash, sci_notation  # noqa: E402

# File-name suffixes that together make up one artifact "bundle".
BUNDLE_SUFFIXES = (".html", ".metrics.json", ".frames.json")


def _stem_of(path: Path) -> str:
    """Return the file name of *path* with its bundle suffix stripped.

    Falls back to ``path.stem`` when the name ends with none of
    ``BUNDLE_SUFFIXES``.
    """
    name = path.name
    for suf in BUNDLE_SUFFIXES:
        if name.endswith(suf):
            return name[: -len(suf)]
    return path.stem


def _is_reference(stem: str) -> bool:
    """Return True when *stem* names a reference bundle (contains ``_Reference_``)."""
    return "_Reference_" in stem


def _candidate_stems(params: Dict[str, Any]) -> List[str]:
    """Every artifact stem a given run's parameters could map to on disk.

    A run's Prefect `name` field is unreliable for matching: pre-custom-naming
    flows wear auto-generated adjective-animal names, and the `J` segment was
    historically written as a raw float before `sci_notation` was introduced.

    This regenerates the stem from the authoritative `parameters` dict,
    across all schemes `backfill_hashes.py` understands:

    - base: sci-J (current) and decimal-J (legacy)
    - hash: sha1(embed_args + generator_kwargs) (current),
      sha1(embed_args) (intermediate), no-hash (pre-hash era)

    Returns an empty list when a required parameter is missing or cannot be
    converted, or when the generator/embedder name is empty. Such a run then
    claims no artifacts on disk.
    """
    try:
        # Only the last dotted component of the generator/embedder is used.
        gen = (params.get("generator_path") or "").rsplit(".", 1)[-1]
        emb = (params.get("embedder") or "").rsplit(".", 1)[-1]
        N = int(params["num_points"])
        # `num_snapshots` is accepted as a fallback key for the timestep count.
        T = int(params.get("num_timesteps", params.get("num_snapshots")))
        Jf = float(params["jitter_scale"])
        s = int(params["seed"])
    except (KeyError, TypeError, ValueError):
        return []
    if not gen or not emb:
        return []

    ea = params.get("embed_args") or {}
    gk = params.get("generator_kwargs") or {}

    bases = [f"{gen}_{emb}_N{N}_T{T}_J{sci_notation(Jf)}_s{s}"]
    legacy_base = f"{gen}_{emb}_N{N}_T{T}_J{Jf}_s{s}"
    if legacy_base not in bases:
        bases.append(legacy_base)

    current_hash = run_args_hash(ea, gk)
    legacy_hash = hashlib.sha1(
        json.dumps(ea, sort_keys=True, default=str).encode()
    ).hexdigest()[:8]

    out: List[str] = []
    for b in bases:
        out.append(f"{b}_{current_hash}")
        if legacy_hash != current_hash:
            out.append(f"{b}_{legacy_hash}")
        out.append(b)  # pre-hash
    return out


async def _fetch_all_live_stems() -> Tuple[Set[str], int]:
    """Return (set of all candidate stems claimed by a live Prefect run,
    count of live runs seen).

    Pages through ``POST {PREFECT.base}/flow_runs/filter`` restricted to the
    deployment id resolved by ``PREFECT.deployment_id``.

    Raises:
        RuntimeError: if the deployment id cannot be resolved.
        httpx.HTTPStatusError: if a page request returns an error status
            (via ``raise_for_status``).
    """
    import httpx

    stems: Set[str] = set()
    n_runs = 0
    PAGE = 200  # Prefect caps /flow_runs/filter at limit=200
    offset = 0
    async with httpx.AsyncClient(timeout=15.0) as c:
        dep = await PREFECT.deployment_id(c)
        if not dep:
            raise RuntimeError(
                f"could not resolve deployment id from {PREFECT.base}"
            )
        while True:
            r = await c.post(
                f"{PREFECT.base}/flow_runs/filter",
                json={
                    "sort": "START_TIME_DESC",
                    "limit": PAGE,
                    "offset": offset,
                    "flow_runs": {"deployment_id": {"any_": [dep]}},
                },
            )
            r.raise_for_status()
            batch = r.json() or []
            n_runs += len(batch)
            for run in batch:
                stems.update(_candidate_stems(run.get("parameters") or {}))
            # A short page means there is nothing left to fetch.
            if len(batch) < PAGE:
                break
            offset += PAGE
    return stems, n_runs


def _group_by_stem(figs_dir: Path) -> Dict[str, List[Path]]:
    """Group the bundle files directly inside *figs_dir* by their stem.

    Only regular files whose name ends with one of ``BUNDLE_SUFFIXES`` are
    considered; sub-directories are not descended into.
    """
    groups: Dict[str, List[Path]] = defaultdict(list)
    for p in figs_dir.iterdir():
        if not p.is_file():
            continue
        if not p.name.endswith(BUNDLE_SUFFIXES):
            continue
        groups[_stem_of(p)].append(p)
    return groups


def _fmt_size(n: float) -> str:
    """Format a byte count using 1024-based units B, KB, MB or GB.

    Bytes are shown as an integer; larger units with one decimal place.
    Anything of 1024 MB or more is reported in GB.
    """
    for u in ("B", "KB", "MB"):
        if n < 1024:
            return f"{int(n)}{u}" if u == "B" else f"{n:.1f}{u}"
        n /= 1024
    return f"{n:.1f}GB"


def main() -> int:
    """Command-line entry point; returns the process exit code.

    Exit codes:
        0 - nothing stale, dry run completed, or every stale file was removed
        2 - the figs directory does not exist
        3 - the Prefect flow-run list could not be fetched
        4 - Prefect reported zero runs while stale bundles exist and
            --allow-empty was not given
        5 - --apply was given but one or more files could not be deleted
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument(
        "--apply",
        action="store_true",
        help="actually delete files (default: dry-run)",
    )
    ap.add_argument(
        "--figs-dir",
        default=str(_ROOT / "figs"),
        help="path to figs/ directory",
    )
    ap.add_argument(
        "--allow-empty",
        action="store_true",
        help="proceed even if Prefect reports zero runs (otherwise we bail "
        "to avoid nuking everything when Prefect is unreachable)",
    )
    args = ap.parse_args()

    figs_dir = Path(args.figs_dir).resolve()
    if not figs_dir.is_dir():
        print(f"no such directory: {figs_dir}", file=sys.stderr)
        return 2

    # Top-level boundary: any failure to list runs aborts before anything is
    # classified as stale.
    try:
        live_stems, n_runs = asyncio.run(_fetch_all_live_stems())
    except Exception as e:
        print(f"could not list Prefect runs ({e})", file=sys.stderr)
        return 3

    groups = _group_by_stem(figs_dir)

    stale: List[Tuple[str, List[Path], int]] = []
    reference_kept = 0
    matched_kept = 0
    for stem, files in sorted(groups.items()):
        if _is_reference(stem):
            reference_kept += 1
            continue
        if stem in live_stems:
            matched_kept += 1
            continue
        total = sum(f.stat().st_size for f in files)
        stale.append((stem, sorted(files, key=lambda p: p.name), total))

    print(f"scanning {figs_dir}")
    print(f" {n_runs} flow run(s) live in Prefect")
    print(f" {matched_kept} bundle(s) matched to live runs")
    print(f" {reference_kept} reference bundle(s) preserved")
    print(f" {len(stale)} stale bundle(s) to remove\n")

    if not n_runs and stale and not args.allow_empty:
        print(
            "refusing to proceed: Prefect reported 0 runs. If that's actually "
            "correct (e.g. you cleared the whole DB), re-run with --allow-empty.",
            file=sys.stderr,
        )
        return 4

    if not stale:
        return 0

    total_bytes = 0
    file_count = 0
    for stem, files, size in stale:
        total_bytes += size
        file_count += len(files)
        print(f" {_fmt_size(size):>9} {stem}")
        for f in files:
            print(f" + {f.name}")
    print(f"\n total: {_fmt_size(total_bytes)} across {file_count} file(s)")

    if not args.apply:
        print("\n(dry run — pass --apply to delete)")
        return 0

    print("\ndeleting...")
    removed = 0
    freed = 0
    failed = 0
    for _, files, _ in stale:
        for f in files:
            try:
                # Measure right before unlinking so the "freed" figure only
                # counts files that were actually removed.
                size = f.stat().st_size
                f.unlink()
            except OSError as e:
                failed += 1
                print(f" FAILED {f.name}: {e}", file=sys.stderr)
            else:
                removed += 1
                freed += size
    print(f"done — removed {removed} file(s), freed ~{_fmt_size(freed)}")
    # Keep going past individual failures, but do not report success for a
    # partially completed cleanup.
    return 5 if failed else 0


if __name__ == "__main__":
    sys.exit(main())