From 47f56b57c85b6b6b6965107b92491d661eab10c7 Mon Sep 17 00:00:00 2001 From: Michael Pilosov Date: Wed, 22 Apr 2026 16:01:49 -0600 Subject: [PATCH] flow: name each Prefect run after its output stem (gen_emb_N_T_J_s_hash) Replaces Prefect's auto adjective-animal names with the same stem that addresses the run's figs on disk, so runs are hoverable/searchable in the Prefect UI by their identifying params. flow_run_name is a callable that reads runtime.flow_run.parameters at scheduling time. --- flows/embedding_flow.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/flows/embedding_flow.py b/flows/embedding_flow.py index dd71725..2d1b1e3 100644 --- a/flows/embedding_flow.py +++ b/flows/embedding_flow.py @@ -33,7 +33,22 @@ def _embed_args_hash(ea: Optional[Dict[str, Any]]) -> str: s = json.dumps(ea or {}, sort_keys=True, default=str) return hashlib.sha1(s.encode()).hexdigest()[:8] -from prefect import flow, task + +def _flow_run_name() -> str: + """Name each Prefect run after the stem of its output fig, so runs are + searchable / hoverable instead of wearing Prefect's auto-generated + adjective-animal names.""" + p = runtime.flow_run.parameters or {} + gen = (p.get("generator_path") or "").rsplit(".", 1)[-1] or "?" + emb = (p.get("embedder") or "").rsplit(".", 1)[-1] or "?" + N = p.get("num_points", "?") + T = p.get("num_timesteps", "?") + J = p.get("jitter_scale", "?") + s = p.get("seed", "?") + tag = _embed_args_hash(p.get("embed_args")) + return f"{gen}_{emb}_N{N}_T{T}_J{J}_s{s}_{tag}" + +from prefect import flow, runtime, task from prefect.artifacts import create_markdown_artifact, create_table_artifact from prefect.cache_policies import INPUTS, NO_CACHE from prefect_ray import RayTaskRunner @@ -254,7 +269,7 @@ _DEFAULT_EMBED_COLUMNS: List[str] = ["feature_0", "feature_2", "feature_1"] _DEFAULT_EMBED_ARGS: Dict[str, Any] = {"n_components": 2, "random_state": 30} -@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 4})) +@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 4}), flow_run_name=_flow_run_name) def embedding_flow( num_points: int = 5000, num_timesteps: int = 48,