dispatch — multi-box run-spec stem dispatcher
Purpose
Fans a list of run-spec stems across a 1-Mac + 2-Linux personal cluster: weighted static partition → SSH/tmux detached launch → polling state machine → atomic collection → journal. Refactor-extracted from validation/legacy/corridor_sweep.py (NOT greenfield); the remote-execution primitive is detached_run.
Role in the system
- The cluster front-end: it splits different stems across boxes for throughput (contrast with tolerance, which replicates the same stem to bound cross-machine disagreement).
- Each leg shells out to detached_run (run/status/list): over SSH on the Linux boxes, via in-process import on the local Mac.
- A launched stem ultimately runs run_spec.py → orchestrator → runner on the box; the collected artifact is the per-stem NPZ that the analysis pipeline consumes.
- Cluster shape (SSH aliases, repo paths, Python envs, weights, env) lives in analysis/cluster.yaml.
- Dated log dirs come from infra (infra.LOGS_DIR, infra.today()).
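To make "schema-validate into typed Hosts" concrete, here is a minimal sketch. The key names (hosts, repo, python, weight, env) and the Host fields are hypothetical stand-ins; the real schema is whatever analysis/cluster.yaml defines. The sketch takes an already-parsed dict (for example the result of a YAML load), so it needs no third-party parser.

```python
from dataclasses import dataclass, field

# Hypothetical schema: the real cluster.yaml keys may differ.
REQUIRED_KEYS = ("repo", "python", "weight")
OPTIONAL_KEYS = ("env",)


@dataclass(frozen=True)
class Host:
    name: str                      # SSH alias, or "local" for the controller Mac
    repo: str                      # repo checkout path on that box
    python: str                    # Python interpreter / env on that box
    weight: float                  # relative share of stems in the static partition
    env: dict[str, str] = field(default_factory=dict)

    @property
    def is_local(self) -> bool:
        # The controller's own box is reached in-process, never over SSH.
        return self.name == "local"


def load_cluster_dict(raw: dict) -> list[Host]:
    """Validate an already-parsed cluster mapping and return typed hosts."""
    hosts_raw = raw.get("hosts")
    if not isinstance(hosts_raw, dict) or not hosts_raw:
        raise ValueError("cluster config needs a non-empty 'hosts' mapping")
    hosts: list[Host] = []
    for name, spec in hosts_raw.items():
        if not isinstance(spec, dict):
            raise ValueError(f"host {name!r}: expected a mapping")
        missing = [k for k in REQUIRED_KEYS if k not in spec]
        unknown = set(spec) - set(REQUIRED_KEYS) - set(OPTIONAL_KEYS)
        if missing or unknown:
            raise ValueError(f"host {name!r}: missing={missing} unknown={sorted(unknown)}")
        weight = float(spec["weight"])
        if weight <= 0:
            raise ValueError(f"host {name!r}: weight must be > 0")
        env = {str(k): str(v) for k, v in (spec.get("env") or {}).items()}
        hosts.append(Host(str(name), str(spec["repo"]), str(spec["python"]), weight, env))
    return hosts
```

Rejecting unknown keys (not just missing ones) is a design choice in this sketch: a typo such as "wieght" fails loudly instead of silently falling back to a default.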
State vocabulary — two StrEnums, distinct namespaces
- PollKind (the wire vocabulary, UPPERCASE values) is defined in detached_run, its producer: classify() emits it and status() serialises it onto the SSH status line. dispatch imports dr.PollKind; parse_status_line returns it and apply_status consumes it.
- State (the controller lifecycle, lowercase auto() values) lives here and is what the journal serialises. Back-compat module aliases (dispatch.PENDING = State.PENDING, …) keep the old dsp.RUNNING call-sites (and tolerance, a non-test importer) working unchanged.
- The two share member names (RUNNING/FINISHED/FAILED/VANISHED) but never collide: State.FINISHED ("finished") ≠ PollKind.FINISHED ("FINISHED"). RUNNING_KINDS MUST stay dr.PollKind-qualified.
Inputs / Outputs
- In: a manifest file (one stem per line) under analysis/manifests/ + cluster.yaml; CLI verb (run/sync/status/watch/collect/resume). Manifests are ephemeral inputs (analysis/manifests/*_manifest.txt is gitignored; see analysis/manifests/README.md); the durable record of what ran is the campaign dir’s archived manifest copy + run_provenance.json + dispatch_journal.json.
- Out: the campaign dir (the run verb derives <campaign> = manifest filename minus _manifest.txt, or --name). Every artifact for one batch nests under logs/logs_<date>/<campaign>/: the archived manifest, dispatch_journal.json, and each collected run at <campaign>/<stem>/ (with a run_provenance.json sidecar). The box side stays flat (logs/logs_<date>/<stem>/); only the controller nests on collect. The analysis pipeline reads a nested run via an explicit Orchestrator(stem, run_dir=<campaign>/<stem>); run_spec.py <stem> no longer auto-finds it.
Key methods / functions
- State (analysis/dispatch.py:195): controller lifecycle StrEnum (lowercase; journal-serialised) + back-compat aliases
- load_cluster (analysis/dispatch.py:63): parse + schema-validate cluster.yaml into typed Hosts
- partition (analysis/dispatch.py:119): weighted deterministic static split of stems across hosts
- apply_status (analysis/dispatch.py:247): fold one poll (dr.PollKind) into a StemRecord; the transition function
- preflight_host (analysis/dispatch.py:415): probe tmux/import/sha/dirty/day; gate a box in or out
- sync_host / sync_cluster (analysis/dispatch.py:498 / :530): advance a clean box to controller HEAD via a thin git bundle
- expected_npz (analysis/dispatch.py:629): lone npz by basename; None if absent OR >1 DISTINCT basename (fail-closed)
- collect_run (analysis/dispatch.py:655): atomic merge-rsync + npz-verify staging into <campaign>/_staging (one flat dir; second rsync skipped when sources coincide)
- reconcile (analysis/dispatch.py:727): land a staged run into <campaign>/<stem> (force=True overwrites, else keep-both)
- load_journal (analysis/dispatch.py:767): rebuild records; normalise old UPPERCASE state on read; strip _campaign
- campaign_dir_of (analysis/dispatch.py:787): locate the campaign dir (explicit --campaign, else newest; raise on ambiguous)
- dispatch_loop (analysis/dispatch.py:916): assign → poll/route → re-launch until every stem is terminal
- invoke_detached (analysis/dispatch.py:318): the one transport seam; list-form SSH or in-process dr.<verb>
- cmd_run (analysis/dispatch.py:1114): the headline verb (validate manifest → build campaign → sync → preflight → loop)
- cmd_collect / cmd_resume (analysis/dispatch.py:1257 / :1288): locate the campaign, then collect FINISHED stems / re-poll non-terminal ones
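partition's exact algorithm is not spelled out here, so the following is one deterministic weighted scheme under stated assumptions, not necessarily the one dispatch uses. Weights are taken as positive numbers keyed by host, dict order serves as the tie-break, and each stem goes to the host with the lowest load-to-weight ratio after taking it.

```python
def partition(stems: list[str], weights: dict[str, float]) -> dict[str, list[str]]:
    """Deterministic weighted static split (one possible scheme).

    Each stem goes to the host whose load would be lowest relative to its weight
    after taking it; ties go to the host listed first, so the same inputs always
    produce the same split.
    """
    if not weights:
        raise ValueError("no surviving hosts to partition across")
    if any(w <= 0 for w in weights.values()):
        raise ValueError("weights must be positive")
    hosts = list(weights)
    split: dict[str, list[str]] = {h: [] for h in hosts}
    for stem in stems:
        # min() returns the first minimal host, which makes tie-breaking stable.
        best = min(hosts, key=lambda h: (len(split[h]) + 1) / weights[h])
        split[best].append(stem)
    return split
```

With weights local=1, lin1=2, lin2=2 over ten stems this produces a 2/4/4 split, and rerunning it on the same inputs gives the identical assignment, which is what "static" needs for a resume to route stems back to the same boxes.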
Footguns
Every transport call is list-form, never a shell string
All SSH/rsync calls go through one injectable seam in subprocess list-form, never shell=True, so no compound-shell metacharacter ever reaches a local Bash call (anti-spam-hook clean). This is why invoke_detached builds a quoted remote_cmd string for run_ssh rather than splatting a shell.
(analysis/INSIGHTS.md dispatch [io])
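A minimal sketch of the list-form pattern, assuming hypothetical helper names build_ssh_argv and run_ssh and a made-up remote command. The local process receives a plain argv list (no shell), and the one string the remote shell does parse is built with shlex.join, so every token arrives quoted.

```python
import shlex
import subprocess
from typing import Callable, Sequence

Runner = Callable[..., subprocess.CompletedProcess]


def build_ssh_argv(alias: str, remote_argv: Sequence[str]) -> list[str]:
    """List-form argv for ssh: the remote command travels as ONE quoted string."""
    # shlex.join quotes every token, so a stem containing ';' or '&&' reaches the
    # remote shell as a literal argument, and nothing is parsed by a local shell.
    return ["ssh", alias, shlex.join(remote_argv)]


def run_ssh(alias: str, remote_argv: Sequence[str],
            run: Runner = subprocess.run) -> subprocess.CompletedProcess:
    """Injectable seam: tests pass a fake `run`; production uses subprocess.run."""
    return run(build_ssh_argv(alias, remote_argv),
               capture_output=True, text=True, check=False)
```

Because `run` is a parameter, a test can capture the exact argv without touching the network, which is the property an injectable seam is meant to give.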
Never SSH to host: local
is_local (host == 'local') routes to in-process dr.<verb> with stdout captured; the local box is also preflighted by a local subprocess. SSH-ing to "local" doesn’t resolve and would spuriously drop the controller from its own cluster. (analysis/INSIGHTS.md dispatch [run])
Collection is fail-closed; never flip COLLECTED on an empty pull
A run_dir parse miss, an rsync error, or an absent npz leaves the record non-terminal (retryable). collect_run is atomic: every rsync must return rc==0 and the npz must be present before os.replace; any failure removes the partial temp tree. (analysis/INSIGHTS.md dispatch [io]/[run])
One flat collected dir, not run/ + stem/ (Jun24 dedup)
collect_run stages into a single flat dir: npz, figures, console.log, and exit_status side by side. It rsyncs the npz source (run_dir_remote), then, only if the sentinel dir differs (stem_src != run_src), merges LOGS_DIR/<stem>/. The two coincide whenever run.name == stem (the common case), leaving a single rsync. The old design always staged run/ and stem/, which for run.name == stem were byte-identical: ~2× log disk plus a wasted rsync. Downstream is unaffected: the analysis pipeline loads an explicit npz path, never the collected dir’s internal shape.
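The "one rsync when sources coincide" rule can be sketched as a planner that returns argv lists. The function name and argv shape are hypothetical (only `-a` and rsync's trailing-slash semantics are standard); the real flags dispatch passes may differ.

```python
from pathlib import PurePosixPath


def rsync_plan(alias: str, run_src: str, stem_src: str, staging: str) -> list[list[str]]:
    """One rsync when the npz source and the sentinel dir coincide, else two."""
    sources = [run_src]
    # PurePosixPath drops a trailing slash, so "logs/x/" and "logs/x" compare equal.
    if PurePosixPath(stem_src) != PurePosixPath(run_src):
        sources.append(stem_src)
    # A trailing "/" on the source makes rsync copy the directory's *contents*,
    # so every source merges into the same flat staging dir.
    return [["rsync", "-a", f"{alias}:{PurePosixPath(s)}/", f"{staging}/"] for s in sources]
```

Normalising before comparing matters: a naive string comparison would treat "…/s1" and "…/s1/" as different and bring back the duplicate rsync.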
Transport error ≠ application state
TRANSPORT_ERROR is not an apply_status kind: a failed transport is a box-down event handled upstream (it downs the box after 2 consecutive errors and re-pools its stems to PENDING without charging any attempt/failed budget). VANISHED is only re-pooled after corroboration_n (3) consecutive polls, and is ignored entirely for a stem that was never alive. (analysis/INSIGHTS.md dispatch [run])
Sync resets only CLEAN boxes; divergent boxes need a human
The Linux boxes have no GitHub creds, so sync ships a thin box_sha..branch bundle, then runs fetch + reset --hard on the box. A surprise-dirty box is skipped and reported, never clobbered, and for a divergent box (from_sha not in controller history) bundle creation fails and returns None. cmd_run syncs before preflight so a stale box is advanced rather than DROPped for sha-mismatch.
(analysis/INSIGHTS.md dispatch [run])
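A sketch of the per-box decision plus the git commands it implies. plan_sync, the decision labels, and the argv-builder names are hypothetical; the git subcommands themselves (bundle create with a rev range, fetch from a bundle file, reset --hard FETCH_HEAD, merge-base --is-ancestor) are standard git. Here box_sha plays the role of from_sha, and box_sha_in_history would come from running the ancestor check on the controller.

```python
def plan_sync(box_sha: str, box_dirty: bool, head_sha: str, box_sha_in_history: bool) -> str:
    """Decide what sync does to one box (labels are hypothetical)."""
    if box_dirty:
        return "skip-dirty"      # surprise-dirty: report, never clobber
    if box_sha == head_sha:
        return "up-to-date"
    if not box_sha_in_history:
        return "divergent"       # no valid thin bundle exists; a human has to look
    return "bundle"


def ancestor_check_argv(box_sha: str, head: str = "HEAD") -> list[str]:
    # Controller side: exit status 0 means box_sha is in the controller's history.
    return ["git", "merge-base", "--is-ancestor", box_sha, head]


def bundle_create_argv(box_sha: str, branch: str, out_path: str) -> list[str]:
    # Controller side: a thin bundle holding only the commits the box lacks.
    return ["git", "bundle", "create", out_path, f"{box_sha}..{branch}"]


def box_apply_argv(bundle_path: str, branch: str) -> list[list[str]]:
    # Box side: fetch the branch out of the bundle file, then hard-reset onto it.
    return [["git", "fetch", bundle_path, branch],
            ["git", "reset", "--hard", "FETCH_HEAD"]]
```

Checking dirtiness first is deliberate in this sketch: reset --hard would discard uncommitted work, so a dirty box never reaches the bundle branch.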
remote_dated_dir resolution order (idempotent re-collect)
_collect_one resolves the remote logs_<date> in priority order: (1) the launch line captured at dispatch; (2) the preflight-pinned day (for a stem already RUNNING when this controller started); (3) the console’s own run_dir: line, which back-fills the record so re-collect survives controller restarts.
(analysis/INSIGHTS.md dispatch [run])
State case changed — old journals are normalised on read (SER-2)
State is now lowercase auto(), so the journal stores "running", not "RUNNING". A pre-refactor (UPPERCASE) journal would otherwise mis-compare against the new members and silently re-dispatch terminal stems; load_journal coerces every state through State(value.lower()) to prevent this. The journal also carries a top-level _campaign id (stripped on load, never a StemRecord).
collect/status/resume locate the campaign, not wall-clock today (date-rollover fix)
A run dispatched before midnight writes its journal under the dispatch day’s campaign dir. campaign_dir_of finds it by the newest logs_*/<campaign>/dispatch_journal.json (or an explicit --campaign/--name), so collection works after the date rolls over. With >1 candidate and no --name/--campaign it raises rather than silently picking the wrong campaign.
--force/--rerun: re-running a stem under its old name
Without --force, re-dispatching an already-collected stem the same day re-collects the stale box log (the pre-poll sees the old sentinel, and reconcile’s keep-both refuses to overwrite the canonical copy). --force skips the pre-poll short-circuit AND makes reconcile overwrite. Pass it to both run and any later manual collect (the intent is not journaled). (analysis/INSIGHTS.md dispatch)
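The two places --force acts can be sketched as follows. should_short_circuit and this reconcile signature are hypothetical, and the "__dupN" keep-both suffix is invented for illustration; the real naming scheme is not documented here.

```python
import shutil
from pathlib import Path


def should_short_circuit(sentinel_finished: bool, force: bool) -> bool:
    """Pre-poll resume-skip: an existing FINISHED sentinel is trusted unless --force."""
    return sentinel_finished and not force


def reconcile(staged: Path, campaign_dir: Path, stem: str, force: bool) -> Path:
    """Land a staged run at <campaign>/<stem>; keep-both unless force."""
    canonical = campaign_dir / stem
    if not canonical.exists():
        staged.rename(canonical)
        return canonical
    if force:
        shutil.rmtree(canonical)          # --force: overwrite the canonical copy
        staged.rename(canonical)
        return canonical
    # Keep-both: the canonical copy is left alone. The "__dupN" suffix is
    # hypothetical; the real naming scheme is not documented here.
    n = 1
    while (campaign_dir / f"{stem}__dup{n}").exists():
        n += 1
    target = campaign_dir / f"{stem}__dup{n}"
    staged.rename(target)
    return target
```

This makes the footgun visible: without force, a fresh run lands beside the old one while <campaign>/<stem> still holds the stale result the analysis pipeline would read.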
Pseudocode (cmd_run)
hosts = load_cluster(cluster.yaml)
stems = parse_manifest(manifest) # dedup; raise on empty; --manifest required
campaign_dir = LOGS_DIR/<campaign> # <campaign> = manifest stem (or --name); archive a copy
sync_cluster(hosts) # advance clean boxes to controller HEAD
survivors = [h for h in hosts if preflight_host(h).ok] # tmux/import/sha/dirty/day
partition_map = partition(stems, survivors) # weighted, deterministic, static
dispatch_loop: # opts carry campaign_dir/campaign/force
_dispatch_assigned(...) # pre-poll (resume-skip; SKIPPED when --force) then launch
while any stem non-terminal and polls < max:
_poll_and_route(...) # per box: apply_status -> collect/repool/exhaust
re-launch re-pooled stems (PENDING->box, ASSIGNED->launch)
save_journal(records) # journal every cycle -> drop-proof resume
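"Journal every cycle" is only drop-proof if a crash mid-write cannot leave a torn file. One common way to get that is a temp-file write, fsync, then os.replace; whether save_journal does exactly this is an assumption, and the journal shape and function signature below are hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path


def save_journal(path: Path, campaign: str, states: dict[str, str]) -> None:
    """Write the journal so a crash mid-write never leaves a torn file."""
    payload = {"_campaign": campaign, **{stem: {"state": s} for stem, s in states.items()}}
    # Temp file in the same directory, so os.replace is an atomic rename.
    fd, tmp = tempfile.mkstemp(prefix=".journal.", dir=path.parent)
    try:
        with os.fdopen(fd, "w") as fh:
            json.dump(payload, fh, indent=2, sort_keys=True)
            fh.flush()
            os.fsync(fh.fileno())            # bytes on disk before the rename
        os.replace(tmp, path)                # readers see old or new, never half
    except BaseException:
        os.unlink(tmp)
        raise
```

Because each cycle rewrites the whole file, resume only ever needs the latest journal; a controller killed between cycles loses at most the transitions since the last save.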
Related
detached_run · orchestrator · runner · infra · determinism · terminology