Orchestrate multi-step business processes with a fluent, chainable DSL. Saga compensation, step retry with exponential backoff, parallel execution, conditional branching, nested sub-workflows, human-in-the-loop signals with timeout, loop control flow (doUntil/doWhile), forEach iteration, map transforms, schema validation (Zod-compatible), per-execution subscribe, typed observability events, and cleanup/archival — all built on top of bunqueue’s Queue and Worker. No new infrastructure, no external services, no YAML.
validate ──→ reserve stock ──→ charge payment ──→ send confirmation
↑ ↑
compensate: compensate:
release stock refund payment
bunqueue vs Competitors
| bunqueue | Temporal | Inngest | Trigger.dev | |
|---|---|---|---|---|
| Definition | TypeScript DSL | TypeScript + decorators | step.run() wrappers | TypeScript functions |
| Infrastructure | None (embedded SQLite) | PostgreSQL + 7 services | Cloud-only (no self-host) | Redis + PostgreSQL |
| Saga compensation | Built-in | Manual | Manual | Manual |
| Human-in-the-loop | .waitFor() + signal() | Signals API | step.waitForEvent() | Waitpoint tokens |
| Branching | .branch().path() | Code-level if/else | Code-level if/else | Code-level if/else |
| Parallel steps | .parallel() | Promise.all | step.run() in parallel | Manual |
| Step retry | Built-in (exponential backoff) | Built-in | Built-in | Built-in |
| Signal timeout | .waitFor(event, { timeout }) | Workflow.await | step.waitForEvent timeout | Manual |
| Nested workflows | .subWorkflow() | Child workflows | step.invoke() | Manual |
| Observability | Typed event emitter | Temporal UI | Inngest dashboard | Dashboard |
| Loops (doUntil/doWhile) | .doUntil() / .doWhile() | Code-level loops | Manual | Manual |
| forEach | .forEach() with indexed results | Code-level loops | Manual | Manual |
| Map transform | .map() | Code-level | Manual | Manual |
| Schema validation | Duck-typed .parse() (Zod, ArkType) | Manual | Built-in | Manual |
| Per-execution subscribe | engine.subscribe(id, cb) | Manual | Webhook | Manual |
| Cleanup/archival | Built-in SQLite archive | Manual | Auto (cloud) | Manual |
| Self-hosted | Yes (zero-config) | Yes (complex) | No | Yes (complex) |
| Pricing | Free (MIT) | Free self-hosted / Cloud $$ | Free tier, then per-execution | Free tier, then $50/mo+ |
| Setup time | bun add bunqueue | Hours to days | Minutes (cloud) | 30min+ self-hosted |
- Zero infrastructure. Temporal needs PostgreSQL + 7 services. Trigger.dev needs Redis + PostgreSQL. bunqueue needs nothing — SQLite is embedded.
- Saga pattern is first-class. Every competitor requires you to implement compensation manually. bunqueue runs compensate handlers in reverse order automatically.
- TypeScript-native DSL. No decorators (Temporal), no wrapper functions (Inngest). Just
.step().step().branch().step(). - Same process, same codebase. No separate worker infrastructure, no deployment pipeline for workflow definitions. It’s a library, not a platform.
When to use something else
- Multi-region HA with automatic failover — Use Temporal
- Serverless-first with zero ops — Use Inngest
- Already running Redis with BullMQ — Use BullMQ FlowProducer for simple parent-child chains
- Crash recovery with auto-resume — bunqueue does not automatically resume workflows that were
runningwhen the process crashed. If you need guaranteed exactly-once execution across restarts, use Temporal
import { Workflow, Engine } from 'bunqueue/workflow';
// Define a workflow
const orderFlow = new Workflow('order-pipeline')
.step('validate', async (ctx) => {
const { orderId, amount } = ctx.input as { orderId: string; amount: number };
if (amount <= 0) throw new Error('Invalid amount');
return { orderId, validated: true };
})
.step('charge', async (ctx) => {
const { orderId } = ctx.steps['validate'] as { orderId: string };
const { amount } = ctx.input as { amount: number };
const txId = await payments.charge(orderId, amount);
return { transactionId: txId };
}, {
compensate: async () => {
// Runs automatically if a later step fails
await payments.refund();
},
})
.step('confirm', async (ctx) => {
const { transactionId } = ctx.steps['charge'] as { transactionId: string };
await mailer.send('order-confirm', { txId: transactionId });
return { emailSent: true, transactionId };
});
// Create engine and run
const engine = new Engine({ embedded: true });
engine.register(orderFlow);
const run = await engine.start('order-pipeline', {
orderId: 'ORD-1',
amount: 99.99,
});
// Check status
const exec = engine.getExecution(run.id);
console.log(exec.state); // 'running' | 'completed' | 'failed' | 'waiting' | 'compensating'
Steps are the building blocks. Each step receives a context with the workflow input and all previous step results:
const flow = new Workflow('data-pipeline')
.step('extract', async (ctx) => {
const { source } = ctx.input as { source: string };
const rawData = await fetchFromSource(source);
return { records: rawData.length, data: rawData };
})
.step('transform', async (ctx) => {
// Access previous step results via ctx.steps
const { data } = ctx.steps['extract'] as { data: RawRecord[] };
const cleaned = data.filter(r => r.valid).map(normalize);
return { cleaned, dropped: data.length - cleaned.length };
})
.step('load', async (ctx) => {
const { cleaned } = ctx.steps['transform'] as { cleaned: CleanRecord[] };
await db.insertBatch('analytics', cleaned);
// Access original input too
return { loaded: cleaned.length, source: (ctx.input as { source: string }).source };
});
StepContext shape:
| Property | Type | Description |
|---|---|---|
ctx.input | unknown | The input passed to engine.start() |
ctx.steps | Record<string, unknown> | Results from all completed steps (keyed by step name) |
ctx.signals | Record<string, unknown> | Data from received signals (keyed by event name) |
ctx.executionId | string | Unique execution ID |
Every step must return a value (or undefined). The return value becomes available to subsequent steps via ctx.steps['step-name'].
Compensation (Saga Pattern)
When a step fails, compensation handlers run in reverse order for all previously completed steps. This implements the saga pattern — the industry-standard approach for distributed transactions without two-phase commit.
const flow = new Workflow('money-transfer')
.step('debit-source', async (ctx) => {
const { from, amount } = ctx.input as { from: string; to: string; amount: number };
await accounts.debit(from, amount);
return { debited: true, account: from, amount };
}, {
compensate: async (ctx) => {
// Undo: credit back the source account
const { from, amount } = ctx.input as { from: string; to: string; amount: number };
await accounts.credit(from, amount);
console.log('Rolled back: source account credited');
},
})
.step('credit-target', async (ctx) => {
const { to, amount } = ctx.input as { from: string; to: string; amount: number };
await accounts.credit(to, amount);
return { credited: true, account: to, amount };
}, {
compensate: async (ctx) => {
// Undo: debit back the target account
const { to, amount } = ctx.input as { from: string; to: string; amount: number };
await accounts.debit(to, amount);
console.log('Rolled back: target account debited');
},
})
.step('send-receipt', async () => {
throw new Error('Email service down');
// → Compensation runs automatically in reverse:
// 1. credit-target compensate (debit target)
// 2. debit-source compensate (credit source)
});
How it works:
- Steps A, B, C execute in order
- Step C throws an error
- Engine runs compensation for B, then A (reverse order)
- Execution state becomes
'failed'
Compensation is best-effort — if a compensate handler itself throws, the error is logged but the remaining compensations still run.
Route execution to different paths based on runtime conditions:
const flow = new Workflow('support-ticket')
.step('classify', async (ctx) => {
const { message, plan } = ctx.input as { message: string; plan: string };
const sentiment = await analyzeSentiment(message);
const priority = plan === 'enterprise' ? 'high' : sentiment < 0 ? 'medium' : 'low';
return { priority };
})
.branch((ctx) => (ctx.steps['classify'] as { priority: string }).priority)
.path('high', (w) =>
w.step('assign-senior', async (ctx) => {
const agent = await roster.getAvailable('senior');
await slack.notify(agent, 'Urgent ticket assigned');
return { assignedTo: agent.name, sla: '1h' };
})
)
.path('medium', (w) =>
w.step('assign-regular', async (ctx) => {
const agent = await roster.getAvailable('regular');
return { assignedTo: agent.name, sla: '4h' };
})
)
.path('low', (w) =>
w.step('auto-reply', async (ctx) => {
await mailer.sendTemplate('auto-reply', ctx.input);
return { assignedTo: 'bot', sla: '24h' };
})
)
.step('log-ticket', async (ctx) => {
// This step always runs, regardless of which branch was taken
await auditLog.write('ticket-created', { executionId: ctx.executionId });
return { logged: true };
});
Rules:
- The branch function returns a string that matches one of the
.path()names - Only the matching path executes; others are skipped entirely
- Steps after the branch block always run (convergence point)
- Each path can contain multiple steps, nested branches, or
waitForcalls
WaitFor (Human-in-the-Loop)
Pause execution until an external signal arrives. This is how you implement approval gates, manual review steps, or any process that needs human input:
const flow = new Workflow('content-publishing')
.step('draft', async (ctx) => {
const { title, body } = ctx.input as { title: string; body: string };
const draft = await cms.createDraft({ title, body });
await slack.notify('#editorial', `New draft "${title}" ready for review`);
return { draftId: draft.id, previewUrl: draft.previewUrl };
})
.waitFor('editorial-review')
.step('publish-or-reject', async (ctx) => {
const review = ctx.signals['editorial-review'] as {
approved: boolean;
editor: string;
notes?: string;
};
const { draftId } = ctx.steps['draft'] as { draftId: string };
if (!review.approved) {
await cms.reject(draftId, review.notes);
return { status: 'rejected', editor: review.editor };
}
const published = await cms.publish(draftId);
return { status: 'published', url: published.url, editor: review.editor };
});
// Start the workflow
const run = await engine.start('content-publishing', {
title: 'Announcing Workflow Engine',
body: '...',
});
// The execution pauses at 'editorial-review' with state: 'waiting'
// Your app can show a UI, send a Slack button, expose an API endpoint, etc.
// When the editor makes a decision (could be minutes, hours, or days later):
await engine.signal(run.id, 'editorial-review', {
approved: true,
editor: 'alice@company.com',
notes: 'Great article, ship it!',
});
// → Execution resumes from 'publish-or-reject'
Key behaviors:
waitFor('event')transitions the execution tostate: 'waiting'- The execution is persisted to SQLite — it survives process restarts. However, if the process crashes while a workflow is in
runningstate, it will not auto-resume on restart. Onlywaitingworkflows can be resumed viasignal() engine.signal(id, event, payload)stores the payload and resumes execution- The signal data is available in
ctx.signals['event-name'] - You can have multiple
waitForcalls in a single workflow (e.g., multi-stage approvals)
Prevent steps from running indefinitely:
const flow = new Workflow('api-aggregation')
.step('fetch-primary', async () => {
const res = await fetch('https://api.primary.com/data');
return await res.json();
}, { timeout: 5000 }) // 5 second timeout
.step('fetch-secondary', async () => {
const res = await fetch('https://api.secondary.com/data');
return await res.json();
}, { timeout: 10000 }) // 10 second timeout
.step('merge', async (ctx) => {
const primary = ctx.steps['fetch-primary'];
const secondary = ctx.steps['fetch-secondary'];
return { ...primary, ...secondary };
});
If a step exceeds its timeout, it fails with a "timed out" error. If the step has a compensate handler, compensation runs for all previously completed steps.
Step Retry with Backoff
Steps can retry automatically with exponential backoff and jitter:
const flow = new Workflow('resilient-pipeline')
.step('call-api', async () => {
const res = await fetch('https://api.external.com/data');
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return await res.json();
}, {
retry: 5, // Max 5 attempts (default: 3)
timeout: 10000, // 10s per attempt
})
.step('process', async (ctx) => {
const data = ctx.steps['call-api'] as ApiResponse;
return { processed: true };
});
Backoff formula: min(500ms * 2^attempt + jitter, 30s). The first retry waits ~500ms, the second ~1s, the third ~2s, capping at 30 seconds. Jitter prevents thundering herds.
The execution tracks attempt count in exec.steps['step-name'].attempts. If all retries are exhausted, the step fails and compensation runs.
Run multiple steps concurrently with .parallel():
const flow = new Workflow('data-enrichment')
.step('fetch-user', async (ctx) => {
const { userId } = ctx.input as { userId: string };
return await db.users.find(userId);
})
.parallel((w) => w
.step('fetch-orders', async (ctx) => {
const { userId } = ctx.input as { userId: string };
return await db.orders.findByUser(userId);
})
.step('fetch-preferences', async (ctx) => {
const { userId } = ctx.input as { userId: string };
return await db.preferences.get(userId);
})
.step('fetch-activity', async (ctx) => {
const { userId } = ctx.input as { userId: string };
return await analytics.getRecent(userId);
})
)
.step('merge', async (ctx) => {
// All parallel step results are available
const orders = ctx.steps['fetch-orders'];
const prefs = ctx.steps['fetch-preferences'];
const activity = ctx.steps['fetch-activity'];
return { profile: { orders, prefs, activity } };
});
How it works:
- All steps inside
.parallel()run viaPromise.allSettled - Results from each parallel step are saved to
exec.stepslike normal steps - If any parallel step fails, the entire parallel group fails and compensation runs. The error thrown is an
AggregateErrorcontaining all individual failure reasons - Steps after the parallel block wait for all parallel steps to finish
Add a timeout to waitFor so workflows don’t hang indefinitely:
const flow = new Workflow('time-limited-approval')
.step('submit', async (ctx) => {
const { amount } = ctx.input as { amount: number };
await slack.notify('#approvals', `Expense $${amount} needs review`);
return { submitted: true };
})
.waitFor('manager-approval', { timeout: 86400000 }) // 24 hours
.step('process', async (ctx) => {
const decision = ctx.signals['manager-approval'] as { approved: boolean };
return { status: decision.approved ? 'paid' : 'rejected' };
});
If the signal doesn’t arrive within the timeout:
- The execution state becomes
'failed' - The error is stored in
exec.steps['__waitFor:manager-approval'].error - Compensation runs for all previously completed steps
- A
signal:timeoutevent is emitted
Nested Workflows (Sub-Workflows)
Compose workflows by calling child workflows from a parent:
const paymentFlow = new Workflow('payment')
.step('validate-card', async (ctx) => {
const { cardToken, amount } = ctx.input as { cardToken: string; amount: number };
return { valid: true, amount };
})
.step('charge', async (ctx) => {
return { txId: `tx_${Date.now()}` };
}, {
compensate: async () => { await payments.refund(); },
});
const orderFlow = new Workflow('order')
.step('create-order', async (ctx) => {
const { amount } = ctx.input as { amount: number; cardToken: string };
return { orderId: `ORD-${Date.now()}`, total: amount };
})
.subWorkflow('payment', (ctx) => ({
// Map parent context to child input
cardToken: (ctx.input as { cardToken: string }).cardToken,
amount: (ctx.steps['create-order'] as { total: number }).total,
}))
.step('confirm', async (ctx) => {
// Child results available under 'sub:<workflow-name>'
const paymentResult = ctx.steps['sub:payment'] as Record<string, unknown>;
return { confirmed: true, payment: paymentResult };
});
const engine = new Engine({ embedded: true });
engine.register(paymentFlow); // Register child first
engine.register(orderFlow);
await engine.start('order', { amount: 99, cardToken: 'tok_abc' });
Key behaviors:
- The parent workflow pauses while the child executes (polling every 100ms)
- Child workflow results are stored under
ctx.steps['sub:<child-name>'] - If the child fails, the parent fails too (and parent compensation runs)
- The input mapper function receives the parent’s context, allowing you to pass any data from parent steps to the child
- Sub-workflows have a hardcoded 300-second (5-minute) timeout. If the child doesn’t complete within that window, the parent step fails
Observability (Events)
Subscribe to typed workflow events for monitoring, logging, and debugging:
const engine = new Engine({ embedded: true });
// Listen to specific event types
engine.on('workflow:started', (event) => {
console.log(`Workflow ${event.workflowName} started: ${event.executionId}`);
});
engine.on('step:completed', (event) => {
const { stepName } = event as StepEvent;
console.log(`Step ${stepName} completed in ${event.executionId}`);
});
engine.on('workflow:failed', (event) => {
alerting.send(`Workflow ${event.workflowName} failed: ${event.executionId}`);
});
// Listen to ALL events
engine.onAny((event) => {
metrics.increment(`workflow.${event.type}`, {
workflow: event.workflowName,
});
});
// Or pass onEvent in constructor
const engine2 = new Engine({
embedded: true,
onEvent: (event) => logger.info(event),
});
Available event types:
| Event | When |
|---|---|
workflow:started | engine.start() is called |
workflow:completed | All steps finished successfully |
workflow:failed | A step threw after retries exhausted |
workflow:waiting | Execution paused at a waitFor |
workflow:compensating | Compensation is running |
step:started | A step begins executing |
step:completed | A step finished successfully |
step:failed | A step threw an error |
step:retry | A step is about to retry after failure |
signal:received | engine.signal() delivered a signal |
signal:timeout | A waitFor timed out |
Use engine.off(type, listener) and engine.offAny(listener) to unsubscribe.
Cleanup & Archival
Manage execution history with built-in cleanup and archival:
const engine = new Engine({ embedded: true });
// Delete old completed/failed executions (older than 7 days)
const deleted = engine.cleanup(7 * 24 * 60 * 60 * 1000);
console.log(`Deleted ${deleted} executions`);
// Or selectively clean only completed executions
engine.cleanup(7 * 24 * 60 * 60 * 1000, ['completed']);
// Archive instead of delete (moves to archive table)
const archived = engine.archive(30 * 24 * 60 * 60 * 1000); // 30 days
console.log(`Archived ${archived} executions`);
// Check archive count
console.log(`Total archived: ${engine.getArchivedCount()}`);
Cleanup vs Archive:
cleanup(maxAgeMs, states?)— Permanently deletes executions older thanmaxAgeMsarchive(maxAgeMs, states?)— Moves executions to a separateworkflow_executions_archivetable (transactional, up to 1000 per call)- Both accept an optional
statesfilter:['completed'],['failed'],['completed', 'failed'], etc.
Loops (doUntil / doWhile)
Repeat a set of steps based on a condition. Two flavors:
doUntil(condition, builder, options?)— Runs steps first, then checks condition. Repeats until condition returnstrue(do…until semantics).doWhile(condition, builder, options?)— Checks condition first, then runs steps. Repeats while condition returnstrue(while…do semantics).
// doUntil: retry sending until delivery confirmed
const flow = new Workflow('delivery')
.doUntil(
(ctx) => (ctx.steps['send'] as { delivered: boolean })?.delivered === true,
(w) => w.step('send', async (ctx) => {
const result = await deliveryService.attempt(ctx.input);
return { delivered: result.success };
}),
{ maxIterations: 10 } // safety limit (default: 100)
);
// doWhile: process items while queue has items
const batchFlow = new Workflow('batch')
.doWhile(
(ctx) => {
const remaining = (ctx.steps['process'] as { remaining: number })?.remaining ?? 10;
return remaining > 0;
},
(w) => w.step('process', async (ctx) => {
const batch = await fetchNextBatch();
await processBatch(batch);
return { remaining: await getQueueSize() };
}),
);
Key behaviors:
doWhilecan skip entirely if the condition isfalseon the first checkdoUntilalways runs at least oncemaxIterationsprevents infinite loops (default: 100)- Loop step results are overwritten each iteration — only the last iteration’s result is available downstream
- Conditions can be async (return a
Promise<boolean>)
Iterate over a dynamic list of items, executing a step for each:
const flow = new Workflow<{ userIds: string[] }>('notify-all')
.forEach(
(ctx) => (ctx.input as { userIds: string[] }).userIds, // items extractor
'notify', // step name
async (ctx) => {
const userId = ctx.steps.__item as string; // current item
const index = ctx.steps.__index as number; // current index (0-based)
await sendNotification(userId);
return { notified: userId };
},
{ retry: 3, timeout: 10_000 } // standard step options
);
Key behaviors:
- Results are stored with indexed names:
notify:0,notify:1,notify:2, etc. - Each iteration receives
__item(current item) and__index(current index) viactx.steps - Items are processed sequentially (not in parallel)
maxIterationsoption limits array size (default: 1000)- Standard step options (
retry,timeout,compensate,inputSchema,outputSchema) apply to each iteration
Transform step results into a new value without executing an async handler. A synchronous, pure data-transform node:
const flow = new Workflow('etl')
.step('fetch', async () => {
const records = await db.query('SELECT * FROM events');
return { records };
})
.map('aggregate', (ctx) => {
const { records } = ctx.steps['fetch'] as { records: Event[] };
return {
total: records.length,
byType: Object.groupBy(records, r => r.type),
};
})
.step('store', async (ctx) => {
const agg = ctx.steps['aggregate'] as AggregatedData;
await analytics.insert(agg);
return { stored: true };
});
Key behaviors:
- Runs synchronously (no retry, no timeout) — it’s a pure transform
- Result is stored under the map name (e.g.,
ctx.steps['aggregate']) - The transform function receives the full
StepContext(input, steps, signals)
Schema Validation
Validate step inputs and outputs with any schema library that has a .parse() method (Zod, ArkType, Valibot, etc.):
import { z } from 'zod';
const OrderInput = z.object({
orderId: z.string(),
amount: z.number().positive(),
});
const ChargeResult = z.object({
transactionId: z.string(),
charged: z.number(),
});
const flow = new Workflow('validated-order')
.step('validate', async (ctx) => {
const { orderId } = ctx.input as { orderId: string; amount: number };
return { orderId, validated: true };
}, {
inputSchema: OrderInput, // validates ctx.input before handler runs
})
.step('charge', async (ctx) => {
return { transactionId: 'tx_123', charged: 99.99 };
}, {
outputSchema: ChargeResult, // validates return value after handler runs
});
Key behaviors:
inputSchemavalidatesctx.inputbefore the step handler executesoutputSchemavalidates the handler’s return value after execution- Uses duck typing: any object with a
.parse(data)method works — no runtime dependency on Zod - Validation failure throws an error (triggers retry or compensation like any other step failure)
- Works with Zod, ArkType, Valibot, or any custom schema object
Monitor a specific execution’s events in real-time:
const run = await engine.start('order-pipeline', { orderId: 'ORD-1' });
// Subscribe to all events for this execution
const unsubscribe = engine.subscribe(run.id, (event) => {
console.log(`[${event.type}]`, event);
if (event.type === 'workflow:completed') {
console.log('Order pipeline finished!');
}
});
// Later: stop listening
unsubscribe();
Key behaviors:
- Returns an
unsubscribefunction — call it to stop receiving events - Only receives events for the specified execution ID (filters automatically)
- Receives all event types:
step:started,step:completed,step:failed,step:retry,workflow:*,signal:* - Complements
engine.on()/engine.onAny()which are global (all executions)
// Embedded mode — everything in-process, no server needed
const engine = new Engine({ embedded: true });
// Embedded with SQLite persistence
const engine = new Engine({
embedded: true,
dataPath: './data/workflows.db',
});
// TCP mode — connects to a running bunqueue server
const engine = new Engine({
connection: { host: 'localhost', port: 6789 },
});
// All options
const engine = new Engine({
embedded: true, // Use embedded mode (default: false)
connection: { port: 6789 }, // TCP server connection (mutually exclusive with embedded)
dataPath: './data/wf.db', // SQLite persistence path
concurrency: 10, // Max parallel step executions (default: 5)
queueName: '__wf:steps', // Internal queue name (default: '__wf:steps')
onEvent: (event) => {}, // Global event listener (optional)
});
| Method | Returns | Description |
|---|---|---|
engine.register(workflow) | this | Register a workflow definition. Chainable. |
engine.start(name, input?) | Promise<RunHandle> | Start a new execution. Returns { id, workflowName }. |
engine.getExecution(id) | Execution | null | Get full execution state by ID. |
engine.listExecutions(name?, state?) | Execution[] | List executions with optional filters. |
engine.signal(id, event, payload?) | Promise<void> | Send a signal to resume a waiting execution. |
engine.on(type, listener) | this | Subscribe to a specific event type. Chainable. |
engine.onAny(listener) | this | Subscribe to all events. Chainable. |
engine.off(type, listener) | this | Unsubscribe from a specific event type. Chainable. |
engine.offAny(listener) | this | Unsubscribe from all events. Chainable. |
engine.subscribe(id, callback) | () => void | Subscribe to events for a specific execution. Returns unsubscribe function. |
engine.cleanup(maxAgeMs, states?) | number | Delete executions older than maxAgeMs. Returns count. |
engine.archive(maxAgeMs, states?) | number | Move old executions to archive table. Returns count. |
engine.getArchivedCount() | number | Count of archived executions. |
engine.close(force?) | Promise<void> | Shut down engine, queue, and worker. |
const exec = engine.getExecution(run.id);
exec.id; // 'wf_abc123' — unique execution ID
exec.workflowName; // 'order-pipeline'
exec.state; // 'running' | 'completed' | 'failed' | 'waiting' | 'compensating'
exec.input; // { orderId: 'ORD-1', amount: 99.99 }
exec.steps; // Step-by-step status and results:
// {
// 'validate': { status: 'completed', result: { orderId: 'ORD-1', validated: true } },
// 'charge': { status: 'completed', result: { transactionId: 'tx_abc' } },
// 'confirm': { status: 'running' }
// }
exec.signals; // { 'manager-approval': { approved: true } }
exec.createdAt; // 1712700000000
exec.updatedAt; // 1712700005000
Execution states:
| State | Meaning |
|---|---|
running | Steps are being executed |
completed | All steps finished successfully |
failed | A step threw an error (compensation has run) |
waiting | Paused at a waitFor, waiting for a signal |
compensating | Compensation handlers are running |
Real-World Examples
E-commerce Order Pipeline
A complete order flow with inventory reservation, payment processing, and automatic rollback on failure:
const orderFlow = new Workflow<{ orderId: string; items: Item[]; amount: number }>('process-order')
.step('validate-order', async (ctx) => {
const { orderId, items, amount } = ctx.input as OrderInput;
if (items.length === 0) throw new Error('Empty cart');
if (amount <= 0) throw new Error('Invalid amount');
// Check all items are in catalog
for (const item of items) {
const exists = await catalog.exists(item.sku);
if (!exists) throw new Error(`Unknown SKU: ${item.sku}`);
}
return { orderId, itemCount: items.length, amount };
})
.step('reserve-inventory', async (ctx) => {
const { items } = ctx.input as OrderInput;
const reservationId = await inventory.reserveBatch(items);
return { reservationId };
}, {
retry: 3, // Retry on transient inventory service errors
compensate: async (ctx) => {
const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string };
await inventory.releaseBatch(reservationId);
},
})
.step('process-payment', async (ctx) => {
const { amount, orderId } = ctx.steps['validate-order'] as ValidatedOrder;
const charge = await stripe.charges.create({
amount: Math.round(amount * 100),
currency: 'usd',
metadata: { orderId },
});
return { chargeId: charge.id, receiptUrl: charge.receipt_url };
}, {
retry: 5, // Payment gateway can be flaky
timeout: 15000, // 15s timeout per attempt
compensate: async (ctx) => {
const { chargeId } = ctx.steps['process-payment'] as { chargeId: string };
await stripe.refunds.create({ charge: chargeId });
},
})
.step('create-shipment', async (ctx) => {
const { orderId, items } = ctx.input as OrderInput;
const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string };
const shipment = await shipping.create({ orderId, items, reservationId });
return { trackingNumber: shipment.tracking, carrier: shipment.carrier };
})
.parallel((w) => w
.step('send-confirmation', async (ctx) => {
const payment = ctx.steps['process-payment'] as { chargeId: string; receiptUrl: string };
const shipment = ctx.steps['create-shipment'] as { trackingNumber: string; carrier: string };
const { email } = ctx.input as { email: string };
await mailer.send('order-confirmation', {
to: email,
receiptUrl: payment.receiptUrl,
tracking: shipment.trackingNumber,
});
return { emailSent: true };
})
.step('notify-warehouse', async (ctx) => {
const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string };
await warehouse.notifyShipment(reservationId);
return { warehouseNotified: true };
})
.step('update-analytics', async (ctx) => {
const { amount, orderId } = ctx.steps['validate-order'] as ValidatedOrder;
await analytics.trackPurchase({ orderId, amount });
return { tracked: true };
})
);
What happens on failure:
- If
process-paymentfails after 5 retries →reserve-inventorycompensation runs (items released) - If
create-shipmentfails →process-paymentcompensation runs (refund), thenreserve-inventorycompensation runs (items released) - If any parallel notification step fails → full rollback: refund payment, release inventory
- The
parallel()block sends email, notifies warehouse, and tracks analytics concurrently — much faster than sequential
CI/CD Deployment Pipeline with Approval Gate
Section titled “CI/CD Deployment Pipeline with Approval Gate”
Build, test, deploy to staging, wait for manual approval, then deploy to production:
const deployFlow = new Workflow('deploy-pipeline')
.step('build', async (ctx) => {
const { repo, branch, commitSha } = ctx.input as DeployInput;
const build = await ci.triggerBuild({ repo, branch, commitSha });
await ci.waitForBuild(build.id); // Polls until complete
return { buildId: build.id, artifact: build.artifactUrl, duration: build.durationMs };
})
.step('run-tests', async (ctx) => {
const { buildId } = ctx.steps['build'] as { buildId: string };
const results = await ci.runTestSuite(buildId, {
suites: ['unit', 'integration', 'e2e'],
parallel: true,
});
if (results.failed > 0) {
throw new Error(`${results.failed}/${results.total} tests failed`);
}
return { passed: results.passed, coverage: results.coverage };
})
.step('deploy-staging', async (ctx) => {
const { artifact } = ctx.steps['build'] as { artifact: string };
await k8s.deploy('staging', artifact);
const healthCheck = await k8s.waitForHealthy('staging', 60000);
// Notify the team
await slack.send('#deploys', {
text: `Staging deploy ready for review`,
url: `https://staging.example.com`,
});
return { env: 'staging', healthy: healthCheck.ok };
}, {
compensate: async () => {
// Rollback staging to previous version
await k8s.rollback('staging');
},
})
.waitFor('production-approval', { timeout: 48 * 60 * 60 * 1000 }) // 48h timeout
.step('deploy-production', async (ctx) => {
const approval = ctx.signals['production-approval'] as {
approver: string;
strategy: 'rolling' | 'blue-green' | 'canary';
};
const { artifact } = ctx.steps['build'] as { artifact: string };
// Deploy with the approved strategy
await k8s.deploy('production', artifact, { strategy: approval.strategy });
await k8s.waitForHealthy('production', 120000);
await slack.send('#deploys', {
text: `Production deploy complete (${approval.strategy})`,
approvedBy: approval.approver,
});
return {
env: 'production',
approvedBy: approval.approver,
strategy: approval.strategy,
};
}, {
compensate: async () => {
await k8s.rollback('production');
await slack.send('#deploys', { text: 'Production rolled back!' });
},
});
// Usage
const run = await engine.start('deploy-pipeline', {
repo: 'myapp',
branch: 'release/v2.5',
commitSha: 'abc123f',
});
// After QA on staging (hours/days later):
await engine.signal(run.id, 'production-approval', {
approver: 'cto@company.com',
strategy: 'canary',
});
KYC Onboarding with Risk-Based Branching
Different verification paths based on risk scoring — low-risk users get auto-approved, medium-risk need document upload, high-risk go to manual compliance review:
const kycFlow = new Workflow('kyc-onboarding')
.step('create-account', async (ctx) => {
const { email, name, country } = ctx.input as OnboardingInput;
const user = await db.users.create({ email, name, country, status: 'pending' });
return { userId: user.id };
}, {
compensate: async (ctx) => {
// Delete the account if verification fails
const { userId } = ctx.steps['create-account'] as { userId: string };
await db.users.delete(userId);
},
})
.step('risk-assessment', async (ctx) => {
const { country, email, ip } = ctx.input as OnboardingInput;
const score = await riskEngine.assess({ country, email, ip });
return {
score,
riskLevel: score > 80 ? 'low' : score > 50 ? 'medium' : 'high',
};
})
.branch((ctx) => (ctx.steps['risk-assessment'] as { riskLevel: string }).riskLevel)
.path('low', (w) =>
w.step('auto-approve', async () => {
return { approved: true, method: 'automatic', verifiedAt: Date.now() };
})
)
.path('medium', (w) =>
w.step('request-documents', async (ctx) => {
const { userId } = ctx.steps['create-account'] as { userId: string };
await mailer.send('document-request', { userId });
return { documentsRequested: true };
})
.waitFor('documents-uploaded')
.step('verify-documents', async (ctx) => {
const docs = ctx.signals['documents-uploaded'] as { files: string[] };
const verification = await docVerification.check(docs.files);
if (!verification.passed) {
throw new Error(`Document verification failed: ${verification.reason}`);
}
return { approved: true, method: 'document-review', verifiedAt: Date.now() };
})
)
.path('high', (w) =>
w.step('flag-compliance', async (ctx) => {
const { userId } = ctx.steps['create-account'] as { userId: string };
const { score } = ctx.steps['risk-assessment'] as { score: number };
await complianceQueue.assign({ userId, riskScore: score });
return { flagged: true };
})
.waitFor('compliance-decision')
.step('apply-compliance-decision', async (ctx) => {
const decision = ctx.signals['compliance-decision'] as {
approved: boolean;
reviewer: string;
notes: string;
};
if (!decision.approved) {
throw new Error(`Rejected by compliance: ${decision.notes}`);
}
return { approved: true, method: 'compliance-review', reviewer: decision.reviewer };
})
)
.step('activate-account', async (ctx) => {
const { userId } = ctx.steps['create-account'] as { userId: string };
await db.users.update(userId, { status: 'active', activatedAt: Date.now() });
await mailer.send('welcome', { userId });
return { activated: true };
});
Data Pipeline (ETL)
Extract, transform, load with metrics aggregation at each stage:
const etlFlow = new Workflow('daily-etl')
.step('extract', async (ctx) => {
const { date, sources } = ctx.input as { date: string; sources: string[] };
const records: Record[] = [];
for (const source of sources) {
const data = await dataLake.query(source, { date });
records.push(...data);
}
return { totalRecords: records.length, sources: sources.length, data: records };
})
.step('transform', async (ctx) => {
const { data } = ctx.steps['extract'] as { data: Record[] };
const cleaned = data
.filter(r => r.timestamp && r.value !== null)
.map(r => ({ ...r, value: normalize(r.value), processedAt: Date.now() }));
const dropped = data.length - cleaned.length;
return { cleanedRecords: cleaned.length, droppedRecords: dropped, data: cleaned };
})
.step('load', async (ctx) => {
const { data } = ctx.steps['transform'] as { data: CleanRecord[] };
const extract = ctx.steps['extract'] as { totalRecords: number; sources: number };
const transform = ctx.steps['transform'] as { cleanedRecords: number; droppedRecords: number };
// Batch insert
const batches = chunk(data, 1000);
for (const batch of batches) {
await warehouse.insertBatch('analytics_events', batch);
}
return {
pipeline: 'daily-etl',
date: (ctx.input as { date: string }).date,
metrics: {
sourcesProcessed: extract.sources,
rawRecords: extract.totalRecords,
cleanedRecords: transform.cleanedRecords,
droppedRecords: transform.droppedRecords,
loadedRecords: data.length,
},
};
});
Putting It Together: ETL with Observability
Section titled “Putting It Together: ETL with Observability”
Wire up the ETL pipeline with monitoring and cleanup:
const engine = new Engine({ embedded: true, dataPath: './data/etl.db' });
// Observability: track step durations and failures
engine.on('step:started', (e) => {
const { stepName } = e as StepEvent;
metrics.startTimer(`etl.${stepName}.duration`);
});
engine.on('step:completed', (e) => {
const { stepName } = e as StepEvent;
metrics.stopTimer(`etl.${stepName}.duration`);
metrics.increment('etl.steps.completed');
});
engine.on('step:retry', (e) => {
const { stepName, attempt, error } = e as StepEvent;
logger.warn(`Retrying ${stepName}, attempt ${attempt}: ${error}`);
});
engine.on('workflow:failed', (e) => {
alerting.pagerduty(`ETL pipeline failed: ${e.executionId}`);
});
engine.register(etlFlow);
// Run daily via cron
await engine.start('daily-etl', { date: '2026-04-10', sources: ['clickstream', 'transactions'] });
// Cleanup: archive completed runs older than 30 days, delete archived after 90 days
engine.archive(30 * 24 * 60 * 60 * 1000, ['completed']);
engine.cleanup(90 * 24 * 60 * 60 * 1000);
Batch Processing with forEach and Map
Process a list of invoices, aggregate results, and send a summary:
import { z } from 'zod';
const InvoiceInput = z.object({
invoiceIds: z.array(z.string()),
batchId: z.string(),
});
const invoiceFlow = new Workflow<{ invoiceIds: string[]; batchId: string }>('process-invoices')
// Validate input with Zod schema
.step('init', async (ctx) => {
return { count: (ctx.input as { invoiceIds: string[] }).invoiceIds.length };
}, { inputSchema: InvoiceInput })
// Process each invoice
.forEach(
(ctx) => (ctx.input as { invoiceIds: string[] }).invoiceIds,
'process-invoice',
async (ctx) => {
const invoiceId = ctx.steps.__item as string;
const result = await billingService.process(invoiceId);
return { invoiceId, amount: result.amount, status: result.status };
},
{ retry: 3, timeout: 15_000 }
)
// Aggregate all invoice results
.map('summary', (ctx) => {
const results: { amount: number; status: string }[] = [];
let i = 0;
while (ctx.steps[`process-invoice:${i}`]) {
results.push(ctx.steps[`process-invoice:${i}`] as { amount: number; status: string });
i++;
}
const total = results.reduce((sum, r) => sum + r.amount, 0);
const failed = results.filter(r => r.status === 'failed').length;
return { total, processed: results.length, failed };
})
// Send summary report
.step('report', async (ctx) => {
const summary = ctx.steps['summary'] as { total: number; processed: number; failed: number };
await notificationService.send({
channel: '#billing',
text: `Batch complete: ${summary.processed} invoices, $${summary.total} total, ${summary.failed} failures`,
});
return { reported: true };
});
Retry Loop with doUntil
Poll an external API until a resource is ready:
const deployFlow = new Workflow<{ deployId: string }>('wait-deploy')
.step('trigger', async (ctx) => {
const id = (ctx.input as { deployId: string }).deployId;
await cloudProvider.triggerDeploy(id);
return { deployId: id };
})
.doUntil(
(ctx) => (ctx.steps['check-status'] as { ready: boolean })?.ready === true,
(w) => w.step('check-status', async (ctx) => {
const id = (ctx.steps['trigger'] as { deployId: string }).deployId;
const status = await cloudProvider.getDeployStatus(id);
// Simulate wait between polls
await new Promise((r) => setTimeout(r, 5000));
return { ready: status === 'running', status };
}, { retry: 1, timeout: 30_000 }),
{ maxIterations: 60 } // max 5 minutes of polling
)
.step('verify', async (ctx) => {
const id = (ctx.steps['trigger'] as { deployId: string }).deployId;
const health = await cloudProvider.healthCheck(id);
return { healthy: health.ok };
});
How It Works Internally
The workflow engine is a pure consumer layer built on top of bunqueue. Zero modifications to the core engine.
Workflow DSL (.step / .branch / .waitFor)
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Engine │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Executor │ │
│ │ • Resolves current node (step/branch/parallel/waitFor/ │ │
│ │ doUntil/doWhile/forEach/map) │ │
│ │ • Runs step handler with timeout + retry (backoff) │ │
│ │ • Schema validation (inputSchema/outputSchema) │ │
│ │ • Evaluates branch condition, picks path │ │
│ │ • Runs parallel steps via Promise.allSettled │ │
│ │ • Executes loops (doUntil/doWhile) with maxIterations │ │
│ │ • forEach: iterates items with indexed step names │ │
│ │ • map: synchronous data transforms │ │
│ │ • Checks signal availability + timeout for waitFor │ │
│ │ • Dispatches sub-workflows, polls until complete │ │
│ │ • Runs compensation in reverse on failure │ │
│ └──────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────────────────────────┐ │
│ │ │ │ │
│ │ ┌────────┐ ┌───▼────┐ ┌───────────────────────┐ │ │
│ │ │ Queue │ │ Worker │ │ Store (SQLite) │ │ │
│ │ │__wf: │ │ pulls │ │ workflow_executions │ │ │
│ │ │steps │──│ & runs │──│ table: id, state, │ │ │
│ │ │ │ │ steps │ │ input, steps, signals │ │ │
│ │ └────────┘ └────────┘ └───────────────────────┘ │ │
│ │ bunqueue internals (Queue + Worker + SQLite) │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
Execution flow:
engine.start()creates anExecutionrecord in SQLite and enqueues the first step as a job on the internal__wf:stepsqueue- Worker picks up the step job. The Executor loads the execution state, resolves the current node
- Step node: runs the handler with retry + timeout, saves result, enqueues next node
- Branch node: evaluates the condition function, runs the matching path’s steps inline
- Parallel node: runs all steps via
Promise.allSettled, saves all results, then advances - WaitFor node: checks if the signal exists. If not, sets state to
'waiting'and schedules a timeout check if configured - SubWorkflow node: starts a child execution, polls until it reaches a terminal state, saves child results under
sub:<name> - DoUntil/DoWhile node: runs loop steps repeatedly, checking condition before (doWhile) or after (doUntil) each iteration
- ForEach node: extracts items array, runs the step for each with indexed names (
step:0,step:1, …) - Map node: runs a synchronous transform, stores result, then advances
engine.signal()stores the signal payload and re-enqueues the current node- On failure: the Executor walks completed steps in reverse, calling each compensate handler
Each workflow step is a regular bunqueue job. You get all of bunqueue’s features for free: SQLite persistence, concurrency control, and monitoring via the dashboard.
Limitations & Caveats
Before using the workflow engine in production, be aware of these trade-offs:
| Limitation | Details |
|---|---|
| Single-instance only | The workflow engine runs in-process. There is no distributed coordination — you cannot run multiple engine instances on the same database. |
| No auto-resume after crash | If the process crashes while a workflow is in running state, that execution stays in running forever. Only waiting workflows can be resumed via signal(). You need application-level recovery (e.g., query for stale running executions on startup). |
| At-most-once execution | Steps are not replayed on restart. If a step partially commits (e.g., writes to an external API) and the process crashes before saving the result, the side effect happened but the engine doesn’t know. |
| Compensation is not idempotent | The engine doesn’t track which compensations have run. If the process crashes mid-compensation, restarting may re-run already-completed handlers. Design compensations to be idempotent. |
| waitFor timeout not persisted | Signal timeouts use in-memory setTimeout. If the process restarts, the timeout is lost and the workflow waits indefinitely. |
| Sub-workflow 300s timeout | Sub-workflows have a hardcoded 5-minute timeout (not configurable). Long-running child workflows will fail the parent. |
| listExecutions returns max 100 | engine.listExecutions() is capped at 100 results with no pagination support. For larger datasets, query the SQLite store directly. |
- Simple Mode — All-in-one Queue + Worker for simpler use cases
- Queue API — Low-level queue operations
- Flow Producer — Parent-child job dependencies (simpler than workflows)
- MCP Server — Let AI agents orchestrate workflows via natural language
- Examples — More code recipes