sections' open state from the URL so
# first paint matches (no flash). Intro defaults closed, picker open.
q = request.query_params
intro_open = q.get("intro") == "1"
picker_open = q.get("picker") != "0"
# Also pre-resolve the radio-group selections so n/f/j render with the
# correct `checked` attribute on first paint.
initial_radios = {
"n": q.get("n") or "500",
"f": q.get("f") or "24",
"j": q.get("j") or "0.005",
}
return templates.TemplateResponse(
request,
"index.html",
{
"reducers": reducers,
"default_reducer": default_reducer,
"default_spec": default_spec,
"runs": views,
"deployment_id": dep_id,
"prefect_api": PREFECT_API,
"intro_open": intro_open,
"picker_open": picker_open,
"initial_radios": initial_radios,
},
)
@app.get("/data.json")
async def data_json() -> JSONResponse:
    """JSON endpoint exposing the dataset preview payloads."""
    previews = _dataset_previews()
    return JSONResponse(previews)
@app.get("/reducer-form", response_class=HTMLResponse)
async def reducer_form(request: Request, name: str) -> HTMLResponse:
    """Render the per-reducer parameter form partial.

    Returns 404 with a small error fragment when `name` is not a key of
    REDUCERS; otherwise renders `_reducer_form.html` with the spec.
    """
    spec = REDUCERS.get(name)
    if not spec:
        # Fix: the error string was split across two physical lines,
        # leaving an unterminated string literal (SyntaxError).
        return HTMLResponse("unknown reducer\n", status_code=404)
    return templates.TemplateResponse(
        request,
        "_reducer_form.html",
        {"reducer_key": name, "spec": spec},
    )
def _chip_filter_tags(params) -> List[str]:
"""Turn chip-filter query params (?dataset=…&algorithm=…&N=…&T=…&J=…)
into a Prefect `tags all_` list. Empty / missing values skip the axis."""
keys = ("dataset", "algorithm", "N", "T", "J")
tags = []
for k in keys:
v = (params.get(k) or "").strip()
if v:
tags.append(f"{k}:{v}")
return tags
@app.get("/runs/axes.json")
async def runs_axes() -> JSONResponse:
    """Distinct chip values across the last N deployment-scoped runs. Lets
    the chip bar show the full universe regardless of the current filter."""
    axes = ("dataset", "algorithm", "N", "T", "J")
    seen: Dict[str, set] = {axis: set() for axis in axes}
    async with httpx.AsyncClient(timeout=5.0) as client:
        runs = await PREFECT.recent_runs(client, limit=500)
    for run in runs:
        for tag in run.get("tags") or []:
            axis, sep, value = tag.partition(":")
            # Tags without a ":" separator are not axis tags; skip them.
            if sep and axis in seen:
                seen[axis].add(value)

    numeric_axes = {"N", "T", "J"}

    def _ordered(axis: str, vals: set):
        # Numeric axes sort by magnitude; everything else lexically.
        if axis in numeric_axes:
            return sorted(vals, key=lambda s: float(s) if s else 0.0)
        return sorted(vals)

    return JSONResponse({axis: _ordered(axis, vals) for axis, vals in seen.items()})
@app.get("/runs", response_class=HTMLResponse)
async def runs_partial(request: Request) -> HTMLResponse:
    """Render the runs-list partial, narrowed by the chip-filter tags."""
    tags = _chip_filter_tags(request.query_params)
    # Server-side tag filter → one narrow query per chip state. When any
    # axis is unfiltered, Prefect returns the K most recent for that slice;
    # when fully filtered, usually a handful of exact matches.
    fetch_limit = 50 if tags else 10
    async with httpx.AsyncClient(timeout=5.0) as client:
        runs = await PREFECT.recent_runs(client, limit=fetch_limit, required_tags=tags)
    views = [_run_view(r) for r in runs]
    _mark_stale_views(views)
    return templates.TemplateResponse(request, "_runs.html", {"runs": views})
@app.post("/submit", response_class=HTMLResponse)
async def submit(request: Request) -> HTMLResponse:
    """Validate the submit form, create a Prefect flow run, and return a
    freshly refreshed runs partial for htmx to swap into the right column.

    Error paths return small HTML fragments with 4xx/5xx status codes so
    the client can surface them inline. Fix: every error string below was
    previously split across two physical lines — an unterminated string
    literal (SyntaxError) — and is now a single literal with an explicit
    trailing "\\n".
    """
    form = await request.form()
    data: Dict[str, str] = {k: str(v) for k, v in form.items()}

    reducer = data.get("reducer") or ""
    if reducer not in REDUCERS:
        return HTMLResponse(f"unknown reducer: {reducer}\n", status_code=400)

    # Dataset came from the picker via dataset_id; fall back to explicit
    # generator_path / generator_kwargs only when dataset_id is absent entirely
    # (API consumers). UI form posts always carry the key, so an empty value
    # means the user hit submit without picking — reject rather than silently
    # defaulting to s_curve.
    if "dataset_id" in data:
        dataset_id = data.get("dataset_id") or ""
        if not dataset_id:
            return HTMLResponse("pick a dataset first (§ 1 above)\n", status_code=400)
        if dataset_id not in DATASET_META:
            return HTMLResponse(f"unknown dataset: {dataset_id}\n", status_code=400)
        meta = DATASET_META[dataset_id]
        generator_path = meta["path"]
        generator_kwargs = dict(meta["kwargs"])
    else:
        generator_path = data.get("generator_path") or ""
        if not generator_path:
            return HTMLResponse("missing dataset_id or generator_path\n", status_code=400)
        raw_kwargs = data.get("generator_kwargs") or ""
        try:
            generator_kwargs = json.loads(raw_kwargs) if raw_kwargs else {}
        except json.JSONDecodeError as e:
            return HTMLResponse(f"bad generator_kwargs JSON: {e}\n", status_code=400)

    # Numeric knobs: empty strings fall back to the documented defaults.
    try:
        num_points = int(data.get("num_points", "5000") or 5000)
        num_timesteps = int(data.get("num_timesteps", "48") or 48)
        jitter_scale = float(data.get("jitter_scale", "0.01") or 0.01)
        seed = int(data.get("seed", "42") or 42)
    except ValueError as e:
        return HTMLResponse(f"bad numeric input: {e}\n", status_code=400)

    embed_args = build_embed_args(reducer, data)

    # Reject submissions whose output path would overwrite an existing fig.
    # Hash now covers both embed_args and generator_kwargs, so swiss_roll vs
    # swiss_roll_hole (and blobs with varying n_features, etc.) no longer
    # share a stem. Also check the legacy hashless path for pre-hash figs.
    _, hashed_emb = synthesize_output_paths(
        generator_path, reducer, num_points, num_timesteps, jitter_scale, seed,
        embed_args=embed_args, generator_kwargs=generator_kwargs,
    )
    _, legacy_emb = synthesize_output_paths(
        generator_path, reducer, num_points, num_timesteps, jitter_scale, seed,
    )
    for candidate in (hashed_emb, legacy_emb):
        if (FIGS_DIR / candidate).exists():
            return HTMLResponse(
                f"a run with matching params already exists ({candidate}). "
                f"change a param or delete the fig first.\n",
                status_code=409,
            )

    parameters: Dict[str, Any] = {
        "num_points": num_points,
        "num_timesteps": num_timesteps,
        "jitter_scale": jitter_scale,
        "seed": seed,
        "generator_path": generator_path,
        "embedder": reducer,
        "embed_args": embed_args,
    }
    if generator_kwargs:
        parameters["generator_kwargs"] = generator_kwargs

    tags = build_run_tags(
        generator_path, generator_kwargs, reducer,
        num_points, num_timesteps, jitter_scale,
    )
    async with httpx.AsyncClient(timeout=10.0) as client:
        run = await PREFECT.create_run(client, parameters, tags=tags)
    if not run:
        return HTMLResponse(
            f"could not reach Prefect API at {PREFECT_API}\n",
            status_code=502,
        )
    if "error" in run:
        return HTMLResponse(
            f"prefect error ({run.get('status')}): {run.get('error')[:500]}\n",
            status_code=502,
        )

    # Remember the expected output files so the runs view can link them.
    ref_file, emb_file = synthesize_output_paths(
        generator_path, reducer, num_points, num_timesteps, jitter_scale, seed,
        embed_args=embed_args, generator_kwargs=generator_kwargs,
    )
    RUN_OUTPUTS[run["id"]] = {"ref": ref_file, "embed": emb_file}

    # Return freshly refreshed runs partial so htmx can swap the right column
    async with httpx.AsyncClient(timeout=5.0) as client:
        runs = await PREFECT.recent_runs(client, limit=10)
    views = [_run_view(r) for r in runs]
    _mark_stale_views(views)
    return templates.TemplateResponse(
        request,
        "_runs.html",
        {"runs": views, "just_submitted": run["id"]},
    )
def _scan_metrics() -> List[Dict[str, Any]]:
    """Read every `*.metrics.json` in FIGS_DIR and return them as a list."""
    collected: List[Dict[str, Any]] = []
    # Newest first, by file modification time.
    paths = sorted(
        FIGS_DIR.glob("*.metrics.json"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    for path in paths:
        try:
            entry = json.loads(path.read_text())
        except (OSError, json.JSONDecodeError):
            # Unreadable / half-written sidecars are skipped, not fatal.
            continue
        entry["filename"] = path.name
        entry["embedding_file"] = path.name.replace(".metrics.json", ".html")
        collected.append(entry)
    return collected
@app.get("/metrics", response_class=HTMLResponse)
async def metrics_page(request: Request) -> HTMLResponse:
    """Full-page metrics view; resolves the deployment id for the template."""
    async with httpx.AsyncClient(timeout=5.0) as client:
        deployment = await PREFECT.deployment_id(client)
    context = {"prefect_api": PREFECT_API, "deployment_id": deployment}
    return templates.TemplateResponse(request, "metrics.html", context)
@app.get("/metrics.json")
async def metrics_json() -> JSONResponse:
    """All metrics sidecars as JSON (newest file first)."""
    payload = _scan_metrics()
    return JSONResponse(payload)
# Validates run stems before touching the filesystem. Shape (from the regex):
# make_<generator>_<algo>_N<int>_T<int>_J<number>_s<int>[_<8 lowercase hex>].
# The optional 8-hex suffix is the params hash on newer stems; legacy stems
# lack it.
_STEM_RE = re.compile(
    r"^make_[A-Za-z_]+?_[A-Za-z]+_N\d+_T\d+_J[\d.Ee+\-]+_s\d+(?:_[0-9a-f]{8})?$"
)
# Map short generator name ("make_blobs") to its DATASET_META entry.
# swiss_roll and swiss_roll_hole collide on path; first wins (plain variant).
# NOTE(review): _GEN_TO_META is not referenced anywhere in this chunk —
# lookups here go through _lookup_dataset_meta; confirm it is used elsewhere.
_GEN_TO_META: Dict[str, Dict[str, Any]] = {}
for _m in DATASET_META.values():
    _GEN_TO_META.setdefault(_m["path"].rsplit(".", 1)[-1], _m)
# Kwargs the flow injects / we supply explicitly — never part of the
# dataset's semantic identity, so strip them before DATASET_META matching
# and before regenerating labels.
_TRANSIENT_GEN_KWARGS = {"n_samples", "random_state"}
def _clean_gen_kwargs(gk: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Drop transient generator kwargs (_TRANSIENT_GEN_KWARGS) so only the
    dataset's semantic identity remains; None passes through unchanged."""
    if gk is None:
        return None
    kept = dict(gk)
    for transient in _TRANSIENT_GEN_KWARGS:
        kept.pop(transient, None)
    return kept
# Tag axes the chip-filter and backfill both care about. A flat tuple of
# axis names (the earlier "(short_prefix, builder) pairs" wording was stale:
# per-axis tag builders are hard-coded in build_run_tags below).
TAG_AXES = ("dataset", "algorithm", "N", "T", "J")
def build_run_tags(
    generator_path: str,
    generator_kwargs: Optional[Dict[str, Any]],
    embedder: str,
    num_points: int,
    num_timesteps: int,
    jitter_scale: float,
) -> List[str]:
    """Tags written onto every flow run so the chip filter can narrow
    server-side via Prefect's tag:all_ filter. Single value per axis; the
    client's cassette chips pick exactly one per filter."""
    algorithm = (embedder or "").rsplit(".", 1)[-1]
    axis_values = (
        ("dataset", _dataset_id(generator_path, generator_kwargs)),
        ("algorithm", algorithm),
        ("N", int(num_points)),
        ("T", int(num_timesteps)),
        ("J", sci_notation(jitter_scale)),
    )
    return [f"{axis}:{value}" for axis, value in axis_values]
def _dataset_id(generator_path: str, generator_kwargs: Optional[Dict[str, Any]]) -> str:
    """Human-scale identifier for a run's dataset — e.g. 'swiss_roll' vs
    'swiss_roll_hole' — by matching (path, cleaned kwargs) against
    DATASET_META. Falls back to the path short-name when no match."""
    short = (generator_path or "").rsplit(".", 1)[-1]
    matches = [
        (dataset_key, entry)
        for dataset_key, entry in DATASET_META.items()
        if entry["path"].rsplit(".", 1)[-1] == short
    ]
    if not matches:
        return short
    cleaned = _clean_gen_kwargs(generator_kwargs)
    if cleaned is not None:
        # Exact kwargs match wins (disambiguates same-path variants).
        for dataset_key, entry in matches:
            if entry["kwargs"] == cleaned:
                return dataset_key
    # Kwargs unknown or no exact match: first candidate wins.
    return matches[0][0]
def _lookup_dataset_meta(
    generator_short: str, generator_kwargs: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Match DATASET_META by generator short-name AND kwargs when available.
    Falls back to first-wins when kwargs are unknown (ambiguous for
    swiss_roll vs swiss_roll_hole — both share `make_swiss_roll`)."""
    matches = [
        entry for entry in DATASET_META.values()
        if entry["path"].rsplit(".", 1)[-1] == generator_short
    ]
    if not matches:
        return None
    cleaned = _clean_gen_kwargs(generator_kwargs)
    if cleaned is not None:
        # Exact kwargs match wins when we know them.
        for entry in matches:
            if entry["kwargs"] == cleaned:
                return entry
    return matches[0]
def _enrich_with_labels(d: Dict[str, Any]) -> Dict[str, Any]:
    """Attach per-point class/continuous labels by regenerating the dataset
    with the same (generator, n_samples, kwargs). random_state is fixed at 0
    (the flow's _DEFAULT_GENERATOR_KWARGS) — the stem's `seed` drives jitter,
    not the generator. Jitter-added points (id >= num_points) get None so
    the client renders them as black.
    Discovers generator_kwargs in priority order: (1) payload meta (sidecar
    runs from the updated flow); (2) sibling metrics.json; (3) DATASET_META
    by first-match (ambiguous for swiss_roll/swiss_roll_hole — need a
    backfilled metrics.json to disambiguate)."""
    meta = d.get("meta") or {}
    gen_short = meta.get("generator") or ""
    gk = meta.get("generator_kwargs")
    if gk is None:
        # Fallback (2): look for the kwargs in the sibling metrics sidecar.
        stem = meta.get("stem")
        if stem:
            mx = FIGS_DIR / f"{stem}.metrics.json"
            if mx.is_file():
                try:
                    gk = json.loads(mx.read_text(encoding="utf-8")).get(
                        "meta", {}
                    ).get("generator_kwargs")
                except Exception:
                    # Best-effort: a broken sidecar just means fallback (3).
                    gk = None
    dm = _lookup_dataset_meta(gen_short, gk)
    if not dm:
        # Unknown generator — return the payload unchanged.
        return d
    # Replace the stem-derived generator short (ambiguous for swiss_roll vs
    # hole) with the matched DATASET_META id for the panel header.
    # Identity (`is`) comparison: dm is one of DATASET_META's own entries.
    for key, entry in DATASET_META.items():
        if entry is dm:
            d["meta"]["generator"] = key
            break
    kwargs_to_use = _clean_gen_kwargs(gk) if gk is not None else dm["kwargs"]
    try:
        # Re-import the generator and regenerate labels for the first N
        # (non-jitter) points. Any failure leaves the payload label-free.
        mod_path, cls_name = dm["path"].rsplit(".", 1)
        fn = getattr(importlib.import_module(mod_path), cls_name)
        N = int(meta["num_points"])
        _, gen_labels = fn(n_samples=N, random_state=0, **kwargs_to_use)
        out_labels: List[Optional[float]] = []
        for pid in d["point_ids"]:
            if isinstance(pid, int) and 0 <= pid < N:
                v = gen_labels[pid]
                # numpy scalars (have .item) and plain numbers → float;
                # anything else (e.g. string labels) → None.
                out_labels.append(float(v) if hasattr(v, "item") or isinstance(v, (int, float)) else None)
            else:
                # Jitter-added point ids fall outside [0, N) → unlabeled.
                out_labels.append(None)
        d["labels"] = out_labels
        d["label_kind"] = dm["kind"]
    except Exception:
        # Enrichment is strictly best-effort; never break frame serving.
        pass
    return d
@lru_cache(maxsize=32)
def _cached_frames(stem: str) -> str:
    """Return the frames dict as a JSON string. Prefers a .frames.json
    sidecar (emitted by new flow runs); falls back to parsing .html
    (for pre-sidecar runs). Either way, enriches with dataset labels."""
    sidecar_path = FIGS_DIR / f"{stem}.frames.json"
    if sidecar_path.is_file():
        frames = json.loads(sidecar_path.read_text(encoding="utf-8"))
    else:
        frames = parse_plotly_run(FIGS_DIR / f"{stem}.html")
    # Override meta.stem with the URL-requested stem — after a backfill the
    # file was renamed but the baked-in meta.stem still points at the old
    # name. Enrichment uses this to find the sibling metrics.json.
    frames.setdefault("meta", {})["stem"] = stem
    enriched = _enrich_with_labels(frames)
    return json.dumps(enriched, separators=(",", ":"))
@app.get("/api/runs/{stem}/frames.json")
async def run_frames(stem: str) -> Response:
    """Serve a run's frames JSON; validates the stem and caches the parse."""
    if not _STEM_RE.match(stem):
        raise HTTPException(400, f"malformed stem: {stem!r}")
    have_sidecar = (FIGS_DIR / f"{stem}.frames.json").is_file()
    have_html = (FIGS_DIR / f"{stem}.html").is_file()
    if not (have_sidecar or have_html):
        raise HTTPException(404, f"no such run: {stem}")
    try:
        payload = _cached_frames(stem)
    except Exception as e:
        raise HTTPException(500, f"parse failed: {e}")
    return Response(
        content=payload,
        media_type="application/json",
        headers={"Cache-Control": "no-cache"},
    )
@app.get("/compare", response_class=HTMLResponse)
async def compare_page(request: Request) -> HTMLResponse:
    """Side-by-side comparison page for 2..8 run stems."""
    q = request.query_params
    stems = [s for s in q.getlist("stem") if s]
    if not stems:
        # Legacy two-stem form: ?a=&b=
        stems = [s for s in (q.get("a", ""), q.get("b", "")) if s]
    if len(stems) < 2 or len(stems) > 8:
        raise HTTPException(400, f"need 2..8 stems, got {len(stems)}")
    for stem in stems:
        if not _STEM_RE.match(stem):
            raise HTTPException(400, f"malformed stem: {stem!r}")
        exists = (
            (FIGS_DIR / f"{stem}.frames.json").is_file()
            or (FIGS_DIR / f"{stem}.html").is_file()
        )
        if not exists:
            raise HTTPException(404, f"no such run: {stem}")
    return templates.TemplateResponse(request, "compare.html", {"stems": stems})
@app.get("/health")
async def health() -> JSONResponse:
    """Liveness probe that also reports the resolved Prefect deployment."""
    async with httpx.AsyncClient(timeout=3.0) as client:
        deployment = await PREFECT.deployment_id(client)
    payload = {"ok": True, "deployment_id": deployment, "prefect_api": PREFECT_API}
    return JSONResponse(payload)