GitHub - shinzui/shibuya

8 min read Original article ↗

Shibuya
A supervised queue processing framework inspired by Broadway


⚠️ Work in Progress

Shibuya is under active development and not yet complete. The API may change significantly before the first stable release.


Shibuya provides a unified abstraction over various message queue backends (Kafka, PostgreSQL queues, SQS, Redis) with built-in supervision, backpressure, and composable stream transformations.

Features

  • Unified Queue Abstraction - Write handlers once, swap queue backends freely
  • Supervised Processing - Failure isolation via NQE supervision
  • Backpressure - Bounded inboxes prevent memory exhaustion
  • Explicit Ack Semantics - Handlers express intent (ack, retry, dead-letter, halt), framework handles mechanics
  • Metrics & Introspection - Real-time visibility into processor state and statistics
  • Stream Transformations - Composable pipelines powered by Streamly
  • Effectful - All effects tracked via the Effectful library

Current Status (v0.5.0.0 — Hackage)

Feature Status
Serial Processing ✅ Implemented
Backpressure (bounded inbox) ✅ Implemented
Ack Semantics (Ok/Retry/DLQ/Halt) ✅ Implemented
Metrics & Introspection ✅ Implemented
NQE Supervision ✅ Implemented
Concurrent Processing (Ahead/Async) ✅ Implemented
OpenTelemetry Tracing ✅ Implemented
Graceful Shutdown (drain timeout) ✅ Implemented
Policy Validation ✅ Implemented

Adapters

Queue backends live in sibling repositories so they can release on their own cadence:

What's New in 0.5.0.0

  • Adapter-supplied span attributesEnvelope gained an attributes :: !(HashMap Text Attribute) field carrying OpenTelemetry attributes for the per-message processing span. Shibuya.Runner.Supervised applies them to its Consumer-kind span after the framework-default messaging.* keys, so broker-aware adapters (Kafka in particular) can emit typed attributes (messaging.kafka.destination.partition, messaging.kafka.message.offset) and override the messaging.system default — without opening a second span.
  • Producer-side trace propagation — new Shibuya.Telemetry.Propagation.currentTraceHeaders :: (Tracing :> es, IOE :> es) => Eff es (Maybe TraceHeaders) looks up the currently-active OTel span and encodes its trace context as W3C headers, ready for an adapter to attach to an outgoing message. Returns Nothing when tracing is disabled or there is no active span. Intended for adapter-side DLQ writes and ad-hoc producer paths.
  • Breaking — direct constructions of Envelope must add the new attributes field; pass Data.HashMap.Strict.empty when the adapter has nothing to contribute (the common case). Envelope's NFData instance is now hand-written rather than derived (because Attribute from hs-opentelemetry-api does not ship NFData); the strictness shape is unchanged.
  • New OpenTelemetry user guide walking through tracer setup, what the framework spans contain, and how to wire propagation end-to-end.
  • shibuya-metrics is re-released at 0.5.0.0 to track the shared version; it has no user-visible changes of its own.

See the CHANGELOG for full release history.

Installation

Available on Hackage. Add to your cabal file:

build-depends:
    shibuya-core ^>=0.5.0.0

Optional packages:

Quick Start

{-# LANGUAGE DeriveGeneric #-}

module Main where

import Shibuya.App
import Shibuya.Telemetry.Effect (runTracingNoop)
import Effectful
import Effectful.Concurrent (runConcurrent)

-- Your domain type
data OrderEvent = OrderEvent
  { orderId :: Text
  , amount  :: Int
  }
  deriving (Generic, FromJSON)

-- Your handler - just return what should happen
handleOrder :: Handler es OrderEvent
handleOrder ingested = do
  let order = ingested.envelope.payload

  result <- liftIO $ processOrder order

  pure $ case result of
    Right ()  -> AckOk                      -- Success
    Left err  -> AckRetry (RetryDelay 30)   -- Retry in 30 seconds

main :: IO ()
main = runEff . runConcurrent . runTracingNoop $ do
  let ordersProcessor = QueueProcessor
        { adapter = myAdapter       -- your adapter of choice
        , handler = handleOrder
        , ordering = Unordered
        , concurrency = Serial
        }

  result <- runApp IgnoreFailures 100
    [ (ProcessorId "orders", ordersProcessor)
    ]

  case result of
    Left err -> liftIO $ print err
    Right appHandle -> waitApp appHandle

Ack Decisions

Handlers return an AckDecision to express intent:

AckOk                              -- Message processed successfully
AckRetry (RetryDelay 30)           -- Retry after 30 seconds
AckDeadLetter (InvalidPayload msg) -- Send to dead-letter queue
AckHalt (HaltFatal reason)         -- Stop processing entirely

Configuration

-- runApp takes:
--   SupervisionStrategy - How to handle processor failures
--   Int                 - Inbox size for backpressure
--   [(ProcessorId, QueueProcessor es)] - Named processors

result <- runApp
  IgnoreFailures   -- Keep running even if a processor fails
  500              -- Inbox buffer size
  [ (ProcessorId "orders", ordersProcessor)
  , (ProcessorId "events", eventsProcessor)
  ]

-- QueueProcessor fields:
--   adapter     - Queue backend (source stream + shutdown)
--   handler     - Your message handler
--   ordering    - Unordered | StrictInOrder
--   concurrency - Serial | Ahead Natural | Async Natural

Exponential Backoff

Shibuya 0.4 ships a built-in exponential-backoff helper for handlers that want exponentially-growing, jittered retry intervals without having to compute the math themselves:

import Shibuya.Core.Retry (defaultBackoffPolicy, retryWithBackoff)

myHandler ingested = do
  result <- tryProcess ingested.envelope.payload
  case result of
    Right ()  -> pure AckOk
    Left _err -> retryWithBackoff defaultBackoffPolicy ingested.envelope

defaultBackoffPolicy is AWS's published "exponential backoff with full jitter" recommendation: 1 s base, factor 2, capped at 5 minutes. The available Jitter strategies are NoJitter, FullJitter (default), and EqualJitter; switch by record-updating the policy (defaultBackoffPolicy { jitter = NoJitter }).

Adapters that track per-message redelivery counts populate ingested.envelope.attempt :: Maybe Attempt; the helper reads it and grows the delay each time the same message returns. The PGMQ adapter sources the counter from pgmq's read_count column. Adapters that do not track redeliveries leave attempt = Nothing, in which case retryWithBackoff treats the delivery as Attempt 0 (base delay).

A runnable end-to-end demonstration lives in the shibuya-pgmq-adapter repo's shibuya-pgmq-example/ package. With a local Postgres reachable via DATABASE_URL, run:

# Terminal 1 — consumer
cabal run shibuya-pgmq-consumer -- backoff-demo nojitter

# Terminal 2 — enqueue one message
cabal run shibuya-pgmq-simulator -- one-shot backoff_demo

The consumer's stdout shows the message being delivered four times, with the wallclock gaps growing 1 s, 2 s, 4 s, then succeeding on the fourth delivery. Drop the nojitter flag for the default full-jittered policy.

Distributed Tracing

Shibuya includes built-in OpenTelemetry tracing support for distributed observability.

Enabling Tracing

import Shibuya.Telemetry.Effect (runTracing, runTracingNoop)
import OpenTelemetry.Trace qualified as OTel

main :: IO ()
main = do
  -- Initialize OpenTelemetry (via SDK or your preferred method)
  provider <- initTracerProvider  -- Your initialization
  let tracer = OTel.makeTracer provider "my-service" OTel.tracerOptions

  -- Run with tracing enabled
  runEff $ runTracing tracer $ do
    result <- runApp IgnoreFailures 100 processors
    -- ...

  -- Or run with tracing disabled (zero overhead)
  runEff $ runTracingNoop $ do
    result <- runApp IgnoreFailures 100 processors
    -- ...

What Gets Traced

Each message creates a span with:

  • Span name: "<destination> process" (e.g. "shibuya-consumer process"), following the OpenTelemetry messaging-spans recommendation
  • Span kind: Consumer
  • Attributes:
    • messaging.system: "shibuya"
    • messaging.operation: "process"
    • messaging.destination.name: The processor id
    • messaging.message.id: The message ID
    • shibuya.partition: Partition (if present)
    • shibuya.inflight.count: Current in-flight messages
    • shibuya.inflight.max: Max concurrency
    • shibuya.ack.decision: Handler's ack decision
  • Events: shibuya.handler.started, shibuya.handler.completed, shibuya.ack.decision (plus the standard exception event on handler exceptions, via recordException)
  • Context propagation: Parent context from traceContext message headers

Local Testing with Jaeger

# Start Jaeger
docker compose -f docker-compose.otel.yaml up -d

# View traces at http://localhost:16686

Environment Variables

Configure tracing via standard OpenTelemetry environment variables:

  • OTEL_SERVICE_NAME - Service name in traces
  • OTEL_EXPORTER_OTLP_ENDPOINT - OTLP collector endpoint
  • OTEL_TRACES_SAMPLER - Sampling strategy (e.g., always_on, parentbased_always_on)

Running Multiple Processors

Run multiple independent queues concurrently with runApp:

main = runEff . runConcurrent . runTracingNoop $ do
  let ordersProcessor = QueueProcessor
        { adapter = ordersAdapter
        , handler = handleOrders
        , ordering = Unordered
        , concurrency = Async 10    -- 10 concurrent handlers
        }
      eventsProcessor = QueueProcessor
        { adapter = eventsAdapter
        , handler = handleEvents
        , ordering = Unordered
        , concurrency = Serial
        }

  result <- runApp IgnoreFailures 100
    [ (ProcessorId "orders", ordersProcessor)
    , (ProcessorId "events", eventsProcessor)
    ]

  case result of
    Left err -> print err
    Right appHandle -> do
      -- Monitor metrics
      metrics <- getAppMetrics appHandle
      forM_ (Map.toList metrics) $ \(ProcessorId name, pm) ->
        putStrLn $ name <> ": " <> show pm.stats.processed <> " processed"

      -- Wait for completion or use stopApp/stopAppGracefully to shut down
      waitApp appHandle

Documentation

Adapter-specific docs (PGMQ, Kafka, ...) live with their respective adapters — see the Adapters section above.

Design Principles

  1. Separation of Concerns - Streamly handles I/O and backpressure, NQE handles supervision
  2. Explicit Semantics - Handlers express intent, not mechanics
  3. Adapter Abstraction - Queue-specific logic lives in adapters, not the core
  4. Composable - Stream pipelines are composable and testable in isolation
  5. Effectful - All effects tracked for testability and safety

References

License

MIT