flow: name each Prefect run after its output stem (gen_emb_N_T_J_s_hash)

Replaces Prefect's auto adjective-animal names with the same stem that
addresses the run's figs on disk, so runs are hoverable/searchable in the
Prefect UI by their identifying params. flow_run_name is a callable that
reads runtime.flow_run.parameters at scheduling time.
This commit is contained in:
Michael Pilosov 2026-04-22 16:01:49 -06:00
parent a1d242ae36
commit 47f56b57c8

View File

@ -33,7 +33,22 @@ def _embed_args_hash(ea: Optional[Dict[str, Any]]) -> str:
s = json.dumps(ea or {}, sort_keys=True, default=str)
return hashlib.sha1(s.encode()).hexdigest()[:8]
from prefect import flow, task
def _flow_run_name() -> str:
"""Name each Prefect run after the stem of its output fig, so runs are
searchable / hoverable instead of wearing Prefect's auto-generated
adjective-animal names."""
p = runtime.flow_run.parameters or {}
gen = (p.get("generator_path") or "").rsplit(".", 1)[-1] or "?"
emb = (p.get("embedder") or "").rsplit(".", 1)[-1] or "?"
N = p.get("num_points", "?")
T = p.get("num_timesteps", "?")
J = p.get("jitter_scale", "?")
s = p.get("seed", "?")
tag = _embed_args_hash(p.get("embed_args"))
return f"{gen}_{emb}_N{N}_T{T}_J{J}_s{s}_{tag}"
from prefect import flow, runtime, task
from prefect.artifacts import create_markdown_artifact, create_table_artifact
from prefect.cache_policies import INPUTS, NO_CACHE
from prefect_ray import RayTaskRunner
@ -254,7 +269,7 @@ _DEFAULT_EMBED_COLUMNS: List[str] = ["feature_0", "feature_2", "feature_1"]
_DEFAULT_EMBED_ARGS: Dict[str, Any] = {"n_components": 2, "random_state": 30}
@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 4}))
@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 4}), flow_run_name=_flow_run_name)
def embedding_flow(
num_points: int = 5000,
num_timesteps: int = 48,