GitHub - alaminopu/pypgmq: A PostgreSQL-backed ordered message queue with webhook delivery, exposed via a FastAPI REST API

3 min read Original article ↗

A PostgreSQL-backed ordered message queue with webhook delivery, exposed via a FastAPI REST API.

Architecture

graph LR
    Client -->|POST /topics/.../messages| API[FastAPI API]
    SQL[Direct SQL INSERT] -->|trigger| PG[(PostgreSQL)]
    API --> PG
    PG -->|LISTEN/NOTIFY| Worker[Delivery Worker]
    Worker -->|HTTP POST| WH1[Webhook 1]
    Worker -->|HTTP POST| WH2[Webhook 2]
    WH1 -->|2xx| Worker
    WH1 -->|non-2xx| Worker -->|retry / dead-letter| PG
Loading

Features

  • Topic-based messaging – Messages belong to a topic and a partition.
  • Webhook delivery – Register HTTP endpoints; messages are POSTed automatically.
  • Strict ordering – Messages within a partition are delivered in order per webhook.
  • Retry with backoff – Failed deliveries are retried with exponential backoff.
  • Dead-letter partition – After MAX_RETRIES failures, messages move to dead-letter for inspection.
  • Horizontal scaling – Multiple workers can run safely using FOR UPDATE SKIP LOCKED.
  • Near real-time – PostgreSQL LISTEN/NOTIFY triggers immediate processing.
  • Direct SQL inserts – Clients can insert messages via SQL; a trigger fans out delivery records and fires NOTIFY.

Quickstart (Docker)

Run everything with a single command:

docker-compose up --build

This starts PostgreSQL, runs migrations, launches the API on http://localhost:8000, and starts the background worker.

Quickstart (Manual)

# 1. Start PostgreSQL only
docker-compose up -d db

# 2. Install dependencies with uv
uv sync

# 3. Run migrations
uv run alembic upgrade head

# 4. (Optional) Install the NOTIFY trigger for direct SQL inserts
# Shell into the running container
# psql -U postgres
psql -f sql/notify_trigger.sql pypgmq

# 5. Start the API
uv run uvicorn pypgmq.api.main:app --reload

# 6. Start the worker (in another terminal)
uv run python -m pypgmq.worker.worker

Configuration

Copy .env.example to .env and adjust:

Variable Default Description
DATABASE_URL postgresql+asyncpg://postgres:password@localhost:5432/pypgmq Async DB URL
MAX_RETRIES 5 Max delivery attempts before dead-letter
BACKOFF_FACTOR 2.0 Exponential backoff base (seconds)
WORKER_CONCURRENCY 10 Max concurrent deliveries per worker batch

API

Method Endpoint Description
POST /topics Create a topic
GET /topics List topics
POST /webhooks Register a webhook for a topic
GET /topics/{id}/webhooks List webhooks for a topic
DELETE /webhooks/{id} Delete a webhook
POST /topics/{name}/messages Send a message
GET /webhooks/{id}/deliveries List deliveries for a webhook
GET /dead-letter Inspect dead-lettered messages
GET /dead-letter/count Count dead-lettered messages

Interactive docs available at http://localhost:8000/docs.

Direct SQL Usage

Clients can insert messages directly via SQL within a transaction:

BEGIN;
-- your application logic …
INSERT INTO messages (topic_id, partition_key, payload, created_at)
VALUES (1, 'user-42', '{"event": "order.created"}'::jsonb, now());
COMMIT;

The trigger (sql/notify_trigger.sql) creates delivery records for every subscribed webhook and fires NOTIFY so the worker picks them up immediately.

Testing

# Requires a running PostgreSQL (docker-compose up -d db)
uv sync --group test
uv run pytest -v

Linting & Formatting

This project uses Ruff for linting and formatting.

uv sync --group dev

uv run ruff check pypgmq/ tests/        # lint
uv run ruff check --fix pypgmq/ tests/   # lint + auto-fix
uv run ruff format pypgmq/ tests/        # format

Project Structure

pypgmq/
├── api/           # FastAPI application
│   └── main.py
├── core/          # Configuration & messaging logic
│   ├── config.py
│   └── messaging.py
├── models/        # SQLAlchemy models & DB session
│   ├── database.py
│   └── models.py
├── schemas/       # Pydantic request/response schemas
│   └── schemas.py
└── worker/        # Background delivery worker
    └── worker.py

Additional details

For testing the webhook delivery, we can use a service lik webhook.site.