From 56279dbb1b6506ff3798e2234283378b2dbff4c1 Mon Sep 17 00:00:00 2001 From: Michael Pilosov Date: Wed, 22 Apr 2026 17:48:35 -0600 Subject: [PATCH] runs: server-side chip filter via Prefect tags + cassette chip UX - New runs are tagged on dispatch with dataset: / algorithm: / N: / T: / J: (single value per axis). - /runs accepts ?dataset=&algorithm=&N=&T=&J= and applies Prefect's tags: {all_: [...]} server-side. Without filter, fetch cap is 10; with filter, 50 so narrow results aren't truncated. The requested limit is clamped inside recent_runs to Prefect's own maximum of 200 for filter queries. - New /runs/axes.json returns the universe of chip values across the last 200 deployment runs so the chip bar shows history even when the current slice is narrow. - runs-filter.js rewritten to cassette-style single-select: clicking the selected chip releases it. No 'all'/'none' meta chips. Chip state feeds #runs-slot via hx-vals; a filter-changed custom event triggers an immediate refetch on change, in addition to the 3s poll. - Prefect client gets an update_tags(run_id, tags) helper. - scripts/backfill_tags.py PATCHes tags onto every existing deployment run (dry-run by default, --apply to commit). 
--- app/web/main.py | 110 ++++++++++++++++++++++++-- app/web/static/runs-filter.js | 141 +++++++++++++++------------------- app/web/templates/index.html | 4 +- scripts/backfill_tags.py | 98 +++++++++++++++++++++++ 4 files changed, 267 insertions(+), 86 deletions(-) create mode 100644 scripts/backfill_tags.py diff --git a/app/web/main.py b/app/web/main.py index 3e668ac..88cbf83 100644 --- a/app/web/main.py +++ b/app/web/main.py @@ -534,32 +534,46 @@ class Prefect: return None async def create_run( - self, client: httpx.AsyncClient, parameters: Dict[str, Any] + self, + client: httpx.AsyncClient, + parameters: Dict[str, Any], + tags: Optional[List[str]] = None, ) -> Optional[Dict[str, Any]]: dep = await self.deployment_id(client) if not dep: return None + body: Dict[str, Any] = {"parameters": parameters} + if tags: + body["tags"] = list(tags) r = await client.post( f"{self.base}/deployments/{dep}/create_flow_run", - json={"parameters": parameters}, + json=body, ) if r.status_code >= 400: return {"error": r.text, "status": r.status_code} return r.json() async def recent_runs( - self, client: httpx.AsyncClient, limit: int = 10 + self, + client: httpx.AsyncClient, + limit: int = 10, + required_tags: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: dep = await self.deployment_id(client) if not dep: return [] + flow_runs: Dict[str, Any] = {"deployment_id": {"any_": [dep]}} + if required_tags: + flow_runs["tags"] = {"all_": list(required_tags)} + # Prefect rejects limit > 200 with HTTP 422. 
+ capped = min(max(1, limit), 200) try: r = await client.post( f"{self.base}/flow_runs/filter", json={ "sort": "START_TIME_DESC", - "limit": limit, - "flow_runs": {"deployment_id": {"any_": [dep]}}, + "limit": capped, + "flow_runs": flow_runs, }, ) if r.status_code == 200: @@ -568,6 +582,19 @@ class Prefect: return [] return [] + async def update_tags( + self, client: httpx.AsyncClient, run_id: str, tags: List[str] + ) -> bool: + try: + r = await client.patch( + f"{self.base}/flow_runs/{run_id}", + json={"tags": list(tags)}, + ) + return r.status_code < 400 + except httpx.HTTPError: + return False + return [] + PREFECT = Prefect() @@ -738,10 +765,49 @@ async def reducer_form(request: Request, name: str) -> HTMLResponse: ) +def _chip_filter_tags(params) -> List[str]: + """Turn chip-filter query params (?dataset=…&algorithm=…&N=…&T=…&J=…) + into a Prefect `tags all_` list. Empty / missing values skip the axis.""" + keys = ("dataset", "algorithm", "N", "T", "J") + tags = [] + for k in keys: + v = (params.get(k) or "").strip() + if v: + tags.append(f"{k}:{v}") + return tags + + +@app.get("/runs/axes.json") +async def runs_axes() -> JSONResponse: + """Distinct chip values across the last N deployment-scoped runs. Lets + the chip bar show the full universe regardless of the current filter.""" + async with httpx.AsyncClient(timeout=5.0) as client: + runs = await PREFECT.recent_runs(client, limit=500) + values: Dict[str, set] = {k: set() for k in ("dataset", "algorithm", "N", "T", "J")} + for r in runs: + for tag in r.get("tags") or []: + if ":" not in tag: + continue + k, _, v = tag.partition(":") + if k in values: + values[k].add(v) + # Sort numeric axes numerically. 
+ def _sort(k, vs): + if k in ("N", "T", "J"): + return sorted(vs, key=lambda x: float(x) if x else 0.0) + return sorted(vs) + return JSONResponse({k: _sort(k, v) for k, v in values.items()}) + + @app.get("/runs", response_class=HTMLResponse) async def runs_partial(request: Request) -> HTMLResponse: + required = _chip_filter_tags(request.query_params) + # Server-side tag filter → one narrow query per chip state. When any + # axis is unfiltered, Prefect returns the K most recent for that slice; + # when fully filtered, usually a handful of exact matches. + limit = 50 if required else 10 async with httpx.AsyncClient(timeout=5.0) as client: - runs = await PREFECT.recent_runs(client, limit=10) + runs = await PREFECT.recent_runs(client, limit=limit, required_tags=required) views = [_run_view(r) for r in runs] _mark_stale_views(views) return templates.TemplateResponse( @@ -841,8 +907,13 @@ async def submit(request: Request) -> HTMLResponse: if generator_kwargs: parameters["generator_kwargs"] = generator_kwargs + tags = build_run_tags( + generator_path, generator_kwargs, reducer, + num_points, num_timesteps, jitter_scale, + ) + async with httpx.AsyncClient(timeout=10.0) as client: - run = await PREFECT.create_run(client, parameters) + run = await PREFECT.create_run(client, parameters, tags=tags) if not run: return HTMLResponse( @@ -928,6 +999,31 @@ def _clean_gen_kwargs(gk: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: return {k: v for k, v in gk.items() if k not in _TRANSIENT_GEN_KWARGS} +# Tag axes the chip-filter and backfill both care about. Keep as +# (short_prefix, builder) pairs so adding an axis is a one-line change. 
+TAG_AXES = ("dataset", "algorithm", "N", "T", "J") + + +def build_run_tags( + generator_path: str, + generator_kwargs: Optional[Dict[str, Any]], + embedder: str, + num_points: int, + num_timesteps: int, + jitter_scale: float, +) -> List[str]: + """Tags written onto every flow run so the chip filter can narrow + server-side via Prefect's tag:all_ filter. Single value per axis; the + client's cassette chips pick exactly one per filter.""" + return [ + f"dataset:{_dataset_id(generator_path, generator_kwargs)}", + f"algorithm:{(embedder or '').rsplit('.', 1)[-1]}", + f"N:{int(num_points)}", + f"T:{int(num_timesteps)}", + f"J:{jitter_scale}", + ] + + def _dataset_id(generator_path: str, generator_kwargs: Optional[Dict[str, Any]]) -> str: """Human-scale identifier for a run's dataset — e.g. 'swiss_roll' vs 'swiss_roll_hole' — by matching (path, cleaned kwargs) against diff --git a/app/web/static/runs-filter.js b/app/web/static/runs-filter.js index e4e0988..1786e55 100644 --- a/app/web/static/runs-filter.js +++ b/app/web/static/runs-filter.js @@ -1,91 +1,82 @@ -// Filter the recent-runs list by chip-groups. State lives outside -// #runs-slot so it survives the 3s htmx poll. After each swap we repopulate -// chips from whatever came back, then re-apply selection to hide rows. +// Cassette-style single-select chip filter. Each axis has at most one +// selection; clicking the selected chip again releases it. State lives +// here and rides along on the htmx-polled /runs request via hx-vals on +// #runs-slot. Chip universe comes from /runs/axes.json so the bar shows +// the full history, not just the currently-displayed page. 
(function () { const slot = document.getElementById('runs-slot'); if (!slot) return; const AXES = [ - { prop: 'generator', chipsId: 'runs-flt-dataset', numeric: false }, - { prop: 'embedder', chipsId: 'runs-flt-algo', numeric: false }, - { prop: 'n', chipsId: 'runs-flt-n', numeric: true }, - { prop: 't', chipsId: 'runs-flt-t', numeric: true }, - { prop: 'j', chipsId: 'runs-flt-j', numeric: true }, + { key: 'dataset', chipsId: 'runs-flt-dataset', numeric: false }, + { key: 'algorithm', chipsId: 'runs-flt-algo', numeric: false }, + { key: 'N', chipsId: 'runs-flt-n', numeric: true }, + { key: 'T', chipsId: 'runs-flt-t', numeric: true }, + { key: 'J', chipsId: 'runs-flt-j', numeric: true }, ]; for (const ax of AXES) { ax.el = document.getElementById(ax.chipsId); ax.group = ax.el ? ax.el.closest('.runs-filter-group') : null; - ax.selected = null; // null = all + ax.selected = null; + ax.universe = []; } - function scanAll() { - const out = new Map(AXES.map(a => [a.prop, new Set()])); - slot.querySelectorAll('li.run').forEach((li) => { - for (const ax of AXES) { - const v = li.dataset[ax.prop]; - if (v) out.get(ax.prop).add(v); - } - }); - return out; + function stateAsQuery() { + const p = {}; + for (const ax of AXES) { + if (ax.selected != null) p[ax.key] = ax.selected; + } + return p; } - function sortValues(vals, numeric) { - const arr = [...vals]; - if (numeric) arr.sort((a, b) => Number(a) - Number(b)); - else arr.sort(); - return arr; + function syncHtmxVals() { + // Feed the current chip state into every htmx request on #runs-slot + // (including the 3s poll). JSON form. + slot.setAttribute('hx-vals', JSON.stringify(stateAsQuery())); } - function paint(container, values, selected) { - container.innerHTML = ''; + function paint(ax) { + if (!ax.el) return; + ax.el.innerHTML = ''; + const values = ax.universe; + if (ax.group) ax.group.style.display = values.length <= 1 ? 
'none' : ''; for (const v of values) { const b = document.createElement('button'); b.type = 'button'; - const on = selected == null || selected.has(v); + const on = ax.selected === v; b.className = 'chip' + (on ? ' is-on' : ''); b.dataset.value = v; - b.dataset.role = 'value'; b.setAttribute('aria-pressed', on ? 'true' : 'false'); b.textContent = v; - container.appendChild(b); - } - // Keep all/none atomic so they wrap together rather than orphaning. - const metaWrap = document.createElement('span'); - metaWrap.className = 'chip-meta-wrap'; - for (const [role, label] of [['all', 'all'], ['none', 'none']]) { - const b = document.createElement('button'); - b.type = 'button'; - b.className = 'chip chip-meta'; - b.dataset.role = role; - b.textContent = label; - metaWrap.appendChild(b); - } - container.appendChild(metaWrap); - } - - function repaint() { - const scanned = scanAll(); - for (const ax of AXES) { - if (!ax.el) continue; - const values = sortValues(scanned.get(ax.prop), ax.numeric); - // Hide the whole group when there's nothing to filter by. - if (ax.group) ax.group.style.display = values.length <= 1 ? 'none' : ''; - paint(ax.el, values, ax.selected); + ax.el.appendChild(b); } } - function apply() { - slot.querySelectorAll('li.run').forEach((li) => { - let pass = true; + function repaintAll() { + for (const ax of AXES) paint(ax); + } + + async function refreshUniverse() { + try { + const res = await fetch('/runs/axes.json', { cache: 'no-store' }); + if (!res.ok) return; + const data = await res.json(); for (const ax of AXES) { - if (ax.selected == null) continue; - const v = li.dataset[ax.prop] || ''; - if (!ax.selected.has(v)) { pass = false; break; } + ax.universe = Array.isArray(data[ax.key]) ? data[ax.key] : []; } - li.classList.toggle('filtered-out', !pass); - }); + repaintAll(); + } catch { + /* offline → leave whatever we had */ + } + } + + function triggerRunsRefresh() { + // Tell htmx to re-fetch /runs right now with the updated hx-vals. 
+ if (window.htmx && typeof window.htmx.trigger === 'function') { + window.htmx.trigger(slot, 'filter-changed'); + } } for (const ax of AXES) { @@ -93,30 +84,26 @@ ax.el.addEventListener('click', (e) => { const btn = e.target.closest('.chip'); if (!btn) return; - const role = btn.dataset.role; - const all = sortValues(scanAll().get(ax.prop), ax.numeric); - let cur = ax.selected == null ? new Set(all) : ax.selected; - if (role === 'value') { - const v = btn.dataset.value; - if (cur.has(v)) cur.delete(v); else cur.add(v); - } else if (role === 'all') { - cur = new Set(all); - } else if (role === 'none') { - cur = new Set(); - } - ax.selected = cur; - repaint(); - apply(); + const v = btn.dataset.value; + ax.selected = (ax.selected === v) ? null : v; + paint(ax); + syncHtmxVals(); + triggerRunsRefresh(); }); } + // Re-paint on htmx swap (fresh runs arriving) so the chip universe stays + // current even between explicit refreshes. document.body.addEventListener('htmx:afterSwap', (e) => { if (e.target && e.target.id === 'runs-slot') { - repaint(); - apply(); + // Nothing to re-apply in the DOM — the server already honored the + // filter. We just make sure selected chips stay marked. + repaintAll(); } }); - repaint(); - apply(); + syncHtmxVals(); + refreshUniverse(); + // Periodically refresh the universe so newly-introduced values appear. + setInterval(refreshUniverse, 30_000); })(); diff --git a/app/web/templates/index.html b/app/web/templates/index.html index b09bf1c..1e47d0c 100644 --- a/app/web/templates/index.html +++ b/app/web/templates/index.html @@ -338,7 +338,7 @@
@@ -501,7 +501,7 @@ - +