When an agent finishes a task, what happens next? This repo makes the answer explicit, traceable, and safe.
Multi-agent systems fail at coordination, not capability:
| Gap | What Goes Wrong |
|---|---|
| No explicit handoff | Agent A finishes. Nobody tells Agent B. Work stalls silently. |
| No fan-in | 5 parallel tasks return mixed results. Proceed or stop? By what rule? |
| No state continuity | Process crashes. Where were we? What was done? How to resume? |
| No safety gate | Auto-dispatch without guardrails → runaway agents, wasted compute. |
An orchestration control plane with two co-existing execution paths:
| Path | Use Case | Entry |
|---|---|---|
| DAG Workflow | Pre-planned batch orchestration: plan all batches → auto-advance | cli.py plan / run / resume / show |
| Callback-Driven | Event-driven: message → callback → decision → next dispatch | cli.py status / decide / stuck |
Both paths share the same execution substrate (SubagentExecutor + TmuxTaskExecutor).
| Capability | How It Works | Status |
|---|---|---|
| Batch DAG Planning | depends_on declares dependencies between batches. Kahn's algorithm rejects cycles and yields the topological order in which batches execute. | Production |
| Parallel Dispatch + Retry | BatchExecutor dispatches through pluggable executors, fills free slots up to the MAX_SESSIONS cap, and retries failed tasks within a per-task retry budget. | Production |
| Fan-in Review | BatchReviewer decides whether a batch passes using an all_success, any_success, or majority policy. | Production |
| Safety Gates | Configurable gate conditions pause the workflow for human review. | Production |
| Single JSON Truth | One workflow_state_*.json file per workflow records all batches, tasks, and decisions. | Production |
| LangGraph Integration | Optional LangGraph StateGraph engine; falls back to a zero-dependency polling loop. | Production |
| Continuation Kernel | 9-version artifact chain: registration → dispatch → spawn → execute → receipt → callback → auto-continue. | Production |
| Hooks | Promise-anchor verification and completion-translation enforcement, configurable as audit / warn / enforce. | Production |
| Observability | Per-task status cards, dashboard rendering, tmux session sync. | Production |
| Alerts | Rule-based alerting with DingTalk webhook delivery, a file audit trail, and dedup + throttling. | Production |
| Unified Status | The all-status command shows DAG workflows, callback tasks, tmux sessions, and observability cards in one view. | Production |
| Single Task Retry | retry-task resets one failed task without replaying the entire workflow. | Production |
| Pluggable Executors | TaskExecutorBase abstract interface: implement 4 methods to add a new backend. | Production |
| Circuit Breaker | Tracks consecutive (3) and total (20) failures per dispatch target; auto-trip. | Production |
| Auto-Continue Rate Limit | Hourly (20) and daily (100) caps prevent runaway continuation loops. | Production |
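The Batch DAG Planning row can be illustrated with a minimal sketch of Kahn's algorithm over batch dicts shaped like this repo's config (batch_id, depends_on). It is not TaskPlanner's actual code; the function name plan_batches and its error messages are hypothetical.

```python
from __future__ import annotations

from collections import deque


def plan_batches(batches: list[dict]) -> list[str]:
    """Return batch_ids in dependency order using Kahn's algorithm.

    Hypothetical helper: raises ValueError if a depends_on entry names an
    unknown batch or if the dependencies contain a cycle.
    """
    indegree = {b["batch_id"]: 0 for b in batches}
    dependents: dict[str, list[str]] = {bid: [] for bid in indegree}
    for b in batches:
        for dep in b.get("depends_on", []):
            if dep not in indegree:
                raise ValueError(f"{b['batch_id']!r} depends on unknown batch {dep!r}")
            indegree[b["batch_id"]] += 1  # one more prerequisite to wait for
            dependents[dep].append(b["batch_id"])

    # Start with batches that have no prerequisites, in config order.
    ready = deque(bid for bid, n in indegree.items() if n == 0)
    order: list[str] = []
    while ready:
        bid = ready.popleft()
        order.append(bid)
        for nxt in dependents[bid]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:  # every prerequisite is now scheduled
                ready.append(nxt)

    # A batch that never reaches indegree 0 is on, or downstream of, a cycle.
    if len(order) != len(indegree):
        raise ValueError("depends_on contains a cycle; batches do not form a DAG")
    return order
```

On a two-batch config where synthesize depends on collect, this returns ["collect", "synthesize"] regardless of the order the batches are listed in.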
```bash
pip install langgraph langgraph-checkpoint-sqlite  # optional

# 1. Plan: validate the DAG and create the state file
python3 runtime/orchestrator/cli.py plan "Analyze codebase" config.json

# 2. Run: execute batches
python3 runtime/orchestrator/cli.py run workflow_state_wf_xxx.json --workspace /path/to/project

# 3. Monitor: check progress
python3 runtime/orchestrator/cli.py show workflow_state_wf_xxx.json

# 4. Unified status: cross-path view
python3 runtime/orchestrator/cli.py all-status

# 5. Resume: continue after a gate or a crash
python3 runtime/orchestrator/cli.py resume workflow_state_wf_xxx.json

# 6. Retry a single failed task
python3 runtime/orchestrator/cli.py retry-task workflow_state_wf_xxx.json t1
```

Example config.json for the plan step:

```json
[
  {
    "batch_id": "collect",
    "label": "Data Collection",
    "tasks": [
      {"task_id": "t1", "label": "Source A", "max_retries": 2},
      {"task_id": "t2", "label": "Source B"}
    ],
    "depends_on": [],
    "fan_in_policy": "any_success"
  },
  {
    "batch_id": "synthesize",
    "label": "Merge Results",
    "tasks": [{"task_id": "t3", "label": "Synthesize"}],
    "depends_on": ["collect"]
  }
]
```

Architecture:

```text
                    CLI (cli.py)
                 /                 \
        DAG Workflow           Callback-Driven
   (plan/run/resume/show)   (status/decide/stuck)
              │                       │
         TaskPlanner            Orchestrator
      (Kahn topo sort)     (rule chain decisions)
              │                       │
        WorkflowState           StateMachine
     (single JSON file)     (per-task JSON files)
              │                       │
        WorkflowLoop           BatchAggregator
         (main loop)          (stuck detection)
              │
        BatchExecutor ───→ BatchReviewer
       (slot dispatch)     (fan-in: all/any/majority)
              │
      TaskExecutorBase  ← Strategy Pattern
           /      \
 TmuxTaskExecutor    SubagentExecutor
  (tmux sessions)    (subprocess fork)
```
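To make the BatchExecutor → BatchReviewer step concrete, here is a hedged sketch of slot-based dispatch with a per-task retry budget, followed by the three fan-in policies. run_batch, fan_in, and the run_task callback are hypothetical names. Treating a missing max_retries as 0, defaulting to all_success when fan_in_policy is absent, and reading majority as "strictly more than half" are assumptions, not documented behavior.

```python
from __future__ import annotations

from collections.abc import Callable
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


def run_batch(
    tasks: list[dict],
    run_task: Callable[[str], bool],
    max_sessions: int = 6,
) -> dict[str, bool]:
    """Keep at most max_sessions tasks in flight and retry failures.

    Hypothetical sketch: each task gets 1 + max_retries attempts (default 0).
    """
    pending = list(tasks)
    attempts = {t["task_id"]: 0 for t in tasks}
    results: dict[str, bool] = {}
    in_flight: dict[Future, dict] = {}
    with ThreadPoolExecutor(max_workers=max_sessions) as pool:
        while pending or in_flight:
            # Fill every free slot before waiting for a completion.
            while pending and len(in_flight) < max_sessions:
                task = pending.pop(0)
                attempts[task["task_id"]] += 1
                in_flight[pool.submit(run_task, task["task_id"])] = task
            done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
            for fut in done:
                task = in_flight.pop(fut)
                tid = task["task_id"]
                try:
                    ok = bool(fut.result())
                except Exception:
                    ok = False  # a crashing task counts as a failure
                if not ok and attempts[tid] <= task.get("max_retries", 0):
                    pending.append(task)  # retry budget not yet spent
                else:
                    results[tid] = ok
    return results


def fan_in(results: list[bool], policy: str = "all_success") -> bool:
    """Apply a fan-in policy to per-task outcomes."""
    successes = sum(results)
    if policy == "all_success":
        return successes == len(results)
    if policy == "any_success":
        return successes > 0
    if policy == "majority":
        return successes * 2 > len(results)  # strictly more than half
    raise ValueError(f"unknown fan_in_policy: {policy!r}")
```

A thread pool stands in for tmux sessions or subprocesses here; in the repo that role belongs to the pluggable executors.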
start-tmux-task.sh creates tmux sessions running Claude Code and manages their lifecycle:

```bash
start-tmux-task.sh --label <name> --workdir <dir> --task <prompt> \
  [--type <type>] [--model <model>] [--auto-exit] [--no-ralph] [--no-worktree]
```

| Feature | Detail |
|---|---|
| Input validation | LABEL validated against ^[a-zA-Z0-9._-]+$ |
| Concurrency lock | mkdir-based atomic lock, 60s stale recovery |
| Results dedup | Skips already-completed tasks |
| Worktree isolation | Auto-creates git worktree (base branch configurable via OPENCLAW_WORKTREE_BASE) |
| Trace ID | Per-dispatch uuid for end-to-end tracing |
| State file | jq for safe JSON escaping |
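The input-validation and concurrency-lock rows describe shell behavior in start-tmux-task.sh; the same two ideas are sketched here in Python for illustration only. validate_label and mkdir_lock are hypothetical names, and measuring staleness by the lock directory's mtime is an assumption. Reclaiming a stale lock is simplified: two waiters can race on it, which a hardened lock would need to guard against.

```python
import os
import re
import time
from contextlib import contextmanager

LABEL_RE = re.compile(r"^[a-zA-Z0-9._-]+$")  # pattern from the LABEL row


def validate_label(label: str) -> str:
    """Reject labels that could break shell commands or file paths."""
    if not LABEL_RE.fullmatch(label):
        raise ValueError(f"invalid label: {label!r}")
    return label


@contextmanager
def mkdir_lock(path: str, stale_after: float = 60.0, poll: float = 0.1):
    """mkdir is atomic: only one process can create the directory."""
    while True:
        try:
            os.mkdir(path)
            break  # we own the lock
        except FileExistsError:
            try:
                age = time.time() - os.stat(path).st_mtime
            except FileNotFoundError:
                continue  # holder released it between mkdir and stat
            if age > stale_after:
                try:
                    os.rmdir(path)  # reclaim a lock left by a crashed holder
                except FileNotFoundError:
                    pass
                continue
            time.sleep(poll)
    try:
        yield
    finally:
        os.rmdir(path)
```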
To add a backend, subclass TaskExecutorBase and implement four methods:

```python
class MyExecutor(TaskExecutorBase):
    def execute(self, task_id, label, context) -> str: ...  # start the task, return a handle
    def poll(self, handle) -> TaskResult: ...               # check status
    def cancel(self, handle) -> bool: ...                   # cancel the task
    def cleanup(self, handle) -> None: ...                  # release resources
```

Built-in backends: TmuxTaskExecutor (tmux sessions with 5-stage completion detection and double-exit prevention) and SubagentExecutor (subprocess with a 3-layer fork-bomb guard).
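A toy backend shows how the four methods fit together. Because the real TaskExecutorBase and TaskResult live in the repo, this sketch defines hypothetical stand-ins with the same method names; the TaskResult fields (status, output), the status strings, the ShellExecutor class, and the context["cmd"] key are all assumptions.

```python
from __future__ import annotations

import subprocess
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class TaskResult:  # hypothetical stand-in for the repo's TaskResult
    status: str  # "running" | "success" | "failed"
    output: str = ""


class TaskExecutorBase(ABC):  # hypothetical stand-in with the same 4 methods
    @abstractmethod
    def execute(self, task_id: str, label: str, context: dict) -> str: ...
    @abstractmethod
    def poll(self, handle: str) -> TaskResult: ...
    @abstractmethod
    def cancel(self, handle: str) -> bool: ...
    @abstractmethod
    def cleanup(self, handle: str) -> None: ...


class ShellExecutor(TaskExecutorBase):
    """Runs context["cmd"] (an argv list) as a subprocess."""

    def __init__(self) -> None:
        self.procs: dict[str, subprocess.Popen] = {}

    def execute(self, task_id: str, label: str, context: dict) -> str:
        handle = f"{label}-{uuid.uuid4().hex[:8]}"  # opaque handle
        self.procs[handle] = subprocess.Popen(
            context["cmd"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
        )
        return handle

    def poll(self, handle: str) -> TaskResult:
        proc = self.procs[handle]
        if proc.poll() is None:
            return TaskResult("running")
        out = proc.stdout.read() if proc.stdout else ""
        return TaskResult("success" if proc.returncode == 0 else "failed", out)

    def cancel(self, handle: str) -> bool:
        proc = self.procs[handle]
        if proc.poll() is None:
            proc.terminate()
            proc.wait()
            return True
        return False  # already finished

    def cleanup(self, handle: str) -> None:
        proc = self.procs.pop(handle, None)
        if proc and proc.stdout:
            proc.stdout.close()
```

For commands that produce a lot of output, a real backend should redirect stdout to a file rather than a pipe, since an undrained pipe can block the child process.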
| Mechanism | Implementation |
|---|---|
| Atomic Writes | tempfile + os.fsync + os.replace throughout |
| File-Level Locking | fcntl.flock on state_machine, workflow_state_store, single_writer_guard, bridge_consumer |
| Input Validation | Regex gate on LABEL, task_id, batch_id — prevents shell injection and path traversal |
| Circuit Breaker | Consecutive (3) and total (20) failure tracking per target |
| Watchdog | Cron every 5 min: stall detection (3600s), auto-resume (max 3 attempts), shared-context cleanup (8+ directory TTLs) |
| Fork Bomb Prevention | Three-layer guard: spawn depth + pgrep count + semaphore |
| Auto-Continue Rate Limit | 20/hour + 100/day caps prevent runaway continuation loops |
| UTC Timestamps | All timeout/comparison code uses datetime.now(timezone.utc) |
| Crash Recovery | workflow_loop persists state before exit on unhandled exceptions |
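The Atomic Writes and File-Level Locking rows combine into a common pattern for a single JSON state file. This sketch uses only the standard library (fcntl, so POSIX only); atomic_write_json, update_state, and the sidecar .lock file are illustrative choices, not the repo's implementation.

```python
import fcntl
import json
import os
import tempfile


def atomic_write_json(path: str, data: dict) -> None:
    """Readers see either the old file or the new one, never a partial write."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the same directory so os.replace is a rename
    # within one filesystem (atomic on POSIX).
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # force bytes to disk before the rename
        os.replace(tmp, path)
    except BaseException:
        os.unlink(tmp)
        raise


def update_state(path: str, mutate) -> dict:
    """Serialize read-modify-write cycles across processes with flock."""
    with open(path + ".lock", "w") as lock:
        fcntl.flock(lock, fcntl.LOCK_EX)  # released when the file is closed
        try:
            with open(path, encoding="utf-8") as f:
                state = json.load(f)
        except FileNotFoundError:
            state = {}
        mutate(state)
        atomic_write_json(path, state)
    return state
```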
Alerts are delivered via DingTalk webhook with HMAC-SHA256 signing:

```python
from alert_dispatcher import AlertDispatcher

dispatcher = AlertDispatcher()  # defaults to the dingtalk channel

# receipt, task_context and card are supplied by the caller
dispatcher.dispatch_completion_alert(receipt, task_context)
dispatcher.dispatch_timeout_alert(card, timeout_minutes=15)
dispatcher.dispatch_failure_alert(receipt, task_context)
```

- Deduplication via a SHA256 alert_id plus a 5-minute throttle window
- Falls back to a file-based audit trail on delivery failure
- Channel configurable via the OPENCLAW_ALERT_CHANNEL env var
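The dedup and throttle behavior can be sketched as a small in-memory table keyed by a SHA256 alert_id. The AlertDeduper class and the fields that feed the hash are assumptions for illustration; only the SHA256 id and the 5-minute window come from the list above.

```python
from __future__ import annotations

import hashlib
import time


class AlertDeduper:
    """Hypothetical: suppress repeats of the same alert within a window."""

    def __init__(self, window_seconds: float = 300.0) -> None:
        self.window = window_seconds
        self.last_sent: dict[str, float] = {}

    @staticmethod
    def alert_id(kind: str, task_id: str, message: str) -> str:
        # Which fields are hashed is an illustrative assumption.
        raw = f"{kind}|{task_id}|{message}".encode("utf-8")
        return hashlib.sha256(raw).hexdigest()

    def should_send(self, alert_id: str, now: float | None = None) -> bool:
        now = time.time() if now is None else now
        last = self.last_sent.get(alert_id)
        if last is not None and now - last < self.window:
            return False  # duplicate inside the throttle window
        self.last_sent[alert_id] = now
        return True
```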
Two behavioral enforcement hooks, configurable via OPENCLAW_HOOK_ENFORCE_MODE:
| Hook | Purpose | Integration Point |
|---|---|---|
| PostPromiseVerifyHook | Verifies that a dispatch has a real execution anchor (dispatch_id, session_id). | auto_dispatch.py: blocks the dispatch in enforce mode |
| PostCompletionTranslateHook | Requires completion reports to contain Conclusion + Evidence + Action. | completion_receipt.py: logs a warning in enforce mode |
Modes: audit (log only, default) → warn (log + metadata) → enforce (block operation).
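A minimal sketch of how a promise-verification hook might honor the three modes, assuming the anchor fields dispatch_id and session_id named in the table. verify_promise, HookViolation, the logger name, and the hook_warnings metadata key are hypothetical; the blocking behavior modeled here matches the PostPromiseVerifyHook row only.

```python
from __future__ import annotations

import logging
import os

log = logging.getLogger("hooks")  # hypothetical logger name
MODES = ("audit", "warn", "enforce")


class HookViolation(RuntimeError):
    """Raised only in enforce mode."""


def verify_promise(dispatch: dict, mode: str | None = None) -> dict:
    """Check that a dispatch carries real execution anchors."""
    mode = mode or os.environ.get("OPENCLAW_HOOK_ENFORCE_MODE", "audit")
    if mode not in MODES:
        raise ValueError(f"unknown hook mode: {mode!r}")
    missing = [k for k in ("dispatch_id", "session_id") if not dispatch.get(k)]
    if not missing:
        return dispatch
    msg = "dispatch has no execution anchor: missing " + ", ".join(missing)
    log.warning(msg)  # every mode logs the violation
    if mode == "warn":
        # warn: also record the problem in the dispatch metadata
        return {**dispatch, "hook_warnings": [*dispatch.get("hook_warnings", []), msg]}
    if mode == "enforce":
        raise HookViolation(msg)  # enforce: block the operation
    return dispatch  # audit: log only
```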
| Variable | Default | Description |
|---|---|---|
| OPENCLAW_MAX_TMUX_SESSIONS | 6 | Max concurrent tmux sessions |
| OPENCLAW_SESSION_PREFIX | oc | Tmux session name prefix |
| OPENCLAW_DEFAULT_BACKEND | auto | Execution backend (tmux/subagent/auto) |
| OPENCLAW_WORKTREE_BASE | origin/develop | Git worktree base branch |
| OPENCLAW_CODING_TYPES | bugfix feat crash comp fix | Task types requiring worktree isolation |
| OPENCLAW_HOME | ~/.openclaw | Root directory (shared by Python and shell) |
| OPENCLAW_STALL_TIMEOUT | 3600 | Stall detection timeout (seconds) |
| OPENCLAW_HOOK_ENFORCE_MODE | audit | Hook mode: audit / warn / enforce |
| OPENCLAW_ALERT_CHANNEL | dingtalk | Alert delivery: dingtalk / file |
| OPENCLAW_MAX_CONTINUE_PER_HOUR | 20 | Auto-continue hourly rate limit |
| OPENCLAW_MAX_CONTINUE_PER_DAY | 100 | Auto-continue daily rate limit |
| OPENCLAW_PYTHON3 | /opt/homebrew/bin/python3 | Python3 path for cron environment |
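One way to load these variables into a typed config, using the defaults from the table. OpenClawConfig and from_env are hypothetical names; splitting OPENCLAW_CODING_TYPES on whitespace is an assumption based on its default value, and OPENCLAW_PYTHON3 is omitted because it only matters to cron.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class OpenClawConfig:
    """Hypothetical typed view of the OPENCLAW_* variables."""

    max_tmux_sessions: int
    session_prefix: str
    default_backend: str
    worktree_base: str
    coding_types: tuple[str, ...]
    home: str
    stall_timeout: int
    hook_mode: str
    alert_channel: str
    max_continue_per_hour: int
    max_continue_per_day: int

    @classmethod
    def from_env(cls, env: Mapping[str, str] | None = None) -> OpenClawConfig:
        env = os.environ if env is None else env

        def get(name: str, default: str) -> str:
            return env.get(f"OPENCLAW_{name}", default)

        backend = get("DEFAULT_BACKEND", "auto")
        if backend not in ("tmux", "subagent", "auto"):
            raise ValueError(f"unsupported backend: {backend!r}")
        return cls(
            max_tmux_sessions=int(get("MAX_TMUX_SESSIONS", "6")),
            session_prefix=get("SESSION_PREFIX", "oc"),
            default_backend=backend,
            worktree_base=get("WORKTREE_BASE", "origin/develop"),
            coding_types=tuple(get("CODING_TYPES", "bugfix feat crash comp fix").split()),
            home=os.path.expanduser(get("HOME", "~/.openclaw")),
            stall_timeout=int(get("STALL_TIMEOUT", "3600")),
            hook_mode=get("HOOK_ENFORCE_MODE", "audit"),
            alert_channel=get("ALERT_CHANNEL", "dingtalk"),
            max_continue_per_hour=int(get("MAX_CONTINUE_PER_HOUR", "20")),
            max_continue_per_day=int(get("MAX_CONTINUE_PER_DAY", "100")),
        )
```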
```bash
PYTHONPATH=runtime/orchestrator python3 -m pytest tests/ -v -k "not e2e"
```

The suite has 88 unit tests and 6 e2e tests. The e2e tests require tmux and Claude Code, so the command above deselects them with -k "not e2e".
MIT