Stabilize — Embedded Workflow Orchestration for AI Agents


v0.17.0 — Production Ready

The Workflow Engine That Lives Inside Your App

A Python library — not a platform. Embed DAG-based orchestration directly in your code. All 43 academic workflow patterns, event sourcing, crash recovery, and native AI agent integration.

pip install stabilize

Why Embedded

Not Another Platform to Deploy

Stabilize is a library you import. No scheduler process, no web UI, no separate cluster. Your application IS the workflow engine.

|                | Stabilize                    | Airflow                                   | Temporal                              | Prefect                        |
|----------------|------------------------------|-------------------------------------------|---------------------------------------|--------------------------------|
| Setup          | pip install stabilize        | Webserver + scheduler + DB + Celery workers | Server cluster + worker processes + DB | Cloud account + agent deployment |
| Infrastructure | Zero — it's a library        | High — separate cluster                   | High — separate cluster               | Medium — cloud dependency      |
| State Storage  | Atomic DB+Queue transactions | Database row locks                        | Event-sourced (server-side)           | Cloud-managed                  |
| Flow Control   | 43 WCP patterns              | Rigid DAG                                 | Workflows + activities                | Rigid DAG                      |
| Event Sourcing | Built-in with replay         | None                                      | Core architecture                     | None                           |
| AI Integration | MCP server + prompt CLI      | None                                      | None                                  | None                           |

Other engines are platforms you deploy alongside your application. Stabilize is your application.

Capabilities

Everything You Need in a Library

Production-grade orchestration without the infrastructure tax.

Message-Driven DAG

Stages form a directed acyclic graph. Execution progresses via queued messages processed by handlers. Parallel fan-out, join synchronization, and complex multi-level DAGs supported natively.
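The message-driven model is easy to picture in plain Python. Below is a minimal standalone sketch of the idea (the dict-based graph, stage names, and `run_dag` helper are illustrative, not Stabilize's internals or API): each completed stage enqueues any downstream stage whose dependencies are now satisfied, which gives fan-out and AND-join for free.

```python
from collections import deque

def run_dag(deps, execute):
    """Process a DAG by passing 'stage ready' messages through a queue.

    deps maps each stage to the set of stages it waits on (an AND-join).
    execute(stage) performs the stage's work.
    """
    done = set()
    order = []
    queue = deque(s for s, d in deps.items() if not d)  # roots are ready
    while queue:
        stage = queue.popleft()
        execute(stage)
        done.add(stage)
        order.append(stage)
        # Fan-out: enqueue every stage whose dependencies are now satisfied
        for s, d in deps.items():
            if s not in done and s not in queue and d <= done:
                queue.append(s)
    return order

# Diamond DAG: fetch fans out to parse_a/parse_b, which join at merge
deps = {"fetch": set(), "parse_a": {"fetch"},
        "parse_b": {"fetch"}, "merge": {"parse_a", "parse_b"}}
order = run_dag(deps, execute=lambda s: None)
assert order[0] == "fetch" and order[-1] == "merge"
```

In Stabilize itself the graph is declared with `requisite_stage_ref_ids`, as in the workflow example further down.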

43 Workflow Patterns

Complete implementation of van der Aalst's control-flow patterns. AND/OR joins, discriminator, N-of-M, deferred choice, mutex, milestones, structured loops, multi-instance, and sub-workflows.

Event Sourcing

Every state transition recorded as an immutable event. Replay execution from any point. Time-travel state reconstruction. Build analytics projections from the event stream.
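The core fold behind replay is compact enough to show standalone. This sketch illustrates the semantics only, under assumed names (`Event`, `replay`, and the `upto` cutoff are not Stabilize's event-store API): state is never stored directly, it is reconstructed by folding the immutable event log, and stopping the fold early gives time-travel.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Event:
    seq: int
    stage: str
    status: str  # e.g. "RUNNING" or "SUCCEEDED"

def replay(events, upto=None):
    """Rebuild workflow state by folding immutable events in order,
    optionally stopping at a sequence number (time-travel)."""
    state = {}
    for e in sorted(events, key=lambda e: e.seq):
        if upto is not None and e.seq > upto:
            break
        state[e.stage] = e.status
    return state

log = [
    Event(1, "analyze", "RUNNING"),
    Event(2, "analyze", "SUCCEEDED"),
    Event(3, "route", "RUNNING"),
]
assert replay(log) == {"analyze": "SUCCEEDED", "route": "RUNNING"}
assert replay(log, upto=1) == {"analyze": "RUNNING"}  # state as of event 1
```

Analytics projections are the same fold with a different accumulator, e.g. counting failures per stage instead of tracking latest status.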

Signals and Suspend/Resume

External signals pause and resume stage execution. Buffered or transient delivery modes. Human-in-the-loop approvals and inter-workflow communication built in.

Crash Recovery

Automatic workflow recovery on process restart. Message deduplication prevents duplicate side effects. Optimistic locking handles concurrent execution safely.
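Deduplication is the piece that makes recovery safe to combine with side effects. A minimal sketch of the idea (a toy in-memory version, not Stabilize's implementation, which would persist the processed set transactionally): after a crash the queue may redeliver messages that were already handled, and tracking processed IDs turns redelivery into a no-op.

```python
def make_handler(side_effect):
    """Wrap a side-effecting handler so redelivered messages are no-ops."""
    processed = set()
    def handle(msg_id, payload):
        if msg_id in processed:
            return False  # duplicate delivery: skip the side effect
        side_effect(payload)
        processed.add(msg_id)
        return True
    return handle

calls = []
handle = make_handler(calls.append)
handle("m1", "charge card")
handle("m1", "charge card")  # redelivered after a restart
assert calls == ["charge card"]  # the side effect ran exactly once
```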

Dynamic Routing

TaskResult.jump_to() enables conditional branching, retry loops, and AI-driven flow control within executing workflows. Jump count tracking prevents infinite loops.
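The loop guard works like a jump budget. This standalone sketch shows the mechanism with assumed names (`run_with_jumps`, the stage-to-callable dict, and the limit of 10 are illustrative, not Stabilize's API or default): every taken jump decrements a budget, and exhausting it aborts rather than spinning forever.

```python
def run_with_jumps(stages, start, max_jumps=10):
    """Follow jump_to-style routing, aborting once the jump budget is spent.

    stages maps a stage ref to a callable returning the next ref or None.
    """
    jumps = 0
    current = start
    visited = []
    while current is not None:
        visited.append(current)
        target = stages[current]()  # next stage ref, or None to stop
        if target is not None:
            jumps += 1
            if jumps > max_jumps:
                raise RuntimeError("jump limit exceeded; aborting loop")
        current = target
    return visited

stages = {
    "route": lambda: "escalate",  # conditional branch taken
    "escalate": lambda: None,     # terminal stage
}
assert run_with_jumps(stages, "route") == ["route", "escalate"]
```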

Synthetic Stages

Before/after/onFailure stages injected automatically around any stage. Setup, cleanup, validation, and rollback without polluting your core workflow logic.
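The injection pattern is essentially a wrapper around the stage. A standalone sketch of the idea (the `with_synthetic_stages` helper and its callable-based stages are illustrative, not Stabilize's API, where synthetic stages are real stages injected into the DAG): setup, cleanup, and rollback attach around the stage without changing it.

```python
def with_synthetic_stages(stage, before=None, after=None, on_failure=None):
    """Wrap a stage callable with injected setup/cleanup/rollback steps."""
    def wrapped(ctx):
        if before:
            before(ctx)            # synthetic "before" stage
        try:
            result = stage(ctx)    # the user's actual stage
        except Exception:
            if on_failure:
                on_failure(ctx)    # synthetic "onFailure" stage
            raise
        if after:
            after(ctx)             # synthetic "after" stage
        return result
    return wrapped

trace = []
deploy = with_synthetic_stages(
    lambda ctx: trace.append("deploy"),
    before=lambda ctx: trace.append("setup"),
    after=lambda ctx: trace.append("cleanup"),
)
deploy({})
assert trace == ["setup", "deploy", "cleanup"]
```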

Enterprise Reliability

Transient vs permanent error classification. Exponential backoff with jitter. Dead letter queue. Stateful retries that preserve progress across failures via context_update.
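For reference, exponential backoff with full jitter can be sketched in a few lines (this is the general technique, not Stabilize's retry configuration; `base`, `cap`, and `attempts` are assumed parameter names): each retry waits a random amount up to an exponentially growing, capped ceiling, which spreads out retry storms.

```python
import random

def backoff_delays(base=1.0, cap=60.0, attempts=5, rng=random.random):
    """Exponential backoff with full jitter: retry a waits a random
    delay in [0, min(cap, base * 2**a)) seconds."""
    return [rng() * min(cap, base * 2 ** a) for a in range(attempts)]

delays = backoff_delays()
assert len(delays) == 5
assert all(0 <= d <= 16 for d in delays)  # ceiling is base * 2**4 = 16
```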

AI-Native

Built for AI Agents

Stabilize is designed from the ground up for programmatic workflow creation. Every workflow is defined in Python code — no YAML, no UI, no drag-and-drop. AI coding agents generate correct Stabilize code on the first try.

  • MCP documentation server with 26 tools for AI-assisted development
  • stabilize prompt CLI outputs LLM-optimized docs for context windows
  • Dynamic routing lets AI agents make decisions during execution
  • Event sourcing gives agents full observability into execution history
  • Sub-workflows enable recursive, agent-orchestrated pipeline composition

Connect MCP Server

SSE mcp.stabilize.rodmena.ai/sse

TOOL get_started

TOOL workflow_patterns

TOOL event_sourcing

TOOL error_handling

TOOL task_implementation

TOOL search_docs

TOOL ... and 20 more

$ stabilize prompt > context.txt

Pure Python

Define Workflows in Code

AI agents route execution dynamically based on intermediate results.

from stabilize import (
    Workflow, StageExecution, TaskExecution,
    Task, TaskResult, TaskRegistry,
    SqliteQueue, SqliteWorkflowStore,
    Orchestrator, QueueProcessor,
)
from stabilize.events import configure_event_sourcing, SqliteEventStore

class RouteTask(Task):
    """AI agent decides the next step based on analysis."""
    def execute(self, stage):
        if stage.context.get("sentiment") == "negative":
            return TaskResult.jump_to("escalate")  # Dynamic routing
        return TaskResult.success()

# One-line event sourcing setup
db = "./workflows.db"
store = SqliteWorkflowStore(f"sqlite:///{db}", create_tables=True)
queue = SqliteQueue(f"sqlite:///{db}", table_name="queue")
configure_event_sourcing(SqliteEventStore(f"sqlite:///{db}", create_tables=True))

workflow = Workflow.create(
    application="ai-agent",
    name="Intelligent Routing",
    stages=[
        StageExecution(
            ref_id="analyze", type="python", name="Analyze Input",
            context={"script": "RESULT = analyze(INPUT['data'])"},
            tasks=[TaskExecution.create("Run", "python",
                   stage_start=True, stage_end=True)],
        ),
        StageExecution(
            ref_id="route", type="route", name="AI Decision",
            requisite_stage_ref_ids={"analyze"},
            context={},
            tasks=[TaskExecution.create("Route", "route",
                   stage_start=True, stage_end=True)],
        ),
        StageExecution(
            ref_id="respond", type="http", name="Send Response",
            requisite_stage_ref_ids={"route"},
            context={"url": "https://api.example.com/respond",
                     "method": "POST"},
            tasks=[TaskExecution.create("Send", "http",
                   stage_start=True, stage_end=True)],
        ),
        StageExecution(
            ref_id="escalate", type="http", name="Escalate Issue",
            context={"url": "https://api.example.com/escalate",
                     "method": "POST"},
            tasks=[TaskExecution.create("Alert", "http",
                   stage_start=True, stage_end=True)],
        ),
    ],
)

Control Flow

All 43 Workflow Patterns

The only engine that implements the complete van der Aalst taxonomy.

WCP-9

Discriminator Join

First upstream to complete fires the downstream stage. The rest are ignored.

StageExecution(
    ref_id="triage",
    join_type=JoinType.DISCRIMINATOR,
    requisite_stage_ref_ids={"check_a", "check_b"},
)

WCP-30

N-of-M Join

Proceed when 3 of 5 reviewers approve. Cancel the remaining.

StageExecution(
    ref_id="approved",
    join_type=JoinType.N_OF_M,
    join_threshold=3,
    requisite_stage_ref_ids={"r1", "r2", "r3", "r4", "r5"},
)

WCP-16

Deferred Choice

Race between branches. The first to start wins; the others are cancelled.

StageExecution(
    ref_id="agent_reply",
    deferred_choice_group="response",
)
StageExecution(
    ref_id="escalate",
    deferred_choice_group="response",
)

WCP-23 / WCP-24

Signals

External signals suspend and resume execution. Buffered delivery supported.

# Task suspends waiting for signal
return TaskResult.suspend()

# External system sends signal
queue.push(SignalStage(
    stage_id=stage.id,
    signal_name="approved",
    signal_data={"user": "alice"},
    persistent=True,
))

Task System

Built-in Task Types

Ready-to-use tasks for common operations. Or extend the base class for anything.

ShellTask

Execute commands with timeout, env vars, secrets masking, and output capture

HTTPTask

Requests with auth, retries, file uploads, downloads, and JSON parsing

PythonTask

Run scripts inline or from files with INPUT/RESULT data convention

DockerTask

Containers, image builds, resource limits, GPU support, and volume mounts

SSHTask

Remote command execution via SSH with key auth and timeouts

WaitTask

Configurable timed delays between workflow stages

SubWorkflowTask

Nest workflows within workflows with recursion depth tracking

Custom Tasks

Extend Task or RetryableTask for polling, custom integrations, or anything else
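The subclass shape follows the RouteTask example above: implement execute(self, stage) and return a TaskResult. Here is a standalone sketch of a polling task; the Task and TaskResult classes below are stand-in stubs so the sketch runs without the library (in real code import both from stabilize), and TaskResult.running is a hypothetical status helper, not confirmed Stabilize API.

```python
# Stand-in stubs for stabilize's Task/TaskResult, so this runs standalone.
class Task:
    def execute(self, stage):
        raise NotImplementedError

class TaskResult:
    def __init__(self, status):
        self.status = status
    @classmethod
    def success(cls):
        return cls("SUCCEEDED")
    @classmethod
    def running(cls):
        return cls("RUNNING")  # hypothetical "poll me again" status

class PollJobTask(Task):
    """Poll an external job until it finishes; returning RUNNING asks the
    engine to re-invoke the task later (RetryableTask-style polling)."""
    def execute(self, stage):
        if stage.context.get("job_state") != "done":
            return TaskResult.running()
        return TaskResult.success()

class StageStub:
    """Minimal stage object carrying only the context dict."""
    def __init__(self, context):
        self.context = context

assert PollJobTask().execute(StageStub({"job_state": "pending"})).status == "RUNNING"
assert PollJobTask().execute(StageStub({"job_state": "done"})).status == "SUCCEEDED"
```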