PetalFlow is an open-source Go workflow runtime for building AI agent systems as explicit, testable graphs. It helps you move from prompt experiments to production workflows with clear execution, reusable tools, webhook support, scheduling, event streams, and an HTTP daemon API.
Use PetalFlow when you want your AI workflows to behave like software systems, not black boxes.
- Build workflows as graphs with explicit nodes and edges.
- Combine LLM steps, tool calls, routing, transforms, human gates, and webhooks.
- Run workflows from Go code, the CLI, or an HTTP daemon.
- Persist workflows, schedules, tools, and events in SQLite.
- Stream and inspect runtime events for debugging and observability.
- Export traces and metrics with OpenTelemetry.
```sh
go get github.com/petal-labs/petalflow
go install github.com/petal-labs/petalflow/cmd/petalflow@latest
```

```sh
petalflow run examples/06_cli_workflow/greeting.graph.json \
  --input '{"name":"World"}'
```

This executes a simple Graph IR workflow and prints the output envelope.
```sh
petalflow validate examples/06_cli_workflow/research.agent.yaml
petalflow compile examples/06_cli_workflow/research.agent.yaml --output /tmp/research.graph.json
```

```sh
export PETALFLOW_PROVIDER_ANTHROPIC_API_KEY=sk-ant-...
petalflow run examples/06_cli_workflow/research.agent.yaml \
  --input '{"topic":"Go concurrency patterns"}'
```

- Customer support triage: classify inbound tickets, route by urgency, auto-draft replies.
- Research and writing pipelines: gather information, summarize findings, draft final output.
- Tool-driven automation: call internal APIs, databases, and MCP tools as workflow steps.
- Human-in-the-loop approvals: pause at critical steps for explicit review.
- Webhook automations: receive inbound events (`webhook_trigger`) and send outbound notifications (`webhook_call`).
- Scheduled workflows: run recurring jobs via cron in daemon mode.
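To make the triage use case concrete, here is a minimal, self-contained routing function. This is not PetalFlow API; the queue names and keyword rules are hypothetical stand-ins for what an LLM classifier node plus a router node would decide in a real workflow.

```go
package main

import (
	"fmt"
	"strings"
)

// routeTicket picks a destination queue for an inbound ticket using
// simple keyword rules; a production workflow would use an LLM
// classifier step followed by a routing node.
func routeTicket(subject string) string {
	s := strings.ToLower(subject)
	switch {
	case strings.Contains(s, "outage"), strings.Contains(s, "down"):
		return "urgent"
	case strings.Contains(s, "billing"):
		return "billing"
	default:
		return "general"
	}
}

func main() {
	fmt.Println(routeTicket("Production outage in eu-west")) // urgent
	fmt.Println(routeTicket("Question about billing cycle")) // billing
}
```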
```go
package main

import (
	"context"
	"fmt"

	"github.com/petal-labs/petalflow"
)

func main() {
	g := petalflow.NewGraph("hello")

	greet := petalflow.NewFuncNode("greet", func(ctx context.Context, env *petalflow.Envelope) (*petalflow.Envelope, error) {
		name := env.GetVarString("name")
		env.SetVar("greeting", fmt.Sprintf("Hello, %s!", name))
		return env, nil
	})

	g.AddNode(greet)
	g.SetEntry("greet")

	env := petalflow.NewEnvelope().WithVar("name", "World")

	rt := petalflow.NewRuntime()
	result, err := rt.Run(context.Background(), g, env, petalflow.DefaultRunOptions())
	if err != nil {
		panic(err)
	}

	fmt.Println(result.GetVarString("greeting"))
}
```

The PetalFlow CLI supports two workflow formats:
- Agent/Task (YAML/JSON): high-level authoring format.
- Graph IR (JSON): low-level runtime graph format.
```sh
# Validate a workflow file
petalflow validate workflow.yaml

# Compile Agent/Task to Graph IR
petalflow compile workflow.yaml --output compiled.graph.json

# Run either Agent/Task or Graph IR
petalflow run workflow.yaml --input '{"topic":"AI agents"}'

# Start daemon API
petalflow serve --host 0.0.0.0 --port 8080
```

Provider resolution order:
1. `--provider-key` flags
2. Environment variables
3. `~/.petalflow/config.json` (or `PETALFLOW_CONFIG`)
Examples:

```sh
export PETALFLOW_PROVIDER_OPENAI_API_KEY=sk-...
export PETALFLOW_PROVIDER_ANTHROPIC_API_KEY=sk-ant-...

petalflow run workflow.yaml \
  --provider-key openai=sk-... \
  --input '{"topic":"Release notes"}'
```

Think of Agent/Task as a project plan for AI work:
- `agent` = who does the work (role + model + provider)
- `task` = what work gets done
- `execution` = in what order tasks run
- Research brief: One agent researches a topic, another writes a polished summary.
- Incident response: One agent classifies severity, another drafts a mitigation plan.
- Content operations: One agent outlines, another edits for tone and style.
```yaml
version: "1.0"
schema_version: "1.0.0"
kind: agent_workflow
id: research_workflow
name: Research Assistant

agents:
  researcher:
    role: Research Analyst
    goal: Gather useful facts about a topic
    provider: anthropic
    model: claude-sonnet-4-6
  writer:
    role: Technical Writer
    goal: Turn findings into a concise report
    provider: anthropic
    model: claude-sonnet-4-6

tasks:
  research:
    description: Research {{input.topic}} and summarize key points.
    agent: researcher
    expected_output: Structured notes
  write_report:
    description: Write a short report from {{tasks.research.output}}.
    agent: writer
    expected_output: Final report

execution:
  strategy: sequential
  task_order:
    - research
    - write_report
```

`schema_version` uses semantic versioning (MAJOR.MINOR.PATCH). The current supported major is 1. Legacy workflows without `schema_version` continue to load during the transition window for schema major 1; they are planned to be rejected when schema major 2 is introduced.

Versioned JSON schema artifacts for editor/plugin tooling live in `schemas/agent-workflow/v1.json` and `schemas/graph-workflow/v1.json`.
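The versioning policy above amounts to a simple check at load time. The sketch below is a self-contained illustration of that policy, not PetalFlow's actual loader; the function name and error messages are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// supportedMajor mirrors the documented policy: schema major 1 is
// currently supported.
const supportedMajor = 1

// checkSchemaVersion accepts a missing version (legacy workflows are
// tolerated during the transition window) and rejects any major other
// than the supported one.
func checkSchemaVersion(v string) error {
	if v == "" {
		return nil // legacy workflow: allowed for now
	}
	parts := strings.SplitN(v, ".", 3)
	major, err := strconv.Atoi(parts[0])
	if err != nil {
		return fmt.Errorf("invalid schema_version %q: %w", v, err)
	}
	if major != supportedMajor {
		return fmt.Errorf("unsupported schema major %d (supported: %d)", major, supportedMajor)
	}
	return nil
}

func main() {
	fmt.Println(checkSchemaVersion("1.0.0")) // <nil>
	fmt.Println(checkSchemaVersion("2.0.0")) // unsupported schema major
}
```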
Start daemon mode:

```sh
petalflow serve --host 0.0.0.0 --port 8080
```

Common endpoints:

- `POST /api/workflows/agent`: create workflow from Agent/Task
- `POST /api/workflows/graph`: create workflow from Graph IR
- `POST /api/workflows/{id}/run`: run a workflow
- `GET /api/workflows/{id}/schedules`: list cron schedules
- `POST /api/workflows/{id}/schedules`: create cron schedule
- `GET /api/runs/{run_id}/events`: fetch persisted run events

See full API docs: docs/daemon-api.md
PetalFlow emits structured runtime events like:

- `run.started`, `run.finished`
- `node.started`, `node.finished`, `node.failed`
- `tool.call`, `tool.result`
- `route.decision`

- CLI: `petalflow run --stream` streams node output events.
- Daemon: run events are persisted and available at `GET /api/runs/{run_id}/events`.
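Consuming these events typically means fanning each one out to several handlers (a tracer, a metrics recorder, a logger). The sketch below shows that fan-out pattern in miniature; the `Event` type and `multi` helper are simplified stand-ins, not PetalFlow's actual types.

```go
package main

import "fmt"

// Event is a simplified stand-in for a PetalFlow runtime event.
type Event struct {
	Type string // e.g. "node.started", "tool.call"
}

// Handler consumes one event.
type Handler func(Event)

// multi fans a single event out to every handler in order, the same
// shape of composition a multi-handler run option enables.
func multi(handlers ...Handler) Handler {
	return func(e Event) {
		for _, h := range handlers {
			h(e)
		}
	}
}

func main() {
	var log []string
	h := multi(
		func(e Event) { log = append(log, "trace:"+e.Type) },
		func(e Event) { log = append(log, "metric:"+e.Type) },
	)
	h(Event{Type: "node.finished"})
	fmt.Println(log) // [trace:node.finished metric:node.finished]
}
```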
```go
package main

import (
	"context"

	"github.com/petal-labs/petalflow"
	petalotel "github.com/petal-labs/petalflow/otel"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func runWithTelemetry(ctx context.Context, g petalflow.Graph, env *petalflow.Envelope) error {
	tracerProvider := sdktrace.NewTracerProvider()
	meterProvider := sdkmetric.NewMeterProvider()

	tracing := petalotel.NewTracingHandler(tracerProvider.Tracer("petalflow"))
	metrics, err := petalotel.NewMetricsHandler(meterProvider.Meter("petalflow"))
	if err != nil {
		return err
	}

	opts := petalflow.DefaultRunOptions()
	opts.EventHandler = petalflow.MultiEventHandler(tracing.Handle, metrics.Handle)
	opts.EventEmitterDecorator = func(emit petalflow.EventEmitter) petalflow.EventEmitter {
		return petalotel.EnrichEmitter(emit, tracing)
	}

	_, err = petalflow.NewRuntime().Run(ctx, g, env, opts)
	return err
}
```

PetalFlow supports both directions of webhook automation:
- `webhook_trigger`: start a workflow from an inbound HTTP webhook
- `webhook_call`: send outbound HTTP webhook requests from a workflow
See full walk-through: examples/08_webhooks
PetalFlow includes a tool registry and MCP integration for attaching external capabilities to workflows.
- CLI guide: docs/tools-cli.md
- MCP overlays: docs/mcp-overlay.md
- examples/01_hello_world
- examples/02_iris_integration
- examples/03_sentiment_router
- examples/04_data_pipeline
- examples/05_rag_workflow
- examples/06_cli_workflow
- examples/07_mcp_overlay
- examples/08_webhooks
```sh
# Root module tests
go test ./... -count=1

# Integration tests (requires provider key)
export OPENAI_API_KEY=sk-...
go test -tags=integration ./tests/integration/... -count=1 -v
```

Repo docs live in docs/.
MIT. See LICENSE.