dispatch — multi-box run-spec stem dispatcher

Purpose

Fans a list of run-spec stems across a 1-Mac + 2-Linux personal cluster: weighted static
partition → SSH/tmux detached launch → polling state machine → atomic collection → journal.
Refactor-extract from validation/legacy/corridor_sweep.py (NOT greenfield); the remote-execution
primitive is detached_run.

Role in the system

  • The cluster front-end: it splits different stems across boxes for throughput (in contrast to
    tolerance, which replicates the same stem to bound cross-machine disagreement).
  • Each leg shells out to detached_run (run/status/list) — over SSH on the Linux boxes,
    in-process import on the local Mac.
  • A launched stem ultimately runs run_spec.py → orchestrator → runner on the box; the
    collected artifact is the per-stem NPZ that the analysis pipeline consumes.
  • Cluster shape (SSH aliases, repo paths, Python envs, weights, env) lives in analysis/cluster.yaml.
  • Dated log dirs come from infra (infra.LOGS_DIR, infra.today()).
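The real cluster.yaml schema is not reproduced here. The sketch below is a minimal illustration of what "parse + schema-validate into typed Hosts" can look like, assuming the YAML has already been loaded (for example with PyYAML's yaml.safe_load) into a dict. The top-level "hosts" key and the per-host keys repo/python/weight/env are hypothetical stand-ins for the SSH alias, repo path, Python env, weight and env fields listed above.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Host:
    name: str                     # SSH alias, or "local" for the controller Mac
    repo: str                     # repo path on that box
    python: str                   # Python interpreter / env on that box
    weight: float = 1.0           # relative share of stems for partition()
    env: dict[str, str] = field(default_factory=dict)

REQUIRED = ("repo", "python")     # hypothetical required per-host keys

def parse_hosts(doc: dict) -> list[Host]:
    """Validate an already-loaded cluster.yaml mapping into typed Hosts (sketch)."""
    hosts = doc.get("hosts")
    if not isinstance(hosts, dict) or not hosts:
        raise ValueError("cluster.yaml: 'hosts' must be a non-empty mapping")
    out = []
    for name, spec in hosts.items():
        missing = [k for k in REQUIRED if k not in spec]
        if missing:
            raise ValueError(f"host {name!r}: missing keys {missing}")
        weight = float(spec.get("weight", 1.0))
        if weight <= 0:
            raise ValueError(f"host {name!r}: weight must be > 0")
        out.append(Host(name, spec["repo"], spec["python"], weight,
                        dict(spec.get("env", {}))))
    return out
```

Failing loudly at load time keeps a typo in cluster.yaml from surfacing later as a silently dropped box.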

State vocabulary — two StrEnums, distinct namespaces

  • PollKind (the wire vocabulary, UPPERCASE values) is defined in detached_run — its
    producer: classify() emits it and status() serialises it onto the SSH status line. dispatch
    imports dr.PollKind; parse_status_line returns it and apply_status consumes it.
  • State (the controller lifecycle, lowercase auto() values) lives here and is what the
    journal serialises. Back-compat module aliases (dispatch.PENDING = State.PENDING, …) keep the old
    dsp.RUNNING call-sites — and tolerance, a non-test importer — working unchanged.
  • The two share member names (RUNNING/FINISHED/FAILED/VANISHED) but never collide: State.FINISHED
    ("finished") ≠ PollKind.FINISHED ("FINISHED"). RUNNING_KINDS MUST stay qualified with dr.PollKind:
    a bare RUNNING in dispatch is the back-compat State alias and would never match a poll.

Inputs / Outputs

  • In: a manifest file (one stem per line) under analysis/manifests/ + cluster.yaml; CLI verb
    (run/sync/status/watch/collect/resume). Manifests are ephemeral inputs
    (analysis/manifests/*_manifest.txt is gitignored — see analysis/manifests/README.md); the durable
    record of what ran is the campaign dir’s archived manifest copy + run_provenance.json +
    dispatch_journal.json.
  • Out — the campaign dir (run derives <campaign> = manifest filename minus _manifest.txt, or
    --name): every artifact for one batch nests under logs/logs_<date>/<campaign>/ — the archived
    manifest, dispatch_journal.json, and each collected run at <campaign>/<stem>/ (with a
    run_provenance.json sidecar). The box side stays flat (logs/logs_<date>/<stem>/); only the
    controller nests on collect. The analysis pipeline reads a nested run via an explicit
    Orchestrator(stem, run_dir=<campaign>/<stem>); run_spec.py <stem> no longer auto-finds it.

Key methods / functions

  • State — controller lifecycle StrEnum (lowercase; journal-serialised) + back-compat aliases — analysis/dispatch.py:195
  • load_cluster — parse + schema-validate cluster.yaml into typed Hosts — analysis/dispatch.py:63
  • partition — weighted deterministic static split of stems across hosts — analysis/dispatch.py:119
  • apply_status — fold one poll (dr.PollKind) into a StemRecord; the transition function — analysis/dispatch.py:247
  • preflight_host — probe tmux/import/sha/dirty/day; gate a box in or out — analysis/dispatch.py:415
  • sync_host / sync_cluster — advance a clean box to controller HEAD via a thin git bundle — analysis/dispatch.py:498 / :530
  • expected_npz — lone npz by basename; None if absent OR >1 DISTINCT basename (fail-closed) — analysis/dispatch.py:629
  • collect_run — atomic merge-rsync + npz-verify staging into <campaign>/_staging (one flat dir; second rsync skipped when sources coincide) — analysis/dispatch.py:655
  • reconcile — land a staged run into <campaign>/<stem> (force=True overwrites, else keep-both) — analysis/dispatch.py:727
  • load_journal — rebuild records; normalise old UPPERCASE state on read; strip _campaign — analysis/dispatch.py:767
  • campaign_dir_of — locate the campaign dir (explicit --campaign, else newest; raise on ambiguous) — analysis/dispatch.py:787
  • dispatch_loop — assign → poll/route → re-launch until every stem is terminal — analysis/dispatch.py:916
  • invoke_detached — the one transport seam: list-form SSH or in-process dr.<verb> — analysis/dispatch.py:318
  • cmd_run — the headline verb (validate manifest → build campaign → sync → preflight → loop) — analysis/dispatch.py:1114
  • cmd_collect / cmd_resume — locate the campaign, then collect FINISHED stems / re-poll non-terminal — analysis/dispatch.py:1257 / :1288
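The page does not spell out partition's exact algorithm. A minimal sketch of one weighted, deterministic, static split is largest-remainder quotas over a fixed host order, shown below. It takes a plain {host: weight} dict instead of Host objects; both the signature and the tie-breaking rule (by host name) are assumptions.

```python
def partition(stems: list[str], weights: dict[str, float]) -> dict[str, list[str]]:
    """Split stems into contiguous, weight-proportional chunks (largest remainder)."""
    hosts = sorted(weights)                       # fixed host order -> deterministic
    total = sum(weights[h] for h in hosts)
    exact = {h: len(stems) * weights[h] / total for h in hosts}
    quota = {h: int(exact[h]) for h in hosts}     # floor of each exact share
    leftover = len(stems) - sum(quota.values())
    # Give the remaining stems to the largest fractional remainders; ties by name.
    for h in sorted(hosts, key=lambda h: (-(exact[h] - quota[h]), h))[:leftover]:
        quota[h] += 1
    out, i = {}, 0
    for h in hosts:
        out[h] = stems[i : i + quota[h]]
        i += quota[h]
    return out
```

For 10 stems and weights {"local": 2, "a": 1, "b": 1} this yields 3/2/5 stems for a/b/local; the same inputs always give the same split, which is what makes a resumed run land stems on the same boxes.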

Footguns

Every transport call is list-form, never a shell string

All SSH and rsync calls go through one injectable seam in subprocess list-form (never shell=True), so
no compound-shell metacharacter ever reaches a local Bash call (anti-spam-hook clean). This is why
invoke_detached builds one quoted remote_cmd string and hands it to run_ssh instead of composing a
local shell command line.
(analysis/INSIGHTS.md dispatch [io])
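A minimal sketch of the list-form pattern, assuming the remote entry point is invoked as `python -m detached_run <verb> ...` from the repo root (the real invocation path is not shown here; treat ssh_argv and that module path as hypothetical). Every remote token is shlex.quote-d into one string that only the remote login shell parses; locally it is a single argv element handed to ssh, and the runner is injectable for tests.

```python
import shlex
import subprocess
from typing import Callable

def ssh_argv(alias: str, repo: str, python: str, verb: str, *args: str) -> list[str]:
    """Build list-form ssh argv; the remote command is ONE pre-quoted string."""
    remote_cmd = " ".join(
        ["cd", shlex.quote(repo), "&&", shlex.quote(python), "-m", "detached_run",
         shlex.quote(verb)]
        + [shlex.quote(a) for a in args]
    )
    # "&&" is parsed only by the remote shell; no local shell ever sees this string.
    return ["ssh", alias, remote_cmd]

def run_ssh(argv: list[str],
            runner: Callable[..., subprocess.CompletedProcess] = subprocess.run
            ) -> subprocess.CompletedProcess:
    # List-form call, no shell=True; runner is the injectable seam for tests.
    return runner(list(argv), capture_output=True, text=True, check=False)
```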

Never SSH to host: local

is_local (host == ‘local’) routes to in-process dr.<verb> with stdout captured; the local box is
also preflighted by a local subprocess. SSH-ing to “local” doesn’t resolve and would spuriously
drop the controller from its own cluster. (analysis/INSIGHTS.md dispatch [run])
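A minimal sketch of the is_local branch: for host "local" the verb runs in-process with stdout captured via contextlib.redirect_stdout, otherwise the call goes to a remote transport. The invoke signature, the local_verbs table and the status-line text in the usage are hypothetical; the real seam is invoke_detached.

```python
import contextlib
import io
from typing import Callable

def invoke(host: str, verb: str, args: list[str],
           local_verbs: dict[str, Callable[..., object]],
           remote: Callable[[str, str, list[str]], str]) -> str:
    """Route one detached_run verb: in-process for 'local', SSH transport otherwise."""
    if host == "local":                        # is_local: never SSH to ourselves
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):  # capture what the verb prints
            local_verbs[verb](*args)
        return buf.getvalue()
    return remote(host, verb, args)
```

Both branches return the same stdout text, so the status-line parser downstream never needs to know which path produced it.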

Collection is fail-closed; never flip COLLECTED on an empty pull

A run_dir parse miss, an rsync error, or an absent npz leaves the record non-terminal
(retryable). collect_run is atomic: it calls os.replace only after every rsync returns rc==0 and the
npz is present, and any failure removes the partial temp tree. (analysis/INSIGHTS.md dispatch [io]/[run])

One flat collected dir, not run/+stem/ (Jun24 dedup)

collect_run stages into a single flat dir — npz, figures, console.log, exit_status side by
side. It rsyncs the npz source (run_dir_remote) then, only if the sentinel dir differs
(stem_src != run_src), merges LOGS_DIR/<stem>/. The two coincide whenever run.name == stem
(the common case) → a single rsync. The old design always staged run/ and stem/, which for
run.name == stem were byte-identical → ~2× log disk + a wasted rsync. Downstream is unaffected: the
analysis pipeline loads an explicit npz path, never the collected dir’s internal shape.
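A minimal sketch of the dedup rule as a command plan: the npz source is always pulled, and the sentinel dir only when it is a different path. rsync_plan and its parameters are hypothetical, and the real collect_run may use other rsync flags. The trailing "/" on each source makes rsync copy directory contents, which is what merges both pulls into one flat dir; PurePosixPath comparison ignores a trailing slash.

```python
from pathlib import PurePosixPath

def rsync_plan(alias: str, run_src: str, stem_src: str, staging: str) -> list[list[str]]:
    """List-form rsync commands that merge everything into ONE flat staging dir."""
    def pull(src: str) -> list[str]:
        # Trailing "/" copies the directory's contents, not the directory itself.
        return ["rsync", "-a", f"{alias}:{src.rstrip('/')}/", f"{staging.rstrip('/')}/"]
    plan = [pull(run_src)]                              # npz source (run_dir_remote)
    if PurePosixPath(stem_src) != PurePosixPath(run_src):
        plan.append(pull(stem_src))                     # sentinel dir only if it differs
    return plan
```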

Transport error ≠ application state

TRANSPORT_ERROR is not an apply_status kind: a failed transport is a box-down event handled
upstream (downs the box after 2 consecutive errors, re-pools its stems to PENDING without charging
any attempt/failed budget). VANISHED is only re-pooled after corroboration_n (3) consecutive polls,
and is ignored entirely for a stem that was never alive. (analysis/INSIGHTS.md dispatch [run])
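A minimal sketch of the two counters described above: a per-box transport-error streak (down after 2) and a per-stem VANISHED streak (re-pool after corroboration_n = 3, ignored if the stem was never alive). Class names are hypothetical, poll kinds are plain strings here (a StrEnum PollKind compares equal to them), and treating "seen RUNNING" as "alive" is an assumption.

```python
from dataclasses import dataclass

BOX_DOWN_AFTER = 2      # consecutive transport errors before a box is downed
CORROBORATION_N = 3     # consecutive VANISHED polls before a stem is re-pooled

@dataclass
class BoxHealth:
    transport_errors: int = 0

    def on_poll(self, transport_ok: bool) -> bool:
        """True when the box should be downed (stems re-pooled, no budget charged)."""
        self.transport_errors = 0 if transport_ok else self.transport_errors + 1
        return self.transport_errors >= BOX_DOWN_AFTER

@dataclass
class VanishWatch:
    seen_alive: bool = False
    vanished_streak: int = 0

    def on_kind(self, kind: str) -> bool:
        """True when a VANISHED stem is corroborated and should be re-pooled."""
        if kind == "VANISHED":
            if not self.seen_alive:     # never alive: ignore entirely
                return False
            self.vanished_streak += 1
            return self.vanished_streak >= CORROBORATION_N
        self.vanished_streak = 0        # any other kind breaks the streak
        if kind == "RUNNING":
            self.seen_alive = True
        return False
```

Keeping the transport counter on the box rather than the stem is what lets a flaky SSH link re-pool work without spending any stem's retry budget.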

Sync resets only CLEAN boxes; divergent boxes need a human

The Linux boxes have no GitHub creds, so sync ships a thin box_sha..branch bundle, then runs
fetch + reset --hard on the box. A surprise-dirty box is skipped and reported, never clobbered, and
for a divergent box (from_sha not in controller history) the bundle step fails and returns None.
cmd_run syncs before preflight so a stale box is advanced rather than DROPped for sha-mismatch.
(analysis/INSIGHTS.md dispatch [run])
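A minimal sketch of the per-box sync decision plus the git commands involved. The enum and helper names are hypothetical. The ancestry flag is something the controller could compute with `git merge-base --is-ancestor <box_sha> HEAD` (exit code 0 means ancestor). The bundle range `<box_sha>..<branch>` holds only commits the box lacks, and the box applies it with `git fetch <bundle> <branch>` followed by `git reset --hard FETCH_HEAD`.

```python
from enum import Enum

class SyncAction(Enum):
    NOOP = "noop"             # already at controller HEAD
    SKIP_DIRTY = "skip"       # surprise-dirty: report, never clobber
    DIVERGENT = "divergent"   # box_sha not in controller history: needs a human
    ADVANCE = "advance"       # ship a thin bundle, then fetch + reset --hard

def plan_sync(box_sha: str, head_sha: str, dirty: bool, box_sha_is_ancestor: bool) -> SyncAction:
    if dirty:                          # checked first: a dirty tree is never touched
        return SyncAction.SKIP_DIRTY
    if box_sha == head_sha:
        return SyncAction.NOOP
    if not box_sha_is_ancestor:        # a thin bundle cannot be built from this base
        return SyncAction.DIVERGENT
    return SyncAction.ADVANCE

def bundle_argv(bundle: str, box_sha: str, branch: str) -> list[str]:
    # Thin bundle: only commits reachable from branch but not from box_sha.
    return ["git", "bundle", "create", bundle, f"{box_sha}..{branch}"]

def box_apply_argvs(bundle_on_box: str, branch: str) -> list[list[str]]:
    # Run on the box, in its repo, after the bundle file has been copied over.
    return [["git", "fetch", bundle_on_box, branch],
            ["git", "reset", "--hard", "FETCH_HEAD"]]
```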

remote_dated_dir resolution order (idempotent re-collect)

_collect_one resolves the remote logs_<date> in priority: (1) the launch line captured at
dispatch; (2) the preflight-pinned day (a stem already RUNNING when this controller started); (3) the
console’s own run_dir: line, which back-fills the record so re-collect survives controller restarts.
(analysis/INSIGHTS.md dispatch [run])

State case changed — old journals are normalised on read (SER-2)

State is now lowercase auto(), so the journal stores "running", not "RUNNING". A pre-refactor
(UPPERCASE) journal would otherwise mis-compare against the new members and silently re-dispatch
terminal stems; load_journal coerces every state through State(value.lower()) to prevent this.
The journal also carries a top-level _campaign id (stripped on load, never a StemRecord).

collect/status/resume locate the campaign, not wall-clock today (date-rollover fix)

A run dispatched before midnight writes its journal under the dispatch day’s campaign dir.
campaign_dir_of finds it by the newest logs_*/<campaign>/dispatch_journal.json (or explicit
--campaign/--name) — so collection works after the date rolls over. With >1 candidate and no
--name/--campaign it raises rather than silently picking the wrong campaign.

--force/--rerun: re-running a stem under its old name

Without --force, re-dispatching an already-collected stem the same day re-collects the stale box
log (the pre-poll sees the old sentinel; reconcile keep-both refuses to overwrite the canonical).
--force skips the pre-poll short-circuit AND makes reconcile overwrite. Pass it to both run
and any later manual collect (the intent is not journaled). (analysis/INSIGHTS.md dispatch)
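A minimal sketch of reconcile's two modes: with force the canonical <campaign>/<stem> is replaced, otherwise the canonical is left untouched and the new run lands beside it. The ".N" suffix used for the keep-both copy is a hypothetical naming scheme, not necessarily the one dispatch uses.

```python
import shutil
from pathlib import Path

def reconcile(staged: Path, canonical: Path, force: bool = False) -> Path:
    """Land a staged run at its canonical path; keep-both unless force."""
    if canonical.exists():
        if force:
            shutil.rmtree(canonical)       # --force: overwrite the earlier collect
        else:
            n = 1                          # keep-both: never touch the canonical
            while (alt := canonical.with_name(f"{canonical.name}.{n}")).exists():
                n += 1
            canonical = alt                # hypothetical ".N" sibling name
    staged.rename(canonical)
    return canonical
```

This is why forgetting --force on a same-day rerun is quiet rather than destructive: the stale run keeps the canonical path and the fresh one sits next to it.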

Pseudocode (cmd_run)

hosts  = load_cluster(cluster.yaml)
stems  = parse_manifest(manifest)            # dedup; raise on empty; --manifest required
campaign_dir = LOGS_DIR/<campaign>           # <campaign> = manifest stem (or --name); archive a copy
sync_cluster(hosts)                          # advance clean boxes to controller HEAD
survivors = [h for h in hosts if preflight_host(h).ok]   # tmux/import/sha/dirty/day
partition_map = partition(stems, survivors)  # weighted, deterministic, static

dispatch_loop:                               # opts carry campaign_dir/campaign/force
  _dispatch_assigned(...)                     # pre-poll for resume-skip (bypassed under --force), then launch
  while any stem non-terminal and polls < max:
    _poll_and_route(...)                      # per box: apply_status -> collect/repool/exhaust
    re-launch re-pooled stems (PENDING->box, ASSIGNED->launch)
    save_journal(records)                     # journal every cycle -> drop-proof resume
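The loop skeleton above, reduced to a runnable toy with the side effects injected as callables. Records are plain {stem: state} strings, and the TERMINAL set is a hypothetical stand-in for the real terminal states; the initial _dispatch_assigned pass is left out.

```python
from typing import Callable

TERMINAL = {"collected", "exhausted"}   # hypothetical terminal states

Records = dict[str, str]

def dispatch_loop(records: Records,
                  poll_and_route: Callable[[Records], None],
                  relaunch: Callable[[Records], None],
                  save_journal: Callable[[Records], None],
                  max_polls: int) -> int:
    """Poll/route, re-launch and journal until every stem is terminal or the cap hits."""
    polls = 0
    while any(s not in TERMINAL for s in records.values()) and polls < max_polls:
        poll_and_route(records)   # apply_status -> collect / repool / exhaust
        relaunch(records)         # re-pooled stems: pending -> box, assigned -> launch
        save_journal(records)     # every cycle, so a crash loses at most the in-flight cycle
        polls += 1
    return polls
```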

detached_run · orchestrator · runner · infra · determinism · terminology