dr-sandbox/scripts/clean_stale_artifacts.py

"""Find and delete figs/ artifacts whose Prefect flow run no longer exists.
The web app's /metrics page scans figs/ directly on disk, independent of
Prefect. Deleting a run from Prefect's UI leaves its artifacts (.html,
.metrics.json, .frames.json) orphaned, so /metrics keeps showing a result
for a run that /'s "recent runs" can no longer find.
This script paginates Prefect's flow-run list to build the authoritative
set of live run names, then diffs that against artifact stems on disk.
Reference animations (`*_Reference_*.html`) are companion ground-truth
files that are generated alongside embedder runs but do not have their
own Prefect flow — they're always preserved.
Dry-run by default. Pass --apply to delete.
Usage:
.venv/bin/python scripts/clean_stale_artifacts.py [--apply] [--figs-dir PATH]
"""
from __future__ import annotations

import argparse
import asyncio
import hashlib
import json
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple

_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_ROOT))

from app.web.main import PREFECT, run_args_hash, sci_notation  # noqa: E402

BUNDLE_SUFFIXES = (".html", ".metrics.json", ".frames.json")
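
# A "bundle" is the set of sibling artifacts sharing one stem, e.g.
# (illustrative stem and hash, not a real run):
#   Lorenz_PCA_N500_T100_J1e-03_s0_a1b2c3d4.html
#   Lorenz_PCA_N500_T100_J1e-03_s0_a1b2c3d4.metrics.json
#   Lorenz_PCA_N500_T100_J1e-03_s0_a1b2c3d4.frames.json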


def _stem_of(path: Path) -> str:
    name = path.name
    for suf in BUNDLE_SUFFIXES:
        if name.endswith(suf):
            return name[: -len(suf)]
    return path.stem
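

# Reference bundles are detected purely by name, e.g. (hypothetical stem)
# _is_reference("Lorenz_Reference_N500_T100_J1e-03_s0") -> True.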
def _is_reference(stem: str) -> bool:
    return "_Reference_" in stem


def _candidate_stems(params: Dict[str, Any]) -> List[str]:
    """Every artifact stem a given run's parameters could map to on disk.

    A run's Prefect `name` field is unreliable for matching: runs from
    before custom naming carry auto-generated adjective-animal names, and
    the `J` segment was historically written as a raw float before
    `sci_notation` was introduced. This regenerates the stem from the
    authoritative `parameters` dict, across all schemes `backfill_hashes.py`
    understands:

      - base: sci-J (current) and decimal-J (legacy)
      - hash: sha1(embed_args + generator_kwargs) (current),
              sha1(embed_args) (intermediate),
              no-hash (pre-hash era)
    """
    try:
        gen = (params.get("generator_path") or "").rsplit(".", 1)[-1]
        emb = (params.get("embedder") or "").rsplit(".", 1)[-1]
        N = int(params["num_points"])
        T = int(params.get("num_timesteps", params.get("num_snapshots")))
        Jf = float(params["jitter_scale"])
        s = int(params["seed"])
    except (KeyError, TypeError, ValueError):
        return []
    if not gen or not emb:
        return []

    ea = params.get("embed_args") or {}
    gk = params.get("generator_kwargs") or {}

    bases = [f"{gen}_{emb}_N{N}_T{T}_J{sci_notation(Jf)}_s{s}"]
    legacy_base = f"{gen}_{emb}_N{N}_T{T}_J{Jf}_s{s}"
    if legacy_base not in bases:
        bases.append(legacy_base)

    current_hash = run_args_hash(ea, gk)
    legacy_hash = hashlib.sha1(
        json.dumps(ea, sort_keys=True, default=str).encode()
    ).hexdigest()[:8]

    out: List[str] = []
    for b in bases:
        out.append(f"{b}_{current_hash}")
        if legacy_hash != current_hash:
            out.append(f"{b}_{legacy_hash}")
        out.append(b)  # pre-hash
    return out


async def _fetch_all_live_stems() -> Tuple[Set[str], int]:
    """Return (set of all candidate stems claimed by a live Prefect run,
    count of live runs seen)."""
    import httpx

    stems: Set[str] = set()
    n_runs = 0
    PAGE = 200  # Prefect caps /flow_runs/filter at limit=200
    offset = 0
    async with httpx.AsyncClient(timeout=15.0) as c:
        dep = await PREFECT.deployment_id(c)
        if not dep:
            raise RuntimeError(
                f"could not resolve deployment id from {PREFECT.base}"
            )
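        # A batch shorter than PAGE marks the last page, so the loop below
        # terminates without needing a separate count request.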
        while True:
            r = await c.post(
                f"{PREFECT.base}/flow_runs/filter",
                json={
                    "sort": "START_TIME_DESC",
                    "limit": PAGE,
                    "offset": offset,
                    "flow_runs": {"deployment_id": {"any_": [dep]}},
                },
            )
            r.raise_for_status()
            batch = r.json() or []
            n_runs += len(batch)
            for run in batch:
                for s in _candidate_stems(run.get("parameters") or {}):
                    stems.add(s)
            if len(batch) < PAGE:
                break
            offset += PAGE
    return stems, n_runs


def _group_by_stem(figs_dir: Path) -> Dict[str, List[Path]]:
    groups: Dict[str, List[Path]] = defaultdict(list)
    for p in figs_dir.iterdir():
        if not p.is_file():
            continue
        if not any(p.name.endswith(s) for s in BUNDLE_SUFFIXES):
            continue
        groups[_stem_of(p)].append(p)
    return groups
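

# Humanize a byte count, e.g. _fmt_size(532) == "532B",
# _fmt_size(1536) == "1.5KB", _fmt_size(2.5 * 1024**2) == "2.5MB".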
def _fmt_size(n: float) -> str:
    for u in ("B", "KB", "MB", "GB"):
        if n < 1024 or u == "GB":
            return f"{int(n)}{u}" if u == "B" else f"{n:.1f}{u}"
        n /= 1024
    return f"{n:.1f}GB"


def main() -> int:
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument(
        "--apply", action="store_true",
        help="actually delete files (default: dry-run)",
    )
    ap.add_argument(
        "--figs-dir", default=str(_ROOT / "figs"),
        help="path to figs/ directory",
    )
    ap.add_argument(
        "--allow-empty", action="store_true",
        help="proceed even if Prefect reports zero runs (otherwise we bail "
             "to avoid nuking everything when Prefect is unreachable)",
    )
    args = ap.parse_args()

    figs_dir = Path(args.figs_dir).resolve()
    if not figs_dir.is_dir():
        print(f"no such directory: {figs_dir}", file=sys.stderr)
        return 2

    try:
        live_stems, n_runs = asyncio.run(_fetch_all_live_stems())
    except Exception as e:
        print(f"could not list Prefect runs ({e})", file=sys.stderr)
        return 3

    groups = _group_by_stem(figs_dir)
    stale: List[Tuple[str, List[Path], int]] = []
    reference_kept = 0
    matched_kept = 0
    for stem, files in sorted(groups.items()):
        if _is_reference(stem):
            reference_kept += 1
            continue
        if stem in live_stems:
            matched_kept += 1
            continue
        total = sum(f.stat().st_size for f in files)
        stale.append((stem, sorted(files, key=lambda p: p.name), total))

    print(f"scanning {figs_dir}")
    print(f" {n_runs} flow run(s) live in Prefect")
    print(f" {matched_kept} bundle(s) matched to live runs")
    print(f" {reference_kept} reference bundle(s) preserved")
    print(f" {len(stale)} stale bundle(s) to remove\n")

    if not n_runs and stale and not args.allow_empty:
        print(
            "refusing to proceed: Prefect reported 0 runs. If that's actually "
            "correct (e.g. you cleared the whole DB), re-run with --allow-empty.",
            file=sys.stderr,
        )
        return 4
    if not stale:
        return 0

    total_bytes = 0
    file_count = 0
    for stem, files, size in stale:
        total_bytes += size
        file_count += len(files)
        print(f" {_fmt_size(size):>9} {stem}")
        for f in files:
            print(f" + {f.name}")
    print(f"\n total: {_fmt_size(total_bytes)} across {file_count} file(s)")

    if not args.apply:
        print("\n(dry run — pass --apply to delete)")
        return 0

    print("\ndeleting...")
    removed = 0
    for _, files, _ in stale:
        for f in files:
            try:
                f.unlink()
                removed += 1
            except OSError as e:
                print(f" FAILED {f.name}: {e}")
    print(f"done — removed {removed} file(s), freed ~{_fmt_size(total_bytes)}")
    return 0


if __name__ == "__main__":
    sys.exit(main())