
A supervised queue processing framework inspired by Broadway
⚠️ Work in ProgressShibuya is under active development and not yet complete. The API may change significantly before the first stable release.
Shibuya provides a unified abstraction over various message queue backends (Kafka, PostgreSQL queues, SQS, Redis) with built-in supervision, backpressure, and composable stream transformations.
Features
- Unified Queue Abstraction - Write handlers once, swap queue backends freely
- Supervised Processing - Failure isolation via NQE supervision
- Backpressure - Bounded inboxes prevent memory exhaustion
- Explicit Ack Semantics - Handlers express intent (ack, retry, dead-letter, halt), framework handles mechanics
- Metrics & Introspection - Real-time visibility into processor state and statistics
- Stream Transformations - Composable pipelines powered by Streamly
- Effectful - All effects tracked via the Effectful library
Current Status (v0.5.0.0 — Hackage)
| Feature | Status |
|---|---|
| Serial Processing | ✅ Implemented |
| Backpressure (bounded inbox) | ✅ Implemented |
| Ack Semantics (Ok/Retry/DLQ/Halt) | ✅ Implemented |
| Metrics & Introspection | ✅ Implemented |
| NQE Supervision | ✅ Implemented |
| Concurrent Processing (Ahead/Async) | ✅ Implemented |
| OpenTelemetry Tracing | ✅ Implemented |
| Graceful Shutdown (drain timeout) | ✅ Implemented |
| Policy Validation | ✅ Implemented |
Adapters
Queue backends live in sibling repositories so they can release on their own cadence:
shibuya-kafka-adapter— Apache Kafka viahw-kafka-clientandkafka-effectful.shibuya-pgmq-adapter— PostgreSQL message queue (pgmq) viapgmq-hs.
What's New in 0.5.0.0
- Adapter-supplied span attributes —
Envelopegained anattributes :: !(HashMap Text Attribute)field carrying OpenTelemetry attributes for the per-message processing span.Shibuya.Runner.Supervisedapplies them to its Consumer-kind span after the framework-defaultmessaging.*keys, so broker-aware adapters (Kafka in particular) can emit typed attributes (messaging.kafka.destination.partition,messaging.kafka.message.offset) and override themessaging.systemdefault — without opening a second span. - Producer-side trace propagation — new
Shibuya.Telemetry.Propagation.currentTraceHeaders :: (Tracing :> es, IOE :> es) => Eff es (Maybe TraceHeaders)looks up the currently-active OTel span and encodes its trace context as W3C headers, ready for an adapter to attach to an outgoing message. ReturnsNothingwhen tracing is disabled or there is no active span. Intended for adapter-side DLQ writes and ad-hoc producer paths. - Breaking — direct constructions of
Envelopemust add the newattributesfield; passData.HashMap.Strict.emptywhen the adapter has nothing to contribute (the common case).Envelope'sNFDatainstance is now hand-written rather than derived (becauseAttributefromhs-opentelemetry-apidoes not shipNFData); the strictness shape is unchanged. - New OpenTelemetry user guide walking through tracer setup, what the framework spans contain, and how to wire propagation end-to-end.
shibuya-metricsis re-released at 0.5.0.0 to track the shared version; it has no user-visible changes of its own.
See the CHANGELOG for full release history.
Installation
Available on Hackage. Add to your cabal file:
build-depends: shibuya-core ^>=0.5.0.0
Optional packages:
shibuya-metrics— HTTP/JSON, Prometheus, and WebSocket metrics endpointsshibuya-pgmq-adapter— PostgreSQL message queue adapter (standalone repo)shibuya-kafka-adapter— Apache Kafka adapter (standalone repo)
Quick Start
{-# LANGUAGE DeriveGeneric #-}
module Main where
import Shibuya.App
import Shibuya.Telemetry.Effect (runTracingNoop)
import Effectful
import Effectful.Concurrent (runConcurrent)
-- Your domain type
data OrderEvent = OrderEvent
{ orderId :: Text
, amount :: Int
}
deriving (Generic, FromJSON)
-- Your handler - just return what should happen
handleOrder :: Handler es OrderEvent
handleOrder ingested = do
let order = ingested.envelope.payload
result <- liftIO $ processOrder order
pure $ case result of
Right () -> AckOk -- Success
Left err -> AckRetry (RetryDelay 30) -- Retry in 30 seconds
main :: IO ()
main = runEff . runConcurrent . runTracingNoop $ do
let ordersProcessor = QueueProcessor
{ adapter = myAdapter -- your adapter of choice
, handler = handleOrder
, ordering = Unordered
, concurrency = Serial
}
result <- runApp IgnoreFailures 100
[ (ProcessorId "orders", ordersProcessor)
]
case result of
Left err -> liftIO $ print err
Right appHandle -> waitApp appHandleAck Decisions
Handlers return an AckDecision to express intent:
AckOk -- Message processed successfully AckRetry (RetryDelay 30) -- Retry after 30 seconds AckDeadLetter (InvalidPayload msg) -- Send to dead-letter queue AckHalt (HaltFatal reason) -- Stop processing entirely
Configuration
-- runApp takes: -- SupervisionStrategy - How to handle processor failures -- Int - Inbox size for backpressure -- [(ProcessorId, QueueProcessor es)] - Named processors result <- runApp IgnoreFailures -- Keep running even if a processor fails 500 -- Inbox buffer size [ (ProcessorId "orders", ordersProcessor) , (ProcessorId "events", eventsProcessor) ] -- QueueProcessor fields: -- adapter - Queue backend (source stream + shutdown) -- handler - Your message handler -- ordering - Unordered | StrictInOrder -- concurrency - Serial | Ahead Natural | Async Natural
Exponential Backoff
Shibuya 0.4 ships a built-in exponential-backoff helper for handlers that want exponentially-growing, jittered retry intervals without having to compute the math themselves:
import Shibuya.Core.Retry (defaultBackoffPolicy, retryWithBackoff) myHandler ingested = do result <- tryProcess ingested.envelope.payload case result of Right () -> pure AckOk Left _err -> retryWithBackoff defaultBackoffPolicy ingested.envelope
defaultBackoffPolicy is AWS's published "exponential backoff with
full jitter" recommendation: 1 s base, factor 2, capped at 5 minutes.
The available Jitter strategies are NoJitter, FullJitter
(default), and EqualJitter; switch by record-updating the policy
(defaultBackoffPolicy { jitter = NoJitter }).
Adapters that track per-message redelivery counts populate
ingested.envelope.attempt :: Maybe Attempt; the helper reads it and
grows the delay each time the same message returns. The PGMQ adapter
sources the counter from pgmq's read_count column. Adapters that do
not track redeliveries leave attempt = Nothing, in which case
retryWithBackoff treats the delivery as Attempt 0 (base delay).
A runnable end-to-end demonstration lives in the
shibuya-pgmq-adapter
repo's shibuya-pgmq-example/ package. With a local Postgres
reachable via DATABASE_URL, run:
# Terminal 1 — consumer cabal run shibuya-pgmq-consumer -- backoff-demo nojitter # Terminal 2 — enqueue one message cabal run shibuya-pgmq-simulator -- one-shot backoff_demo
The consumer's stdout shows the message being delivered four times,
with the wallclock gaps growing 1 s, 2 s, 4 s, then succeeding on the
fourth delivery. Drop the nojitter flag for the default
full-jittered policy.
Distributed Tracing
Shibuya includes built-in OpenTelemetry tracing support for distributed observability.
Enabling Tracing
import Shibuya.Telemetry.Effect (runTracing, runTracingNoop) import OpenTelemetry.Trace qualified as OTel main :: IO () main = do -- Initialize OpenTelemetry (via SDK or your preferred method) provider <- initTracerProvider -- Your initialization let tracer = OTel.makeTracer provider "my-service" OTel.tracerOptions -- Run with tracing enabled runEff $ runTracing tracer $ do result <- runApp IgnoreFailures 100 processors -- ... -- Or run with tracing disabled (zero overhead) runEff $ runTracingNoop $ do result <- runApp IgnoreFailures 100 processors -- ...
What Gets Traced
Each message creates a span with:
- Span name:
"<destination> process"(e.g."shibuya-consumer process"), following the OpenTelemetry messaging-spans recommendation - Span kind:
Consumer - Attributes:
messaging.system: "shibuya"messaging.operation: "process"messaging.destination.name: The processor idmessaging.message.id: The message IDshibuya.partition: Partition (if present)shibuya.inflight.count: Current in-flight messagesshibuya.inflight.max: Max concurrencyshibuya.ack.decision: Handler's ack decision
- Events:
shibuya.handler.started,shibuya.handler.completed,shibuya.ack.decision(plus the standardexceptionevent on handler exceptions, viarecordException) - Context propagation: Parent context from
traceContextmessage headers
Local Testing with Jaeger
# Start Jaeger docker compose -f docker-compose.otel.yaml up -d # View traces at http://localhost:16686
Environment Variables
Configure tracing via standard OpenTelemetry environment variables:
OTEL_SERVICE_NAME- Service name in tracesOTEL_EXPORTER_OTLP_ENDPOINT- OTLP collector endpointOTEL_TRACES_SAMPLER- Sampling strategy (e.g.,always_on,parentbased_always_on)
Running Multiple Processors
Run multiple independent queues concurrently with runApp:
main = runEff . runConcurrent . runTracingNoop $ do let ordersProcessor = QueueProcessor { adapter = ordersAdapter , handler = handleOrders , ordering = Unordered , concurrency = Async 10 -- 10 concurrent handlers } eventsProcessor = QueueProcessor { adapter = eventsAdapter , handler = handleEvents , ordering = Unordered , concurrency = Serial } result <- runApp IgnoreFailures 100 [ (ProcessorId "orders", ordersProcessor) , (ProcessorId "events", eventsProcessor) ] case result of Left err -> print err Right appHandle -> do -- Monitor metrics metrics <- getAppMetrics appHandle forM_ (Map.toList metrics) $ \(ProcessorId name, pm) -> putStrLn $ name <> ": " <> show pm.stats.processed <> " processed" -- Wait for completion or use stopApp/stopAppGracefully to shut down waitApp appHandle
Documentation
- Usage Guide - Detailed usage examples
- Getting Started - Framework walkthrough
- Architecture - System design and module structure
- Architecture Details - Core types, message flow, metrics, concurrency
- CHANGELOG - Release history
Adapter-specific docs (PGMQ, Kafka, ...) live with their respective adapters — see the Adapters section above.
Design Principles
- Separation of Concerns - Streamly handles I/O and backpressure, NQE handles supervision
- Explicit Semantics - Handlers express intent, not mechanics
- Adapter Abstraction - Queue-specific logic lives in adapters, not the core
- Composable - Stream pipelines are composable and testable in isolation
- Effectful - All effects tracked for testability and safety
References
- Broadway (Elixir) - Primary inspiration
- Streamly - Stream processing
- Effectful - Effect system
- NQE - Actor supervision
License
MIT