runs: server-side chip filter via Prefect tags + cassette chip UX

- New runs are tagged on dispatch with dataset:<id> / algorithm:<short> /
  N:<n> / T:<t> / J:<j> (single value per axis).
- /runs accepts ?dataset=&algorithm=&N=&T=&J= and applies Prefect's
  tags: {all_: [...]} server-side. Without filter, fetch cap is 10; with
  filter, 50 so narrow results aren't truncated. Prefect's own 200-limit
  on filter queries is clamped inside recent_runs.
- New /runs/axes.json returns the universe of chip values across the last
  200 deployment runs so the chip bar shows history even when the current
  slice is narrow.
- runs-filter.js rewritten to cassette-style single-select: clicking the
  selected chip releases it. No 'all'/'none' meta chips. Chip state feeds
  #runs-slot via hx-vals; a filter-changed custom event triggers an
  immediate refetch on change, in addition to the 3s poll.
- Prefect client gets an update_tags(run_id, tags) helper.
- scripts/backfill_tags.py PATCHes tags onto every existing deployment
  run (dry-run by default, --apply to commit).
This commit is contained in:
Michael Pilosov 2026-04-22 17:48:35 -06:00
parent aa1303e373
commit 56279dbb1b
4 changed files with 267 additions and 86 deletions

View File

@ -534,32 +534,46 @@ class Prefect:
return None return None
async def create_run( async def create_run(
self, client: httpx.AsyncClient, parameters: Dict[str, Any] self,
client: httpx.AsyncClient,
parameters: Dict[str, Any],
tags: Optional[List[str]] = None,
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
dep = await self.deployment_id(client) dep = await self.deployment_id(client)
if not dep: if not dep:
return None return None
body: Dict[str, Any] = {"parameters": parameters}
if tags:
body["tags"] = list(tags)
r = await client.post( r = await client.post(
f"{self.base}/deployments/{dep}/create_flow_run", f"{self.base}/deployments/{dep}/create_flow_run",
json={"parameters": parameters}, json=body,
) )
if r.status_code >= 400: if r.status_code >= 400:
return {"error": r.text, "status": r.status_code} return {"error": r.text, "status": r.status_code}
return r.json() return r.json()
async def recent_runs( async def recent_runs(
self, client: httpx.AsyncClient, limit: int = 10 self,
client: httpx.AsyncClient,
limit: int = 10,
required_tags: Optional[List[str]] = None,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
dep = await self.deployment_id(client) dep = await self.deployment_id(client)
if not dep: if not dep:
return [] return []
flow_runs: Dict[str, Any] = {"deployment_id": {"any_": [dep]}}
if required_tags:
flow_runs["tags"] = {"all_": list(required_tags)}
# Prefect rejects limit > 200 with HTTP 422.
capped = min(max(1, limit), 200)
try: try:
r = await client.post( r = await client.post(
f"{self.base}/flow_runs/filter", f"{self.base}/flow_runs/filter",
json={ json={
"sort": "START_TIME_DESC", "sort": "START_TIME_DESC",
"limit": limit, "limit": capped,
"flow_runs": {"deployment_id": {"any_": [dep]}}, "flow_runs": flow_runs,
}, },
) )
if r.status_code == 200: if r.status_code == 200:
@ -568,6 +582,19 @@ class Prefect:
return [] return []
return [] return []
async def update_tags(
self, client: httpx.AsyncClient, run_id: str, tags: List[str]
) -> bool:
try:
r = await client.patch(
f"{self.base}/flow_runs/{run_id}",
json={"tags": list(tags)},
)
return r.status_code < 400
except httpx.HTTPError:
return False
return []
PREFECT = Prefect() PREFECT = Prefect()
@ -738,10 +765,49 @@ async def reducer_form(request: Request, name: str) -> HTMLResponse:
) )
def _chip_filter_tags(params) -> List[str]:
"""Turn chip-filter query params (?dataset=…&algorithm=…&N=…&T=…&J=…)
into a Prefect `tags all_` list. Empty / missing values skip the axis."""
keys = ("dataset", "algorithm", "N", "T", "J")
tags = []
for k in keys:
v = (params.get(k) or "").strip()
if v:
tags.append(f"{k}:{v}")
return tags
@app.get("/runs/axes.json")
async def runs_axes() -> JSONResponse:
"""Distinct chip values across the last N deployment-scoped runs. Lets
the chip bar show the full universe regardless of the current filter."""
async with httpx.AsyncClient(timeout=5.0) as client:
runs = await PREFECT.recent_runs(client, limit=500)
values: Dict[str, set] = {k: set() for k in ("dataset", "algorithm", "N", "T", "J")}
for r in runs:
for tag in r.get("tags") or []:
if ":" not in tag:
continue
k, _, v = tag.partition(":")
if k in values:
values[k].add(v)
# Sort numeric axes numerically.
def _sort(k, vs):
if k in ("N", "T", "J"):
return sorted(vs, key=lambda x: float(x) if x else 0.0)
return sorted(vs)
return JSONResponse({k: _sort(k, v) for k, v in values.items()})
@app.get("/runs", response_class=HTMLResponse) @app.get("/runs", response_class=HTMLResponse)
async def runs_partial(request: Request) -> HTMLResponse: async def runs_partial(request: Request) -> HTMLResponse:
required = _chip_filter_tags(request.query_params)
# Server-side tag filter → one narrow query per chip state. When any
# axis is unfiltered, Prefect returns the K most recent for that slice;
# when fully filtered, usually a handful of exact matches.
limit = 50 if required else 10
async with httpx.AsyncClient(timeout=5.0) as client: async with httpx.AsyncClient(timeout=5.0) as client:
runs = await PREFECT.recent_runs(client, limit=10) runs = await PREFECT.recent_runs(client, limit=limit, required_tags=required)
views = [_run_view(r) for r in runs] views = [_run_view(r) for r in runs]
_mark_stale_views(views) _mark_stale_views(views)
return templates.TemplateResponse( return templates.TemplateResponse(
@ -841,8 +907,13 @@ async def submit(request: Request) -> HTMLResponse:
if generator_kwargs: if generator_kwargs:
parameters["generator_kwargs"] = generator_kwargs parameters["generator_kwargs"] = generator_kwargs
tags = build_run_tags(
generator_path, generator_kwargs, reducer,
num_points, num_timesteps, jitter_scale,
)
async with httpx.AsyncClient(timeout=10.0) as client: async with httpx.AsyncClient(timeout=10.0) as client:
run = await PREFECT.create_run(client, parameters) run = await PREFECT.create_run(client, parameters, tags=tags)
if not run: if not run:
return HTMLResponse( return HTMLResponse(
@ -928,6 +999,31 @@ def _clean_gen_kwargs(gk: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
return {k: v for k, v in gk.items() if k not in _TRANSIENT_GEN_KWARGS} return {k: v for k, v in gk.items() if k not in _TRANSIENT_GEN_KWARGS}
# Tag axes the chip-filter and backfill both care about. Keep as
# (short_prefix, builder) pairs so adding an axis is a one-line change.
TAG_AXES = ("dataset", "algorithm", "N", "T", "J")
def build_run_tags(
generator_path: str,
generator_kwargs: Optional[Dict[str, Any]],
embedder: str,
num_points: int,
num_timesteps: int,
jitter_scale: float,
) -> List[str]:
"""Tags written onto every flow run so the chip filter can narrow
server-side via Prefect's tag:all_ filter. Single value per axis; the
client's cassette chips pick exactly one per filter."""
return [
f"dataset:{_dataset_id(generator_path, generator_kwargs)}",
f"algorithm:{(embedder or '').rsplit('.', 1)[-1]}",
f"N:{int(num_points)}",
f"T:{int(num_timesteps)}",
f"J:{jitter_scale}",
]
def _dataset_id(generator_path: str, generator_kwargs: Optional[Dict[str, Any]]) -> str: def _dataset_id(generator_path: str, generator_kwargs: Optional[Dict[str, Any]]) -> str:
"""Human-scale identifier for a run's dataset — e.g. 'swiss_roll' vs """Human-scale identifier for a run's dataset — e.g. 'swiss_roll' vs
'swiss_roll_hole' by matching (path, cleaned kwargs) against 'swiss_roll_hole' by matching (path, cleaned kwargs) against

View File

@ -1,91 +1,82 @@
// Filter the recent-runs list by chip-groups. State lives outside // Cassette-style single-select chip filter. Each axis has at most one
// #runs-slot so it survives the 3s htmx poll. After each swap we repopulate // selection; clicking the selected chip again releases it. State lives
// chips from whatever came back, then re-apply selection to hide rows. // here and rides along on the htmx-polled /runs request via hx-vals on
// #runs-slot. Chip universe comes from /runs/axes.json so the bar shows
// the full history, not just the currently-displayed page.
(function () { (function () {
const slot = document.getElementById('runs-slot'); const slot = document.getElementById('runs-slot');
if (!slot) return; if (!slot) return;
const AXES = [ const AXES = [
{ prop: 'generator', chipsId: 'runs-flt-dataset', numeric: false }, { key: 'dataset', chipsId: 'runs-flt-dataset', numeric: false },
{ prop: 'embedder', chipsId: 'runs-flt-algo', numeric: false }, { key: 'algorithm', chipsId: 'runs-flt-algo', numeric: false },
{ prop: 'n', chipsId: 'runs-flt-n', numeric: true }, { key: 'N', chipsId: 'runs-flt-n', numeric: true },
{ prop: 't', chipsId: 'runs-flt-t', numeric: true }, { key: 'T', chipsId: 'runs-flt-t', numeric: true },
{ prop: 'j', chipsId: 'runs-flt-j', numeric: true }, { key: 'J', chipsId: 'runs-flt-j', numeric: true },
]; ];
for (const ax of AXES) { for (const ax of AXES) {
ax.el = document.getElementById(ax.chipsId); ax.el = document.getElementById(ax.chipsId);
ax.group = ax.el ? ax.el.closest('.runs-filter-group') : null; ax.group = ax.el ? ax.el.closest('.runs-filter-group') : null;
ax.selected = null; // null = all ax.selected = null;
ax.universe = [];
} }
function scanAll() { function stateAsQuery() {
const out = new Map(AXES.map(a => [a.prop, new Set()])); const p = {};
slot.querySelectorAll('li.run').forEach((li) => {
for (const ax of AXES) { for (const ax of AXES) {
const v = li.dataset[ax.prop]; if (ax.selected != null) p[ax.key] = ax.selected;
if (v) out.get(ax.prop).add(v);
} }
}); return p;
return out;
} }
function sortValues(vals, numeric) { function syncHtmxVals() {
const arr = [...vals]; // Feed the current chip state into every htmx request on #runs-slot
if (numeric) arr.sort((a, b) => Number(a) - Number(b)); // (including the 3s poll). JSON form.
else arr.sort(); slot.setAttribute('hx-vals', JSON.stringify(stateAsQuery()));
return arr;
} }
function paint(container, values, selected) { function paint(ax) {
container.innerHTML = ''; if (!ax.el) return;
ax.el.innerHTML = '';
const values = ax.universe;
if (ax.group) ax.group.style.display = values.length <= 1 ? 'none' : '';
for (const v of values) { for (const v of values) {
const b = document.createElement('button'); const b = document.createElement('button');
b.type = 'button'; b.type = 'button';
const on = selected == null || selected.has(v); const on = ax.selected === v;
b.className = 'chip' + (on ? ' is-on' : ''); b.className = 'chip' + (on ? ' is-on' : '');
b.dataset.value = v; b.dataset.value = v;
b.dataset.role = 'value';
b.setAttribute('aria-pressed', on ? 'true' : 'false'); b.setAttribute('aria-pressed', on ? 'true' : 'false');
b.textContent = v; b.textContent = v;
container.appendChild(b); ax.el.appendChild(b);
}
// Keep all/none atomic so they wrap together rather than orphaning.
const metaWrap = document.createElement('span');
metaWrap.className = 'chip-meta-wrap';
for (const [role, label] of [['all', 'all'], ['none', 'none']]) {
const b = document.createElement('button');
b.type = 'button';
b.className = 'chip chip-meta';
b.dataset.role = role;
b.textContent = label;
metaWrap.appendChild(b);
}
container.appendChild(metaWrap);
}
function repaint() {
const scanned = scanAll();
for (const ax of AXES) {
if (!ax.el) continue;
const values = sortValues(scanned.get(ax.prop), ax.numeric);
// Hide the whole group when there's nothing to filter by.
if (ax.group) ax.group.style.display = values.length <= 1 ? 'none' : '';
paint(ax.el, values, ax.selected);
} }
} }
function apply() { function repaintAll() {
slot.querySelectorAll('li.run').forEach((li) => { for (const ax of AXES) paint(ax);
let pass = true; }
for (const ax of AXES) {
if (ax.selected == null) continue; async function refreshUniverse() {
const v = li.dataset[ax.prop] || ''; try {
if (!ax.selected.has(v)) { pass = false; break; } const res = await fetch('/runs/axes.json', { cache: 'no-store' });
if (!res.ok) return;
const data = await res.json();
for (const ax of AXES) {
ax.universe = Array.isArray(data[ax.key]) ? data[ax.key] : [];
}
repaintAll();
} catch {
/* offline → leave whatever we had */
}
}
function triggerRunsRefresh() {
// Tell htmx to re-fetch /runs right now with the updated hx-vals.
if (window.htmx && typeof window.htmx.trigger === 'function') {
window.htmx.trigger(slot, 'filter-changed');
} }
li.classList.toggle('filtered-out', !pass);
});
} }
for (const ax of AXES) { for (const ax of AXES) {
@ -93,30 +84,26 @@
ax.el.addEventListener('click', (e) => { ax.el.addEventListener('click', (e) => {
const btn = e.target.closest('.chip'); const btn = e.target.closest('.chip');
if (!btn) return; if (!btn) return;
const role = btn.dataset.role;
const all = sortValues(scanAll().get(ax.prop), ax.numeric);
let cur = ax.selected == null ? new Set(all) : ax.selected;
if (role === 'value') {
const v = btn.dataset.value; const v = btn.dataset.value;
if (cur.has(v)) cur.delete(v); else cur.add(v); ax.selected = (ax.selected === v) ? null : v;
} else if (role === 'all') { paint(ax);
cur = new Set(all); syncHtmxVals();
} else if (role === 'none') { triggerRunsRefresh();
cur = new Set();
}
ax.selected = cur;
repaint();
apply();
}); });
} }
// Re-paint on htmx swap (fresh runs arriving) so the chip universe stays
// current even between explicit refreshes.
document.body.addEventListener('htmx:afterSwap', (e) => { document.body.addEventListener('htmx:afterSwap', (e) => {
if (e.target && e.target.id === 'runs-slot') { if (e.target && e.target.id === 'runs-slot') {
repaint(); // Nothing to re-apply in the DOM — the server already honored the
apply(); // filter. We just make sure selected chips stay marked.
repaintAll();
} }
}); });
repaint(); syncHtmxVals();
apply(); refreshUniverse();
// Periodically refresh the universe so newly-introduced values appear.
setInterval(refreshUniverse, 30_000);
})(); })();

View File

@ -338,7 +338,7 @@
<div <div
id="runs-slot" id="runs-slot"
hx-get="/runs" hx-get="/runs"
hx-trigger="load delay:3s, every 3s" hx-trigger="load delay:3s, every 3s, filter-changed"
hx-swap="innerHTML" hx-swap="innerHTML"
hx-indicator="#poll-ind" hx-indicator="#poll-ind"
> >
@ -501,7 +501,7 @@
<script type="module" src="/static/dataset-picker.js?v=11"></script> <script type="module" src="/static/dataset-picker.js?v=11"></script>
<script type="module" src="/static/metrics.js?v=11"></script> <script type="module" src="/static/metrics.js?v=11"></script>
<script src="/static/compare-select.js?v=2"></script> <script src="/static/compare-select.js?v=2"></script>
<script src="/static/runs-filter.js?v=4"></script> <script src="/static/runs-filter.js?v=5"></script>
<script type="module" src="/static/run-modal.js?v=3"></script> <script type="module" src="/static/run-modal.js?v=3"></script>
<script> <script>
// Anchor-links alone don't expand <details>; force it. // Anchor-links alone don't expand <details>; force it.

98
scripts/backfill_tags.py Normal file
View File

@ -0,0 +1,98 @@
"""Backfill per-run Prefect tags for the chip-filter UX.
Each run in the deployment is tagged with
dataset:<id> algorithm:<short> N:<n> T:<t> J:<j>
computed from its stored `parameters`. Existing tags on the run are
preserved; the five axis tags are merged in (replacing any stale value).
Dry-run by default. Pass `--apply` to actually PATCH runs.
Usage:
.venv/bin/python scripts/backfill_tags.py [--apply] [--limit N]
"""
from __future__ import annotations
import argparse
import asyncio
import sys
from pathlib import Path
from typing import Any, Dict, List
_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_ROOT))
import httpx # noqa: E402
from app.web.main import PREFECT, TAG_AXES, build_run_tags # noqa: E402
def _desired_tags(params: Dict[str, Any]) -> List[str]:
return build_run_tags(
params.get("generator_path") or "",
params.get("generator_kwargs") or {},
params.get("embedder") or "",
int(params.get("num_points", 0) or 0),
int(params.get("num_timesteps", params.get("num_snapshots", 0)) or 0),
float(params.get("jitter_scale", 0.0) or 0.0),
)
def _merge(existing: List[str], desired: List[str]) -> List[str]:
"""Replace any existing `<axis>:*` tags with the desired ones; keep
anything else untouched."""
prefixes = tuple(f"{k}:" for k in TAG_AXES)
kept = [t for t in (existing or []) if not t.startswith(prefixes)]
return kept + list(desired)
async def main_async(apply: bool, limit: int) -> int:
async with httpx.AsyncClient(timeout=10.0) as c:
runs = await PREFECT.recent_runs(c, limit=limit)
planned = []
for r in runs:
params = r.get("parameters") or {}
try:
desired = _desired_tags(params)
except Exception as e:
print(f" skip {r['id'][:8]} ({e})")
continue
existing = r.get("tags") or []
merged = _merge(existing, desired)
if set(merged) == set(existing):
continue
planned.append((r["id"], existing, merged))
print(f"scanning deployment runs (seen: {len(runs)})")
print(f" {len(planned)} to patch\n")
for rid, _, merged in planned:
print(f" {rid[:8]} -> {sorted(merged)}")
if not planned:
print("nothing to do")
return 0
if not apply:
print("\n(dry run — pass --apply to patch)")
return 0
print("\napplying...")
ok = 0
for rid, _, merged in planned:
if await PREFECT.update_tags(c, rid, merged):
ok += 1
print(f" {rid[:8]} OK")
else:
print(f" {rid[:8]} FAILED")
print(f"done — patched {ok}/{len(planned)}")
return 0
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--apply", action="store_true", help="actually PATCH tags (default: dry-run)")
ap.add_argument("--limit", type=int, default=500, help="Prefect runs to scan")
args = ap.parse_args()
return asyncio.run(main_async(args.apply, args.limit))
if __name__ == "__main__":
sys.exit(main())