
lanyasheng/openclaw-company-orchestration-proposal


OpenClaw Orchestration Control Plane

When an agent finishes a task, what happens next? This repo makes the answer explicit, traceable, and safe.

Chinese version · Operations Guide


The Problem

Multi-agent systems fail at coordination, not capability:

| Gap | What Goes Wrong |
| --- | --- |
| No explicit handoff | Agent A finishes. Nobody tells Agent B. Work stalls silently. |
| No fan-in | 5 parallel tasks return mixed results. Proceed or stop? By what rule? |
| No state continuity | Process crashes. Where were we? What was done? How to resume? |
| No safety gate | Auto-dispatch without guardrails → runaway agents, wasted compute. |

What This Repo Provides

An orchestration control plane with two co-existing execution paths:

| Path | Use Case | Entry |
| --- | --- | --- |
| DAG Workflow | Pre-planned batch orchestration: plan all batches → auto-advance | cli.py plan / run / resume / show |
| Callback-Driven | Event-driven: message → callback → decision → next dispatch | cli.py status / decide / stuck |

Both paths share the same execution substrate (SubagentExecutor + TmuxTaskExecutor).

Core Capabilities

| Capability | How It Works | Status |
| --- | --- | --- |
| Batch DAG Planning | depends_on defines dependencies. Kahn's algorithm validates the DAG; the resulting topological sort determines execution order. | Production |
| Parallel Dispatch + Retry | BatchExecutor dispatches via pluggable executors with slot-based fill (MAX_SESSIONS cap) and retries failed tasks within a retry budget. | Production |
| Fan-in Review | BatchReviewer applies an all_success / any_success / majority policy. | Production |
| Safety Gates | Configurable gate conditions pause the workflow for human review. | Production |
| Single JSON Truth | One workflow_state_*.json per workflow holds all batches, tasks, and decisions. | Production |
| LangGraph Integration | Optional LangGraph StateGraph engine; falls back to a zero-dependency polling loop. | Production |
| Continuation Kernel | 9-version artifact chain: registration → dispatch → spawn → execute → receipt → callback → auto-continue. | Production |
| Hooks | Promise anchor verification + completion translation enforcement. Configurable: audit / warn / enforce. | Production |
| Observability | Per-task status cards, dashboard rendering, tmux session sync. | Production |
| Alerts | Rule-based alerting with DingTalk webhook delivery, file audit trail, dedup + throttling. | Production |
| Unified Status | The all-status command shows DAG workflows, callback tasks, tmux sessions, and observability cards in one view. | Production |
| Single Task Retry | retry-task resets one failed task without replaying the entire workflow. | Production |
| Pluggable Executors | TaskExecutorBase abstract interface: implement 4 methods to add a new backend. | Production |
| Circuit Breaker | Tracks consecutive (3) and total (20) failures per dispatch target; trips automatically. | Production |
| Auto-Continue Rate Limit | Hourly (20) and daily (100) caps prevent runaway continuation loops. | Production |
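The last two rows (circuit breaker and auto-continue caps) can be pictured with the minimal sketch below. It is hypothetical: the class names, the reset rule (a success clears the consecutive streak but not the running total), and the sliding-window accounting are assumptions, not the repo's code. Only the thresholds (3 consecutive / 20 total failures, 20 per hour / 100 per day) come from this README.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class CircuitBreaker:
    """Per-target breaker: trips after too many consecutive or total failures."""

    def __init__(self, max_consecutive: int = 3, max_total: int = 20) -> None:
        self.max_consecutive = max_consecutive
        self.max_total = max_total
        self._consecutive: Dict[str, int] = defaultdict(int)
        self._total: Dict[str, int] = defaultdict(int)

    def record_success(self, target: str) -> None:
        # Assumption: a success resets the streak but not the lifetime total.
        self._consecutive[target] = 0

    def record_failure(self, target: str) -> None:
        self._consecutive[target] += 1
        self._total[target] += 1

    def is_open(self, target: str) -> bool:
        # "Open" means tripped: stop dispatching to this target.
        return (self._consecutive[target] >= self.max_consecutive
                or self._total[target] >= self.max_total)


class ContinueRateLimiter:
    """Sliding-window caps on auto-continue events (per hour and per day)."""

    def __init__(self, per_hour: int = 20, per_day: int = 100,
                 clock: Callable[[], float] = time.time) -> None:
        self.per_hour = per_hour
        self.per_day = per_day
        self.clock = clock
        self._stamps: Deque[float] = deque()  # times of granted continues, oldest first

    def try_acquire(self) -> bool:
        now = self.clock()
        # Drop events older than 24h; they no longer count toward either cap.
        while self._stamps and now - self._stamps[0] >= 86400:
            self._stamps.popleft()
        last_hour = sum(1 for t in self._stamps if now - t < 3600)
        if last_hour >= self.per_hour or len(self._stamps) >= self.per_day:
            return False
        self._stamps.append(now)
        return True
```

A dispatcher built this way would check `breaker.is_open(target)` and `limiter.try_acquire()` before each automatic continuation and skip (or alert) when either refuses.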

Quick Start

pip install langgraph langgraph-checkpoint-sqlite  # optional

# 1. Plan — validate DAG, create state file
python3 runtime/orchestrator/cli.py plan "Analyze codebase" config.json

# 2. Run — execute batches
python3 runtime/orchestrator/cli.py run workflow_state_wf_xxx.json --workspace /path/to/project

# 3. Monitor — check progress
python3 runtime/orchestrator/cli.py show workflow_state_wf_xxx.json

# 4. Unified status — cross-path view
python3 runtime/orchestrator/cli.py all-status

# 5. Resume — continue from gate or crash
python3 runtime/orchestrator/cli.py resume workflow_state_wf_xxx.json

# 6. Retry a single failed task
python3 runtime/orchestrator/cli.py retry-task workflow_state_wf_xxx.json t1

Example config.json

[
  {
    "batch_id": "collect",
    "label": "Data Collection",
    "tasks": [
      {"task_id": "t1", "label": "Source A", "max_retries": 2},
      {"task_id": "t2", "label": "Source B"}
    ],
    "depends_on": [],
    "fan_in_policy": "any_success"
  },
  {
    "batch_id": "synthesize",
    "label": "Merge Results",
    "tasks": [{"task_id": "t3", "label": "Synthesize"}],
    "depends_on": ["collect"]
  }
]
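To show how depends_on becomes an execution order, here is a minimal Kahn's-algorithm sketch over the batch list from the config above. The function name topo_order and its error messages are hypothetical; TaskPlanner's actual code may differ, for example in how it breaks ties between independent batches (this sketch keeps config order).

```python
from collections import deque
from typing import Dict, List


def topo_order(batches: List[dict]) -> List[str]:
    """Return batch_ids in a valid execution order (Kahn's algorithm).

    Raises ValueError for unknown dependencies or cycles.
    """
    ids = [b["batch_id"] for b in batches]
    indegree: Dict[str, int] = {bid: 0 for bid in ids}
    dependents: Dict[str, List[str]] = {bid: [] for bid in ids}
    for b in batches:
        for dep in b.get("depends_on", []):
            if dep not in indegree:
                raise ValueError(f"{b['batch_id']!r} depends on unknown batch {dep!r}")
            indegree[b["batch_id"]] += 1      # one more unmet prerequisite
            dependents[dep].append(b["batch_id"])

    # Seed with batches that have no prerequisites, keeping config order.
    ready = deque(bid for bid in ids if indegree[bid] == 0)
    order: List[str] = []
    while ready:
        bid = ready.popleft()
        order.append(bid)
        for child in dependents[bid]:
            indegree[child] -= 1
            if indegree[child] == 0:          # all prerequisites now scheduled
                ready.append(child)

    if len(order) != len(ids):
        # Batches that never reach in-degree 0 lie on (or behind) a cycle.
        raise ValueError("depends_on graph contains a cycle")
    return order


config = [
    {"batch_id": "collect", "depends_on": []},
    {"batch_id": "synthesize", "depends_on": ["collect"]},
]
print(topo_order(config))  # ['collect', 'synthesize']
```

Validation and ordering fall out of the same pass: if any batch is left unscheduled, the graph is not a DAG.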

Architecture

                          CLI (cli.py)
                         /             \
            DAG Workflow                 Callback-Driven
           (plan/run/resume/show)       (status/decide/stuck)
                 │                              │
          TaskPlanner                      Orchestrator
          (Kahn topo sort)               (rule chain decisions)
                 │                              │
          WorkflowState                    StateMachine
          (single JSON file)           (per-task JSON files)
                 │                              │
          WorkflowLoop                   BatchAggregator
          (main loop)                    (stuck detection)
                 │
          BatchExecutor ───→ BatchReviewer
          (slot dispatch)     (fan-in: all/any/majority)
                 │
          TaskExecutorBase  ← Strategy Pattern
            /          \
   TmuxTaskExecutor   SubagentExecutor
   (tmux sessions)    (subprocess fork)
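The fan-in step in the diagram (BatchReviewer with all/any/majority) reduces to a small decision function. This is a hypothetical sketch: the signature is invented, "majority" is assumed to mean strictly more than half, and defaulting to all_success when fan_in_policy is omitted (as for the synthesize batch in the example config) is an assumption, not documented behavior.

```python
from enum import Enum
from typing import Mapping


class FanInPolicy(str, Enum):
    ALL_SUCCESS = "all_success"
    ANY_SUCCESS = "any_success"
    MAJORITY = "majority"


def review_batch(results: Mapping[str, bool], policy: str = "all_success") -> bool:
    """Decide whether a finished batch lets downstream batches run.

    `results` maps task_id -> True if the task (after retries) succeeded.
    """
    if not results:
        raise ValueError("cannot review an empty batch")
    rule = FanInPolicy(policy)  # raises ValueError for unknown policy names
    successes = sum(1 for ok in results.values() if ok)
    total = len(results)
    if rule is FanInPolicy.ALL_SUCCESS:
        return successes == total
    if rule is FanInPolicy.ANY_SUCCESS:
        return successes > 0
    # MAJORITY. Assumption: strictly more than half must succeed (a 2-2 tie fails).
    return successes * 2 > total
```

For the collect batch in the example config, `review_batch({"t1": False, "t2": True}, "any_success")` returns True, so synthesize may proceed even though Source A failed.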

Dispatch Engine (start-tmux-task.sh)

Creates tmux sessions that run Claude Code and manages their lifecycle:

start-tmux-task.sh --label <name> --workdir <dir> --task <prompt> \
  [--type <type>] [--model <model>] [--auto-exit] [--no-ralph] [--no-worktree]

| Feature | Detail |
| --- | --- |
| Input validation | LABEL validated against ^[a-zA-Z0-9._-]+$ |
| Concurrency lock | mkdir-based atomic lock, 60s stale recovery |
| Results dedup | Skips already-completed tasks |
| Worktree isolation | Auto-creates a git worktree (base branch configurable via OPENCLAW_WORKTREE_BASE) |
| Trace ID | Per-dispatch UUID for end-to-end tracing |
| State file | Written with jq for safe JSON escaping |
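The first two rows (label validation and the mkdir lock) translate to Python roughly as follows. This is a sketch under assumptions, not a port of start-tmux-task.sh: the function names, the timeout/poll parameters, and the extra rejection of "." and ".." are additions for illustration. Note that Python's `$` also matches before a trailing newline, so the sketch uses `re.fullmatch` instead.

```python
import os
import re
import time
from contextlib import contextmanager

LABEL_PATTERN = r"[a-zA-Z0-9._-]+"  # same character class as the table above


def validate_label(label: str) -> str:
    # fullmatch avoids the `$`-before-newline quirk of re.match(r"^...$").
    # Extra assumption: "." and ".." are rejected so a label can't name a parent dir.
    if not re.fullmatch(LABEL_PATTERN, label) or label in (".", ".."):
        raise ValueError(f"invalid label: {label!r}")
    return label


@contextmanager
def mkdir_lock(path: str, stale_after: float = 60.0, timeout: float = 10.0, poll: float = 0.1):
    """Exclusive lock via mkdir, which atomically fails if the directory exists."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            os.mkdir(path)
            break  # we own the lock
        except FileExistsError:
            try:
                age = time.time() - os.stat(path).st_mtime
            except FileNotFoundError:
                continue  # holder released it meanwhile; retry at once
            if age > stale_after:
                # Stale recovery (best effort): assume the holder died and reclaim.
                # Two processes reclaiming at the same moment can still race here.
                try:
                    os.rmdir(path)
                except FileNotFoundError:
                    pass
                continue
            if time.monotonic() >= deadline:
                raise TimeoutError(f"lock {path} still held after {timeout}s")
            time.sleep(poll)
    try:
        yield
    finally:
        os.rmdir(path)  # release
```

In use, the validated label can safely form part of the lock path, e.g. `with mkdir_lock(os.path.join(lock_dir, validate_label(label) + ".lock")): ...`, where lock_dir is any directory you choose.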

Pluggable Executors

class MyExecutor(TaskExecutorBase):
    def execute(self, task_id, label, context) -> str: ...  # returns handle
    def poll(self, handle) -> TaskResult: ...                # check status
    def cancel(self, handle) -> bool: ...                    # cancel task
    def cleanup(self, handle) -> None: ...                   # clean up

Built-in: TmuxTaskExecutor (tmux sessions with 5-stage completion detection, double-exit prevention) and SubagentExecutor (subprocess with 3-layer fork bomb guard).
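To make the four-method contract concrete, here is a hypothetical in-process executor that could serve as a test double. The TaskExecutorBase and TaskResult classes below are stand-ins written for this sketch (their real fields are not shown in this README); in the repo you would subclass its own TaskExecutorBase instead.

```python
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class TaskResult:  # hypothetical stand-in for the repo's TaskResult
    status: str                   # e.g. "completed" or "failed"
    output: Optional[str] = None


class TaskExecutorBase(ABC):  # stand-in mirroring the four-method contract above
    @abstractmethod
    def execute(self, task_id: str, label: str, context: Dict[str, Any]) -> str: ...

    @abstractmethod
    def poll(self, handle: str) -> TaskResult: ...

    @abstractmethod
    def cancel(self, handle: str) -> bool: ...

    @abstractmethod
    def cleanup(self, handle: str) -> None: ...


class InlineExecutor(TaskExecutorBase):
    """Runs a Python callable synchronously, e.g. as a test double."""

    def __init__(self, fn: Callable[[str, Dict[str, Any]], str]) -> None:
        self.fn = fn
        self._results: Dict[str, TaskResult] = {}

    def execute(self, task_id, label, context):
        handle = f"{task_id}-{uuid.uuid4().hex[:8]}"  # unique per dispatch
        try:
            self._results[handle] = TaskResult("completed", self.fn(label, context))
        except Exception as exc:
            self._results[handle] = TaskResult("failed", repr(exc))
        return handle

    def poll(self, handle):
        return self._results[handle]

    def cancel(self, handle):
        return False  # work already finished synchronously; nothing to cancel

    def cleanup(self, handle):
        self._results.pop(handle, None)
```

Because the work finishes inside execute, poll never reports a running state; a real backend such as a tmux session would return a handle immediately and let poll observe progress.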


Reliability

| Mechanism | Implementation |
| --- | --- |
| Atomic Writes | tempfile + os.fsync + os.replace throughout |
| File-Level Locking | fcntl.flock on state_machine, workflow_state_store, single_writer_guard, bridge_consumer |
| Input Validation | Regex gate on LABEL, task_id, batch_id; prevents shell injection and path traversal |
| Circuit Breaker | Consecutive (3) and total (20) failure tracking per target |
| Watchdog | Cron every 5 min: stall detection (3600s), auto-resume (max 3 attempts), shared-context cleanup (8+ directory TTLs) |
| Fork Bomb Prevention | Three-layer guard: spawn depth + pgrep count + semaphore |
| Auto-Continue Rate Limit | 20/hour + 100/day caps prevent runaway continuation loops |
| UTC Timestamps | All timeout/comparison code uses datetime.now(timezone.utc) |
| Crash Recovery | workflow_loop persists state before exiting on unhandled exceptions |
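The Atomic Writes row follows a standard pattern; a minimal version for a JSON state file such as workflow_state_*.json might look like this. The function name is hypothetical. The sketch does not fsync the parent directory (so on some filesystems the rename itself may not survive a power loss) and does not take the fcntl.flock lock listed above.

```python
import json
import os
import tempfile


def atomic_write_json(path: str, data) -> None:
    """Write JSON so readers see either the old file or the new one, never a partial write."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must be on the same filesystem for os.replace to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push bytes to disk before the rename
        os.replace(tmp_path, path)  # atomic swap over the destination
    except BaseException:
        # Don't leave stray temp files behind if serialization or I/O failed.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        raise
```

If json.dump fails halfway, the destination keeps its previous contents, which is what makes resume-after-crash safe.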

Alerts

DingTalk webhook delivery with HMAC-SHA256 signing:

from alert_dispatcher import AlertDispatcher

dispatcher = AlertDispatcher()  # defaults to dingtalk channel
dispatcher.dispatch_completion_alert(receipt, task_context)
dispatcher.dispatch_timeout_alert(card, timeout_minutes=15)
dispatcher.dispatch_failure_alert(receipt, task_context)
  • Deduplication via SHA256 alert_id + 5-minute throttle window
  • Falls back to file-based audit on delivery failure
  • Channel configurable via OPENCLAW_ALERT_CHANNEL env var
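The signing and dedup steps can be sketched as below. dingtalk_sign follows DingTalk's documented custom-robot signature (millisecond timestamp plus newline plus secret, HMAC-SHA256 keyed with the secret, Base64, then URL-encoded) and is appended to the webhook URL as `&timestamp=...&sign=...`. AlertThrottle is hypothetical: which fields feed the SHA256 alert_id is an assumption, as are the names.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse
from typing import Callable, Dict


def dingtalk_sign(secret: str, timestamp_ms: int) -> str:
    """HMAC-SHA256 of "<timestamp>" + newline + "<secret>", keyed by the robot secret."""
    string_to_sign = f"{timestamp_ms}\n{secret}"
    digest = hmac.new(secret.encode("utf-8"), string_to_sign.encode("utf-8"),
                      hashlib.sha256).digest()
    # Base64, then URL-encode so the value is safe in the webhook query string.
    return urllib.parse.quote_plus(base64.b64encode(digest).decode("ascii"))


class AlertThrottle:
    """Suppress repeats of the same alert inside a window (default 5 minutes)."""

    def __init__(self, window_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.window = window_seconds
        self.clock = clock
        self._last_sent: Dict[str, float] = {}

    @staticmethod
    def alert_id(kind: str, task_id: str, message: str) -> str:
        # Assumption: these three fields identify "the same" alert.
        payload = f"{kind}\x1f{task_id}\x1f{message}".encode("utf-8")
        return hashlib.sha256(payload).hexdigest()

    def should_send(self, alert_id: str) -> bool:
        now = self.clock()
        last = self._last_sent.get(alert_id)
        if last is not None and now - last < self.window:
            return False  # duplicate inside the throttle window
        self._last_sent[alert_id] = now
        return True
```

A dispatcher would compute the alert_id, skip delivery when should_send returns False, and otherwise POST to the signed webhook URL, falling back to the file audit trail on failure.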

Hooks

Two behavioral enforcement hooks, configurable via OPENCLAW_HOOK_ENFORCE_MODE:

| Hook | Purpose | Integration Point |
| --- | --- | --- |
| PostPromiseVerifyHook | Verifies that a dispatch has a real execution anchor (dispatch_id, session_id). | auto_dispatch.py: blocks dispatch in enforce mode |
| PostCompletionTranslateHook | Requires completion reports to include Conclusion + Evidence + Action. | completion_receipt.py: logs a warning in enforce mode |

Modes: audit (log only, default) → warn (log + metadata) → enforce (block operation).
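The escalation from audit to warn to enforce can be sketched as a single helper that a hook calls with its verdict. Everything here is hypothetical (function, exception, and metadata key names); only the three mode names and the OPENCLAW_HOOK_ENFORCE_MODE variable come from this README. Individual hooks may soften enforce, as the table notes for PostCompletionTranslateHook.

```python
import logging
import os
from typing import Any, Dict, Optional

log = logging.getLogger("openclaw.hooks")  # hypothetical logger name
MODES = ("audit", "warn", "enforce")


class HookViolation(Exception):
    """Raised in enforce mode to block the guarded operation."""


def apply_hook_result(passed: bool, reason: str, metadata: Dict[str, Any],
                      mode: Optional[str] = None) -> None:
    mode = mode or os.environ.get("OPENCLAW_HOOK_ENFORCE_MODE", "audit")
    if mode not in MODES:
        raise ValueError(f"unknown hook mode {mode!r}")
    if passed:
        return
    log.warning("hook check failed (%s mode): %s", mode, reason)  # every mode logs
    if mode == "warn":
        # Record the finding on the operation's metadata; the operation proceeds.
        metadata.setdefault("hook_warnings", []).append(reason)
    elif mode == "enforce":
        raise HookViolation(reason)
```

A promise-anchor check in this style would pass `bool(dispatch.get("dispatch_id") and dispatch.get("session_id"))` as the verdict, so a dispatch without both anchors is logged, annotated, or blocked depending on the mode.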


Configuration

| Variable | Default | Description |
| --- | --- | --- |
| OPENCLAW_MAX_TMUX_SESSIONS | 6 | Max concurrent tmux sessions |
| OPENCLAW_SESSION_PREFIX | oc | Tmux session name prefix |
| OPENCLAW_DEFAULT_BACKEND | auto | Execution backend (tmux/subagent/auto) |
| OPENCLAW_WORKTREE_BASE | origin/develop | Git worktree base branch |
| OPENCLAW_CODING_TYPES | bugfix feat crash comp fix | Task types requiring worktree isolation |
| OPENCLAW_HOME | ~/.openclaw | Root directory (shared by Python and shell) |
| OPENCLAW_STALL_TIMEOUT | 3600 | Stall detection timeout (seconds) |
| OPENCLAW_HOOK_ENFORCE_MODE | audit | Hook mode: audit / warn / enforce |
| OPENCLAW_ALERT_CHANNEL | dingtalk | Alert delivery: dingtalk / file |
| OPENCLAW_MAX_CONTINUE_PER_HOUR | 20 | Auto-continue hourly rate limit |
| OPENCLAW_MAX_CONTINUE_PER_DAY | 100 | Auto-continue daily rate limit |
| OPENCLAW_PYTHON3 | /opt/homebrew/bin/python3 | Python3 path for cron environment |
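One way to read these variables with their documented defaults is a small typed loader like the sketch below. The dataclass and function names are hypothetical, and parsing OPENCLAW_CODING_TYPES as a whitespace-separated list is an assumption based on its default value; OPENCLAW_PYTHON3 is left out because it is consumed by the cron environment rather than by Python code.

```python
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Mapping, Tuple

BACKENDS = ("tmux", "subagent", "auto")


@dataclass(frozen=True)
class OpenClawConfig:
    max_tmux_sessions: int
    session_prefix: str
    default_backend: str
    worktree_base: str
    coding_types: Tuple[str, ...]
    home: Path
    stall_timeout: int
    hook_mode: str
    alert_channel: str
    max_continue_per_hour: int
    max_continue_per_day: int


def load_config(env: Mapping[str, str] = os.environ) -> OpenClawConfig:
    """Read OPENCLAW_* variables, falling back to the documented defaults."""
    get = env.get
    backend = get("OPENCLAW_DEFAULT_BACKEND", "auto")
    if backend not in BACKENDS:
        raise ValueError(f"OPENCLAW_DEFAULT_BACKEND must be one of {BACKENDS}, got {backend!r}")
    return OpenClawConfig(
        max_tmux_sessions=int(get("OPENCLAW_MAX_TMUX_SESSIONS", "6")),
        session_prefix=get("OPENCLAW_SESSION_PREFIX", "oc"),
        default_backend=backend,
        worktree_base=get("OPENCLAW_WORKTREE_BASE", "origin/develop"),
        # Assumption: the list is whitespace-separated, as the default suggests.
        coding_types=tuple(get("OPENCLAW_CODING_TYPES", "bugfix feat crash comp fix").split()),
        home=Path(get("OPENCLAW_HOME", "~/.openclaw")).expanduser(),
        stall_timeout=int(get("OPENCLAW_STALL_TIMEOUT", "3600")),
        hook_mode=get("OPENCLAW_HOOK_ENFORCE_MODE", "audit"),
        alert_channel=get("OPENCLAW_ALERT_CHANNEL", "dingtalk"),
        max_continue_per_hour=int(get("OPENCLAW_MAX_CONTINUE_PER_HOUR", "20")),
        max_continue_per_day=int(get("OPENCLAW_MAX_CONTINUE_PER_DAY", "100")),
    )
```

Passing a plain dict instead of os.environ makes the loader easy to test without touching the real environment.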

Tests

PYTHONPATH=runtime/orchestrator python3 -m pytest tests/ -v -k "not e2e"

88 unit tests + 6 e2e tests (e2e requires tmux + Claude Code).


License

MIT

About

OpenClaw orchestration and workflow control-plane with subagent-first execution and tmux compatibility.
