A PostgreSQL-backed ordered message queue with webhook delivery, exposed via a FastAPI REST API.
Architecture
```mermaid
graph LR
    Client -->|POST /topics/.../messages| API[FastAPI API]
    SQL[Direct SQL INSERT] -->|trigger| PG[(PostgreSQL)]
    API --> PG
    PG -->|LISTEN/NOTIFY| Worker[Delivery Worker]
    Worker -->|HTTP POST| WH1[Webhook 1]
    Worker -->|HTTP POST| WH2[Webhook 2]
    WH1 -->|2xx| Worker
    WH1 -->|non-2xx| Worker -->|retry / dead-letter| PG
```
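To give a feel for the wake-up path, here is a minimal sketch of a process listening for the worker's notifications. It is illustration only, not the project's worker code; the channel name `new_deliveries` and the connection DSN are assumptions.

```python
# Minimal sketch of the LISTEN/NOTIFY wake-up path (not the actual worker code).
# The channel name "new_deliveries" and the DSN are assumptions for illustration.
import asyncio
import asyncpg

async def main() -> None:
    conn = await asyncpg.connect("postgresql://postgres:password@localhost:5432/pypgmq")

    def on_notify(connection, pid, channel, payload):
        # In the real worker this would kick off a delivery batch.
        print(f"NOTIFY on {channel}: {payload}")

    await conn.add_listener("new_deliveries", on_notify)
    print("Listening for new messages…")
    await asyncio.Event().wait()  # keep the connection open

asyncio.run(main())
```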
Features
- Topic-based messaging – Messages belong to a topic and a partition.
- Webhook delivery – Register HTTP endpoints; messages are POSTed automatically.
- Strict ordering – Messages within a partition are delivered in order per webhook.
- Retry with backoff – Failed deliveries are retried with exponential backoff.
- Dead-letter partition – After `MAX_RETRIES` failures, messages move to the dead-letter partition for inspection.
- Horizontal scaling – Multiple workers can run safely using `FOR UPDATE SKIP LOCKED` (see the sketch after this list).
- Near real-time – PostgreSQL `LISTEN/NOTIFY` triggers immediate processing.
- Direct SQL inserts – Clients can insert messages via SQL; a trigger fans out delivery records and fires `NOTIFY`.
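The horizontal-scaling claim rests on PostgreSQL row locking. Below is a rough sketch of how a worker can claim a batch of pending deliveries without blocking its peers; the `deliveries` table and its `status`/`id` columns are illustrative assumptions, not necessarily the real schema.

```python
# Sketch: claiming pending deliveries safely with FOR UPDATE SKIP LOCKED.
# Table and column names (deliveries, status, id) are illustrative assumptions.
import asyncpg

async def claim_batch(conn: asyncpg.Connection, batch_size: int = 10):
    async with conn.transaction():
        rows = await conn.fetch(
            """
            SELECT id, payload
            FROM deliveries
            WHERE status = 'pending'
            ORDER BY id
            LIMIT $1
            FOR UPDATE SKIP LOCKED
            """,
            batch_size,
        )
        # Rows returned here are locked by this transaction; other workers
        # running the same query skip them instead of blocking on the lock.
        return rows
```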
Quickstart (Docker)
Run everything with a single command:
```bash
docker-compose up --build
```
This starts PostgreSQL, runs migrations, launches the API on http://localhost:8000, and starts the background worker.
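Once the stack is up, a quick smoke test from Python (using `httpx`, which you would need to install separately; any HTTP client works) is to list topics:

```python
# Quick smoke test against the freshly started stack.
# Requires httpx: pip install httpx
import httpx

resp = httpx.get("http://localhost:8000/topics")
resp.raise_for_status()
print(resp.json())  # expect an empty list on a fresh database
```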
Quickstart (Manual)
```bash
# 1. Start PostgreSQL only
docker-compose up -d db

# 2. Install dependencies with uv
uv sync

# 3. Run migrations
uv run alembic upgrade head

# 4. (Optional) Install the NOTIFY trigger for direct SQL inserts
#    (shell into the running container first)
psql -U postgres -f sql/notify_trigger.sql pypgmq

# 5. Start the API
uv run uvicorn pypgmq.api.main:app --reload

# 6. Start the worker (in another terminal)
uv run python -m pypgmq.worker.worker
```
Configuration
Copy .env.example to .env and adjust:
| Variable | Default | Description |
|---|---|---|
| `DATABASE_URL` | `postgresql+asyncpg://postgres:password@localhost:5432/pypgmq` | Async database URL |
| `MAX_RETRIES` | `5` | Max delivery attempts before dead-letter |
| `BACKOFF_FACTOR` | `2.0` | Exponential backoff base (seconds) |
| `WORKER_CONCURRENCY` | `10` | Max concurrent deliveries per worker batch |
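To make the retry settings concrete, here is one common way `BACKOFF_FACTOR` and `MAX_RETRIES` could translate into delays; the worker's exact formula may differ.

```python
# Illustration only: one common way to derive retry delays from BACKOFF_FACTOR.
BACKOFF_FACTOR = 2.0
MAX_RETRIES = 5

for attempt in range(1, MAX_RETRIES + 1):
    delay = BACKOFF_FACTOR ** attempt  # seconds: 2, 4, 8, 16, 32
    print(f"attempt {attempt}: retry in {delay:.0f}s")
```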
API
| Method | Endpoint | Description |
|---|---|---|
| POST | `/topics` | Create a topic |
| GET | `/topics` | List topics |
| POST | `/webhooks` | Register a webhook for a topic |
| GET | `/topics/{id}/webhooks` | List webhooks for a topic |
| DELETE | `/webhooks/{id}` | Delete a webhook |
| POST | `/topics/{name}/messages` | Send a message |
| GET | `/webhooks/{id}/deliveries` | List deliveries for a webhook |
| GET | `/dead-letter` | Inspect dead-lettered messages |
| GET | `/dead-letter/count` | Count dead-lettered messages |
Interactive docs available at http://localhost:8000/docs.
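As a sketch of a typical flow with these endpoints (request field names such as `name`, `topic_id`, `url`, `partition_key`, and `payload` are assumptions; the interactive docs have the authoritative schemas):

```python
# End-to-end sketch against the REST API using httpx.
# Request body field names are assumptions; consult http://localhost:8000/docs
# for the authoritative schemas.
import httpx

BASE = "http://localhost:8000"

with httpx.Client(base_url=BASE) as client:
    # 1. Create a topic (assumes the response contains an "id" field)
    topic = client.post("/topics", json={"name": "orders"}).json()

    # 2. Register a webhook for that topic
    client.post("/webhooks", json={"topic_id": topic["id"], "url": "https://example.com/hook"})

    # 3. Send a message to the topic
    client.post(
        "/topics/orders/messages",
        json={"partition_key": "user-42", "payload": {"event": "order.created"}},
    )
```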
Direct SQL Usage
Clients can insert messages directly via SQL within a transaction:
```sql
BEGIN;
-- your application logic …
INSERT INTO messages (topic_id, partition_key, payload, created_at)
VALUES (1, 'user-42', '{"event": "order.created"}'::jsonb, now());
COMMIT;
```
The trigger (sql/notify_trigger.sql) creates delivery records for every
subscribed webhook and fires NOTIFY so the worker picks them up immediately.
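The same pattern from Python application code might look like the sketch below, using `asyncpg`; the column names mirror the SQL example above, and `topic_id = 1` is purely illustrative.

```python
# Direct SQL insert from Python inside a transaction, mirroring the SQL above.
import json
import asyncpg

async def enqueue(conn: asyncpg.Connection) -> None:
    async with conn.transaction():
        # ... your application logic ...
        await conn.execute(
            """
            INSERT INTO messages (topic_id, partition_key, payload, created_at)
            VALUES ($1, $2, $3::jsonb, now())
            """,
            1,
            "user-42",
            json.dumps({"event": "order.created"}),
        )
        # COMMIT happens when the transaction block exits; the trigger then
        # fans out delivery records and fires NOTIFY.
```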
Testing
```bash
# Requires a running PostgreSQL (docker-compose up -d db)
uv sync --group test
uv run pytest -v
```
Linting & Formatting
This project uses Ruff for linting and formatting.
```bash
uv sync --group dev
uv run ruff check pypgmq/ tests/        # lint
uv run ruff check --fix pypgmq/ tests/  # lint + auto-fix
uv run ruff format pypgmq/ tests/       # format
```
Project Structure
```text
pypgmq/
├── api/        # FastAPI application
│   └── main.py
├── core/       # Configuration & messaging logic
│   ├── config.py
│   └── messaging.py
├── models/     # SQLAlchemy models & DB session
│   ├── database.py
│   └── models.py
├── schemas/    # Pydantic request/response schemas
│   └── schemas.py
└── worker/     # Background delivery worker
    └── worker.py
```
Additional details
To test webhook delivery end to end, you can register a throwaway endpoint from a service like webhook.site.
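Alternatively, a minimal local receiver (sketch below; the module name and `/hook` path are arbitrary choices) can print incoming deliveries:

```python
# Minimal local webhook receiver as an alternative to webhook.site.
# Run with: uv run uvicorn webhook_receiver:app --port 9000
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/hook")
async def receive(request: Request):
    payload = await request.json()
    print("received:", payload)
    return {"ok": True}  # any 2xx response marks the delivery as successful
```

Register its URL as a webhook for your topic; any 2xx response it returns counts as a successful delivery, while non-2xx responses go through the retry / dead-letter path described above.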