v0.17.0 — Production Ready
The Workflow Engine That
Lives Inside Your App
A Python library — not a platform. Embed DAG-based orchestration directly in your code. All 43 academic workflow patterns, event sourcing, crash recovery, and native AI agent integration.
pip install stabilize
Why Embedded
Not Another Platform to Deploy
Stabilize is a library you import. No scheduler process, no web UI, no separate cluster. Your application IS the workflow engine.
| Stabilize | Airflow | Temporal | Prefect | |
|---|---|---|---|---|
| Setup | pip install stabilize |
Webserver + scheduler + DB + Celery workers | Server cluster + worker processes + DB | Cloud account + agent deployment |
| Infrastructure | Zero — it's a library | High — separate cluster | High — separate cluster | Medium — cloud dependency |
| State Storage | Atomic DB+Queue transactions | Database row locks | Event-sourced (server-side) | Cloud-managed |
| Flow Control | 43 WCP Patterns | Rigid DAG | Workflows + activities | Rigid DAG |
| Event Sourcing | Built-in with replay | None | Core architecture | None |
| AI Integration | MCP server + prompt CLI | None | None | None |
Other engines are platforms you deploy alongside your application. Stabilize is your application.
Capabilities
Everything You Need in a Library
Production-grade orchestration without the infrastructure tax.
⮕
Message-Driven DAG
Stages form a directed acyclic graph. Execution progresses via queued messages processed by handlers. Parallel fan-out, join synchronization, and complex multi-level DAGs supported natively.
43
43 Workflow Patterns
Complete implementation of van der Aalst's control-flow patterns. AND/OR joins, discriminator, N-of-M, deferred choice, mutex, milestones, structured loops, multi-instance, and sub-workflows.
◉
Event Sourcing
Every state transition recorded as an immutable event. Replay execution from any point. Time-travel state reconstruction. Build analytics projections from the event stream.
⇆
Signals and Suspend/Resume
External signals pause and resume stage execution. Buffered or transient delivery modes. Human-in-the-loop approvals and inter-workflow communication built in.
↻
Crash Recovery
Automatic workflow recovery on process restart. Message deduplication prevents duplicate side effects. Optimistic locking handles concurrent execution safely.
⮊
Dynamic Routing
TaskResult.jump_to() enables conditional branching, retry loops, and AI-driven flow control within executing workflows. Jump count tracking prevents infinite loops.
⬡
Synthetic Stages
Before/after/onFailure stages injected automatically around any stage. Setup, cleanup, validation, and rollback without polluting your core workflow logic.
⬢
Enterprise Reliability
Transient vs permanent error classification. Exponential backoff with jitter. Dead letter queue. Stateful retries that preserve progress across failures via context_update.
AI-Native
Built for AI Agents
Stabilize is designed from the ground up for programmatic workflow creation. Every workflow is defined in Python code — no YAML, no UI, no drag-and-drop. AI coding agents generate correct Stabilize code on the first try.
- ✓ MCP documentation server with 26 tools for AI-assisted development
- ✓
stabilize promptCLI outputs LLM-optimized docs for context windows - ✓ Dynamic routing lets AI agents make decisions during execution
- ✓ Event sourcing gives agents full observability into execution history
- ✓ Sub-workflows enable recursive, agent-orchestrated pipeline composition
SSE mcp.stabilize.rodmena.ai/sse
TOOL get_started
TOOL workflow_patterns
TOOL event_sourcing
TOOL error_handling
TOOL task_implementation
TOOL search_docs
TOOL ... and 20 more
$ stabilize prompt > context.txt
Pure Python
Define Workflows in Code
AI agents route execution dynamically based on intermediate results.
from stabilize import ( Workflow, StageExecution, TaskExecution, Task, TaskResult, TaskRegistry, SqliteQueue, SqliteWorkflowStore, Orchestrator, QueueProcessor, ) from stabilize.events import configure_event_sourcing, SqliteEventStore class RouteTask(Task): """AI agent decides the next step based on analysis.""" def execute(self, stage): if stage.context.get("sentiment") == "negative": return TaskResult.jump_to("escalate") # Dynamic routing return TaskResult.success() # One-line event sourcing setup db = "./workflows.db" store = SqliteWorkflowStore(f"sqlite:///{db}", create_tables=True) queue = SqliteQueue(f"sqlite:///{db}", table_name="queue") configure_event_sourcing(SqliteEventStore(f"sqlite:///{db}", create_tables=True)) workflow = Workflow.create( application="ai-agent", name="Intelligent Routing", stages=[ StageExecution( ref_id="analyze", type="python", name="Analyze Input", context={"script": "RESULT = analyze(INPUT['data'])"}, tasks=[TaskExecution.create("Run", "python", stage_start=True, stage_end=True)], ), StageExecution( ref_id="route", type="route", name="AI Decision", requisite_stage_ref_ids={"analyze"}, context={}, tasks=[TaskExecution.create("Route", "route", stage_start=True, stage_end=True)], ), StageExecution( ref_id="respond", type="http", name="Send Response", requisite_stage_ref_ids={"route"}, context={"url": "https://api.example.com/respond", "method": "POST"}, tasks=[TaskExecution.create("Send", "http", stage_start=True, stage_end=True)], ), StageExecution( ref_id="escalate", type="http", name="Escalate Issue", context={"url": "https://api.example.com/escalate", "method": "POST"}, tasks=[TaskExecution.create("Alert", "http", stage_start=True, stage_end=True)], ), ], )
Control Flow
All 43 Workflow Patterns
The only engine that implements the complete van der Aalst taxonomy.
WCP-9
Discriminator Join
First upstream to complete fires the downstream stage. The rest are ignored.
StageExecution( ref_id="triage", join_type=JoinType.DISCRIMINATOR, requisite_stage_ref_ids={"check_a", "check_b"}, )
WCP-30
N-of-M Join
Proceed when 3 of 5 reviewers approve. Cancel the remaining.
StageExecution( ref_id="approved", join_type=JoinType.N_OF_M, join_threshold=3, requisite_stage_ref_ids={"r1","r2","r3","r4","r5"}, )
WCP-16
Deferred Choice
Race between branches. First to start wins, others are cancelled.
StageExecution( ref_id="agent_reply", deferred_choice_group="response", ) StageExecution( ref_id="escalate", deferred_choice_group="response", )
WCP-23 / WCP-24
Signals
External signals suspend and resume execution. Buffered delivery supported.
# Task suspends waiting for signal return TaskResult.suspend() # External system sends signal queue.push(SignalStage( stage_id=stage.id, signal_name="approved", signal_data={"user": "alice"}, persistent=True, ))
Task System
Built-in Task Types
Ready-to-use tasks for common operations. Or extend the base class for anything.
ShellTask
Execute commands with timeout, env vars, secrets masking, and output capture
HTTPTask
Requests with auth, retries, file uploads, downloads, and JSON parsing
PythonTask
Run scripts inline or from files with INPUT/RESULT data convention
DockerTask
Containers, image builds, resource limits, GPU support, and volume mounts
SSHTask
Remote command execution via SSH with key auth and timeouts
WaitTask
Configurable timed delays between workflow stages
SubWorkflowTask
Nest workflows within workflows with recursion depth tracking
Custom Tasks
Extend Task or RetryableTask for polling, custom integrations, anything