High-performance job queue for Bun. Built for AI agents and automation.
Zero external dependencies. MCP-native. TypeScript-first.
Documentation · Benchmarks · npm
Quickstart
import { Bunqueue } from 'bunqueue/client'; const app = new Bunqueue('emails', { embedded: true, processor: async (job) => { console.log(`Sending to ${job.data.to}`); return { sent: true }; }, }); await app.add('send', { to: 'alice@example.com' });
That's it. Queue + Worker in one object. No Redis, no config, no setup.
Simple Mode
Simple Mode gives you a Queue and a Worker in a single object. Add jobs, process them, add middleware, schedule crons — all from one place.
Use Bunqueue when producer and consumer are in the same process. For distributed systems, use Queue + Worker separately. For AI agent workflows, use the MCP Server instead — agents control queues via natural language without writing code.
Architecture
new Bunqueue('emails', opts)
│
├── this.queue = new Queue('emails', ...)
├── this.worker = new Worker('emails', ...)
│
└── Subsystems (all optional):
├── RetryEngine ── jitter, fibonacci, exponential, custom
├── CircuitBreaker ── pauses worker after N failures
├── BatchAccumulator ── groups N jobs into one call
├── TriggerManager ── "on complete → create job B"
├── TtlChecker ── rejects expired jobs
├── PriorityAger ── boosts old jobs' priority
├── CancellationManager ── AbortController per job
└── DedupDebounceMerger ── deduplication & debounce
Processing pipeline per job:
Job → Circuit Breaker → TTL check → AbortController → Retry → Middleware → Processor
Routes
Route jobs to different handlers by name:
const app = new Bunqueue('notifications', { embedded: true, routes: { 'send-email': async (job) => { await sendEmail(job.data.to); return { channel: 'email' }; }, 'send-sms': async (job) => { await sendSMS(job.data.to); return { channel: 'sms' }; }, }, }); await app.add('send-email', { to: 'alice' }); await app.add('send-sms', { to: 'bob' });
Note: Use one of
processor,routes, orbatch. Passing multiple or none throws an error.
Middleware
Wraps every job execution. Execution order is onion-style: mw1 → mw2 → processor → mw2 → mw1. When no middleware is added, zero overhead.
// Timing middleware app.use(async (job, next) => { const start = Date.now(); const result = await next(); console.log(`${job.name}: ${Date.now() - start}ms`); return result; }); // Error recovery middleware app.use(async (job, next) => { try { return await next(); } catch (err) { return { recovered: true, error: err.message }; } });
Batch Processing
Accumulates N jobs and processes them together. Flushes when buffer reaches size or timeout expires. On close(), remaining jobs are flushed.
const app = new Bunqueue('db-inserts', { embedded: true, batch: { size: 50, timeout: 2000, processor: async (jobs) => { const rows = jobs.map(j => j.data.row); await db.insertMany('table', rows); return jobs.map(() => ({ inserted: true })); }, }, });
Advanced Retry
5 strategies + retry predicate:
const app = new Bunqueue('api-calls', { embedded: true, processor: async (job) => { const res = await fetch(job.data.url); if (!res.ok) throw new Error(`HTTP ${res.status}`); return { status: res.status }; }, retry: { maxAttempts: 5, delay: 1000, strategy: 'jitter', retryIf: (error) => error.message.includes('503'), }, });
| Strategy | Formula | Use case |
|---|---|---|
fixed |
delay every time |
Rate-limited APIs |
exponential |
delay × 2^attempt |
General purpose |
jitter |
delay × 2^attempt × random(0.5-1.0) |
Thundering herd prevention |
fibonacci |
delay × fib(attempt) |
Gradual backoff |
custom |
customBackoff(attempt, error) → ms |
Anything |
This is in-process retry — the job stays active. Different from core
attempts/backoffwhich re-queues.
Graceful Cancellation
Cancel running jobs with AbortController:
const app = new Bunqueue('encoding', { embedded: true, processor: async (job) => { const signal = app.getSignal(job.id); for (const chunk of chunks) { if (signal?.aborted) throw new Error('Cancelled'); await encode(chunk); } return { done: true }; }, }); const job = await app.add('video', { file: 'big.mp4' }); app.cancel(job.id); // cancel immediately app.cancel(job.id, 5000); // cancel after 5s grace period
The signal works with fetch too: await fetch(url, { signal }).
Circuit Breaker
Pauses the worker after too many consecutive failures:
CLOSED ──→ failures ≥ threshold ──→ OPEN (worker paused)
│
←── success ──── HALF-OPEN ←── timeout expires
const app = new Bunqueue('payments', { embedded: true, processor: async (job) => paymentGateway.charge(job.data), circuitBreaker: { threshold: 5, resetTimeout: 30000, onOpen: () => alert('Gateway down!'), onClose: () => alert('Gateway recovered'), }, }); app.getCircuitState(); // 'closed' | 'open' | 'half-open' app.resetCircuit(); // force close + resume worker
Event Triggers
Create follow-up jobs automatically when a job completes or fails:
const app = new Bunqueue('orders', { embedded: true, routes: { 'place-order': async (job) => ({ orderId: job.data.id, total: 99 }), 'send-receipt': async (job) => ({ sent: true }), 'fraud-alert': async (job) => ({ alerted: true }), }, }); app.trigger({ on: 'place-order', create: 'send-receipt', data: (result, job) => ({ id: job.data.id }), }); // Conditional trigger (only for large orders) app.trigger({ on: 'place-order', create: 'fraud-alert', data: (result) => ({ amount: result.total }), condition: (result) => result.total > 1000, }); // Chain triggers app .trigger({ on: 'step-1', create: 'step-2', data: (r) => r }) .trigger({ on: 'step-2', create: 'step-3', data: (r) => r });
Job TTL
Expire unprocessed jobs. Checked when the worker picks up the job:
const app = new Bunqueue('otp', { embedded: true, processor: async (job) => verifyOTP(job.data.code), ttl: { defaultTtl: 300000, perName: { 'verify-otp': 60000, 'daily-report': 0, }, }, }); app.setDefaultTtl(120000); app.setNameTtl('flash-sale', 30000);
Resolution: perName[job.name] → defaultTtl → 0 (no TTL).
Priority Aging
Automatically boosts priority of old waiting jobs to prevent starvation:
const app = new Bunqueue('tasks', { embedded: true, processor: async (job) => ({ done: true }), priorityAging: { interval: 60000, minAge: 300000, boost: 2, maxPriority: 100, maxScan: 200, }, });
A job with priority 1, after 5 min: 3, after 10 min: 5, … capped at 100.
Deduplication
Prevent duplicate jobs. Jobs with the same name + data get the same dedup ID:
const app = new Bunqueue('webhooks', { embedded: true, processor: async (job) => processWebhook(job.data), deduplication: { ttl: 60000, extend: false, replace: false, }, }); await app.add('hook', { event: 'user.created', userId: '123' }); await app.add('hook', { event: 'user.created', userId: '123' }); // deduplicated! await app.add('hook', { event: 'user.updated', userId: '123' }); // different data → new job
Override per-job: await app.add('task', data, { deduplication: { id: 'my-id', ttl: 5000 } }).
Debouncing
Coalesce rapid same-name jobs. Only the last one in the TTL window gets processed:
const app = new Bunqueue('search', { embedded: true, processor: async (job) => executeSearch(job.data.query), debounce: { ttl: 500 }, }); await app.add('search', { query: 'h' }); await app.add('search', { query: 'he' }); await app.add('search', { query: 'hello' }); // only this one processes
Rate Limiting
const app = new Bunqueue('api', { embedded: true, processor: async (job) => callExternalAPI(job.data), rateLimit: { max: 100, duration: 1000 }, }); // Per-group rate limiting (e.g., per customer) const app2 = new Bunqueue('api', { embedded: true, processor: async (job) => callAPI(job.data), rateLimit: { max: 10, duration: 1000, groupKey: 'customerId' }, }); app.setGlobalRateLimit(50, 1000); app.removeGlobalRateLimit();
DLQ (Dead Letter Queue)
const app = new Bunqueue('critical', { embedded: true, processor: async (job) => riskyOperation(job.data), dlq: { autoRetry: true, autoRetryInterval: 3600000, maxAutoRetries: 3, maxAge: 604800000, maxEntries: 10000, }, }); app.getDlq(); // all entries app.getDlqStats(); // { total, byReason, ... } app.getDlq({ reason: 'timeout' }); // filter by reason app.retryDlq(); // retry all app.purgeDlq(); // clear all
Failure reasons: explicit_fail, max_attempts_exceeded, timeout, stalled, ttl_expired, worker_lost.
Cron Jobs
await app.cron('daily-report', '0 9 * * *', { type: 'report' }); await app.cron('eu-digest', '0 8 * * 1', { type: 'weekly' }, { timezone: 'Europe/Rome' }); await app.every('healthcheck', 30000, { type: 'ping' }); await app.listCrons(); await app.removeCron('healthcheck');
Events
app.on('completed', (job, result) => { }); app.on('failed', (job, error) => { }); app.on('active', (job) => { }); app.on('progress', (job, progress) => { }); app.on('stalled', (jobId, reason) => { }); app.on('error', (error) => { }); app.on('ready', () => { }); app.on('drained', () => { }); app.on('closed', () => { });
Adding Jobs
await app.add('task', { key: 'value' }); await app.add('urgent', data, { priority: 10, delay: 5000, attempts: 5, durable: true }); await app.addBulk([ { name: 'email', data: { to: 'alice' } }, { name: 'email', data: { to: 'bob' }, opts: { priority: 10 } }, ]);
Control
app.pause(); // pause queue + worker app.resume(); // resume both await app.close(); // graceful shutdown await app.close(true); // force shutdown
Full Example
import { Bunqueue, shutdownManager } from 'bunqueue/client'; const app = new Bunqueue<{ payload: string }>('my-app', { embedded: true, routes: { 'process': async (job) => ({ id: job.data.payload, status: 'done' }), 'notify': async (job) => ({ sent: true }), 'alert': async (job) => ({ alerted: true }), }, concurrency: 10, retry: { maxAttempts: 3, delay: 1000, strategy: 'jitter' }, circuitBreaker: { threshold: 5, resetTimeout: 30000 }, ttl: { defaultTtl: 600000, perName: { 'verify-otp': 60000 } }, priorityAging: { interval: 60000, minAge: 300000, boost: 1 }, deduplication: { ttl: 5000 }, rateLimit: { max: 100, duration: 1000 }, dlq: { autoRetry: true, maxAge: 604800000 }, }); app.use(async (job, next) => { const start = Date.now(); const result = await next(); console.log(`${job.name}: ${Date.now() - start}ms`); return result; }); app .trigger({ on: 'process', create: 'notify', data: (r) => ({ payload: r.id }) }) .trigger({ on: 'process', event: 'failed', create: 'alert', data: (_, j) => j.data }); await app.cron('cleanup', '0 2 * * *', { payload: 'nightly' }); await app.add('process', { payload: 'ORD-001' }); process.on('SIGINT', async () => { await app.close(); shutdownManager(); });
Workflow Engine
Orchestrate multi-step business processes with branching, saga compensation, and human-in-the-loop signals. Built on top of bunqueue — no new infrastructure.
import { Workflow, Engine } from 'bunqueue/workflow'; const orderFlow = new Workflow('order-pipeline') .step('validate', async (ctx) => { const { orderId, amount } = ctx.input as { orderId: string; amount: number }; if (amount <= 0) throw new Error('Invalid amount'); return { orderId }; }) .step('reserve-stock', async () => { await inventory.reserve(); return { reserved: true }; }, { compensate: async () => await inventory.release(), // Auto-rollback on failure }) .step('charge', async () => { return { txId: await payments.charge() }; }, { compensate: async () => await payments.refund(), }) .step('confirm', async (ctx) => { const { txId } = ctx.steps['charge'] as { txId: string }; return { emailSent: true, txId }; }); const engine = new Engine({ embedded: true }); engine.register(orderFlow); await engine.start('order-pipeline', { orderId: 'ORD-1', amount: 99.99 });
Features:
- Saga pattern — Compensation handlers run in reverse when a step fails
- Branching — Route to different paths based on runtime conditions
- Parallel steps — Run independent steps concurrently with
.parallel() - Human-in-the-loop —
waitFor('event')pauses execution,engine.signal()resumes it - Signal timeout —
waitFor('event', { timeout })fails if signal doesn't arrive in time - Step retry — Automatic retry with exponential backoff and jitter
- Nested workflows — Compose workflows with
.subWorkflow(), child results passed back - Loops —
doUntil()anddoWhile()for conditional iteration with safety limits - forEach — Iterate over dynamic item lists with indexed step results (
step:0,step:1, ...) - Map — Synchronous data transforms between steps with
.map() - Schema validation — Validate step input/output with Zod, ArkType, Valibot, or any
.parse()schema - Subscribe —
engine.subscribe(id, callback)to monitor a specific execution's events - Observability — Typed event emitter with 11 event types (
engine.on/onAny) - Cleanup & archival —
engine.cleanup()/engine.archive()for execution history management - Step timeouts — Prevent steps from running indefinitely
- Context passing — Each step accesses input and all previous step results
- SQLite persistence — Execution state survives restarts
vs Competitors:
| bunqueue | Temporal | Inngest | Trigger.dev | |
|---|---|---|---|---|
| Infrastructure | None (embedded) | PostgreSQL + 7 services | Cloud-only | Redis + PostgreSQL |
| Saga compensation | Built-in | Manual | Manual | Manual |
| Human-in-the-loop | .waitFor() |
Signals API | step.waitForEvent() |
Waitpoint tokens |
| Self-hosted | Zero-config | Complex | No | Complex |
| Pricing | Free (MIT) | Free / Cloud $$ | Per-execution | Free tier, then $50/mo+ |
// Branching const flow = new Workflow('tiered') .step('classify', async (ctx) => { const { amount } = ctx.input as { amount: number }; return { tier: amount > 1000 ? 'vip' : 'basic' }; }) .branch((ctx) => (ctx.steps['classify'] as { tier: string }).tier) .path('vip', (w) => w.step('vip-handler', async () => ({ discount: 20 }))) .path('basic', (w) => w.step('basic-handler', async () => ({ discount: 0 }))) .step('done', async () => ({ processed: true })); // Human-in-the-loop const approvalFlow = new Workflow('expense') .step('submit', async (ctx) => { const { amount } = ctx.input as { amount: number }; return { amount }; }) .waitFor('manager-approval') .step('reimburse', async (ctx) => { const decision = ctx.signals['manager-approval'] as { approved: boolean }; return { status: decision.approved ? 'paid' : 'rejected' }; }); // Signal a waiting workflow await engine.signal(run.id, 'manager-approval', { approved: true });
bunqueue Dashboard
A visual interface for managing queues, jobs, workers and monitoring in real time. Currently in beta.
bunqueue-demo.mp4
Want early access? Reach out at egeominotti@gmail.com
Why bunqueue?
| Library | Requires | AI-native |
|---|---|---|
| BullMQ | Redis | No |
| Agenda | MongoDB | No |
| pg-boss | PostgreSQL | No |
| bunqueue | Nothing | Yes |
- MCP server included — 73 tools, 5 resources, 3 prompts. AI agents get full control out of the box
- BullMQ-compatible API — Same
Queue,Worker,QueueEvents - Zero dependencies — No Redis, no MongoDB
- SQLite persistence — Survives restarts, WAL mode for concurrent access
- Up to 286K ops/sec — Verified benchmarks
Built for AI Agents (MCP Server)
bunqueue is the first job queue with native MCP support. AI agents get a full-featured scheduler, task queue, and monitoring system — no glue code needed.
HTTP Handlers solve a fundamental problem: an AI agent can schedule jobs and manage queues, but it cannot run a persistent worker. When the agent registers an HTTP handler, bunqueue spawns an embedded Worker that continuously pulls jobs and calls your HTTP endpoint. Responses are saved as results. Failed calls retry automatically via DLQ.
What AI agents can do with bunqueue:
- Schedule tasks — cron jobs, delayed execution, recurring workflows
- Manage job pipelines — push jobs, monitor progress, retry failures
- Full pull/ack/fail cycle — agents can consume and process jobs directly
- Monitor everything — stats, memory, Prometheus metrics, logs, DLQ
- Control flow — pause/resume queues, set rate limits, manage concurrency
- 73 MCP tools + 5 resources + 3 prompts — complete control over every feature
- HTTP handlers — register a URL, bunqueue auto-processes jobs via HTTP calls
# One command to connect Claude Code
claude mcp add bunqueue -- bunx bunqueue-mcp// Claude Desktop / Cursor / Windsurf — add to MCP config { "mcpServers": { "bunqueue": { "command": "bunx", "args": ["bunqueue-mcp"] } } }
Example agent interactions:
- "Schedule a cleanup job every day at 3 AM"
- "Add 500 email jobs to the queue with priority 10"
- "Show me all failed jobs and retry them"
- "Set rate limit to 50/sec on the api-calls queue"
- "What's the memory usage and queue throughput?"
Plugin ecosystem — bunqueue ships with auto-discovery (.mcp.json), a custom Claude Code agent for bunqueue tasks, and installable skills for setup, API reference, and real-world patterns. Drop bunqueue into any project and your AI tools discover it automatically.
Supports embedded (local SQLite) and TCP (remote server) modes. Full MCP documentation →
When to use bunqueue
Great for:
- AI agents that need a scheduler — cron jobs, delayed tasks, retries, all via MCP
- Agentic workflows — agents push jobs, workers process, agents monitor results
- Single-server deployments
- Prototypes and MVPs
- Moderate to high workloads (up to 286K ops/sec)
- Teams that want to avoid Redis operational overhead
- Embedded use cases (CLI tools, edge functions, serverless)
Not ideal for:
- Multi-region distributed systems requiring HA
- Workloads that need automatic failover today
- Systems already running Redis with existing infrastructure
Why not just use BullMQ?
If you're already running Redis, BullMQ is great — battle-tested and feature-rich.
bunqueue is for when you don't want to run Redis. SQLite with WAL mode handles surprisingly high throughput for single-node deployments (tested up to 286K ops/sec). You get persistence, priorities, delays, retries, cron jobs, and DLQ — without the operational overhead of another service.
Install
Requires Bun runtime. Node.js is not supported.
Two Modes
bunqueue runs in two modes depending on your architecture:
| Embedded | Server (TCP) | |
|---|---|---|
| How it works | Queue runs inside your process | Standalone server, clients connect via TCP |
| Setup | bun add bunqueue |
docker run or bunqueue start |
| Performance | 286K ops/sec | 149K ops/sec |
| Best for | Single-process apps, CLIs, serverless | Multiple workers, separate producer/consumer |
| Scaling | Same process only | Multiple clients across machines |
Embedded Mode
Everything runs in your process. No server, no network, no setup.
import { Queue, Worker } from 'bunqueue/client'; const queue = new Queue('emails', { embedded: true }); const worker = new Worker( 'emails', async (job) => { console.log('Processing:', job.data); return { sent: true }; }, { embedded: true } ); await queue.add('welcome', { to: 'user@example.com' });
Server Mode (TCP)
Run bunqueue as a standalone server. Multiple workers and producers connect via TCP.
# Start with persistent data
docker run -d -p 6789:6789 -p 6790:6790 \
-v bunqueue-data:/app/data \
ghcr.io/egeominotti/bunqueue:latestConnect from your app:
import { Queue, Worker } from 'bunqueue/client'; const queue = new Queue('tasks', { connection: { host: 'localhost', port: 6789 } }); const worker = new Worker( 'tasks', async (job) => { return { done: true }; }, { connection: { host: 'localhost', port: 6789 } } ); await queue.add('process', { data: 'hello' });
Simple Mode
One object. Queue + Worker + Routes + Middleware + Cron. Zero boilerplate.
import { Bunqueue } from 'bunqueue/client'; const app = new Bunqueue('notifications', { embedded: true, // Route jobs by name routes: { 'send-email': async (job) => { console.log(`Email to ${job.data.to}`); return { sent: true }; }, 'send-sms': async (job) => { console.log(`SMS to ${job.data.to}`); return { sent: true }; }, }, concurrency: 10, }); // Middleware — wraps every job (logging, timing, error recovery) app.use(async (job, next) => { const start = Date.now(); const result = await next(); console.log(`${job.name} took ${Date.now() - start}ms`); return result; }); // Cron — scheduled jobs await app.cron('daily-report', '0 9 * * *', { type: 'summary' }); await app.every('healthcheck', 30000, { type: 'ping' }); // Events app.on('completed', (job, result) => console.log(result)); app.on('failed', (job, err) => console.error(err)); // Add jobs await app.add('send-email', { to: 'alice@example.com' }); await app.add('send-sms', { to: '+1234567890' }); // Graceful shutdown await app.close();
Works with both embedded and TCP mode. Simple Mode docs →
SQLite handles surprisingly high throughput for single-node deployments:
| Mode | Peak Throughput | Use Case |
|---|---|---|
| Embedded | 286K ops/sec | Same process |
| TCP | 149K ops/sec | Distributed workers |
Run
bun run benchto verify on your hardware. Full benchmark methodology →
Monitoring
# Start with Prometheus + Grafana
docker compose --profile monitoring up -d- Grafana: http://localhost:3000 (admin/bunqueue)
- Prometheus: http://localhost:9090
Documentation
- Quick Start
- Workflow Engine
- MCP Server (AI Agents)
- Simple Mode
- Queue API
- Worker API
- Server Mode
- Benchmarks
- CLI Reference
License
MIT