I removed Redis from our job system and moved the queue into Postgres. I’ll show the schema, the exact queries, the failure modes, and the operational playbook.
Why Postgres for jobs
One less moving piece reduces incident surface.
Transactions let producers atomically write app state and enqueue work.
Backups and HA piggyback on the database you already run.
FOR UPDATE SKIP LOCKED gives cheap, fair-ish work stealing without central coordinators.
You still get at-least-once delivery; exactly-once is marketing.
The data model
Use a single table with partial indexes; avoid status fan-out tables until you need partitioning.
```sql
-- status kept narrow for cheap comparisons
CREATE TYPE job_status AS ENUM ('queued','running','succeeded','failed','dead');

CREATE TABLE jobs (
  id           bigserial PRIMARY KEY,
  queue        text        NOT NULL,               -- e.g. "emails", "reports"
  priority     int         NOT NULL DEFAULT 0,     -- larger = earlier
  run_at       timestamptz NOT NULL DEFAULT now(), -- schedule / backoff
  created_at   timestamptz NOT NULL DEFAULT now(),
  started_at   timestamptz,
  finished_at  timestamptz,
  lease_until  timestamptz,                        -- worker heartbeat / lease
  attempts     int         NOT NULL DEFAULT 0,
  max_attempts int         NOT NULL DEFAULT 20,
  unique_key   text,                               -- idempotency key
  worker_id    text,
  last_error   text,
  payload      jsonb       NOT NULL,
  status       job_status  NOT NULL DEFAULT 'queued'
);

-- Fast pickup of ready jobs
CREATE INDEX jobs_ready_idx
  ON jobs (queue, priority DESC, run_at, id)
  WHERE status = 'queued';

-- Prevent duplicate in-flight work for idempotent tasks
CREATE UNIQUE INDEX jobs_unique_inflight
  ON jobs (unique_key)
  WHERE status IN ('queued','running');
```
Enqueue with idempotency
Producers should be boring and transactional.
```sql
-- inside the same txn as your domain write
INSERT INTO jobs (queue, priority, run_at, unique_key, payload)
VALUES ($1, $2, coalesce($3, now()), $4, $5)
-- jobs_unique_inflight is a partial unique index, not a constraint,
-- so the arbiter must be spelled out as column + predicate
ON CONFLICT (unique_key) WHERE status IN ('queued','running') DO NOTHING
RETURNING id;
```
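To make the atomic-enqueue point concrete, here is a minimal sketch, assuming a hypothetical orders table as the domain write:

```sql
BEGIN;
-- hypothetical domain write; any app-state change works the same way
INSERT INTO orders (customer_id, total_cents) VALUES (42, 1999);
-- the job rides the same COMMIT; a rollback removes both rows
INSERT INTO jobs (queue, unique_key, payload)
VALUES ('emails', 'order-confirmation:42', '{"order_id": 42}');
COMMIT;
```

If either insert fails, neither the order nor the job exists, which is the guarantee a separate Redis write can't give you.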
If you need “coalesce or bump the schedule,” switch the DO NOTHING to a DO UPDATE that sets run_at to GREATEST(existing run_at, incoming run_at), as sketched below.
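A minimal sketch of that variant, keeping the later of the two schedules:

```sql
INSERT INTO jobs (queue, priority, run_at, unique_key, payload)
VALUES ($1, $2, coalesce($3, now()), $4, $5)
ON CONFLICT (unique_key) WHERE status IN ('queued','running')
DO UPDATE SET run_at = GREATEST(jobs.run_at, EXCLUDED.run_at)
RETURNING id;
```

Swap GREATEST for LEAST if a duplicate should pull the run earlier instead.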
Claiming work with SKIP LOCKED
Claim, mark running, and release locks quickly; do the heavy work outside the claim transaction.
```sql
-- claim N jobs fairly by priority then time
WITH cte AS (
  SELECT id
  FROM jobs
  WHERE queue = $1
    AND status = 'queued'
    AND run_at <= now()
  ORDER BY priority DESC, run_at, id
  LIMIT $2
  FOR UPDATE SKIP LOCKED
)
UPDATE jobs j
SET status      = 'running',
    started_at  = now(),
    lease_until = now() + interval '30 seconds',
    worker_id   = $3
FROM cte
WHERE j.id = cte.id
RETURNING j.*;
```
SKIP LOCKED prevents thundering herds and minimizes cross-worker contention.
Keep the claim transaction short; a long-lived transaction holds back the xmin horizon and delays vacuum.
Heartbeats and leases
Workers must extend leases while executing to recover from crashes without global coordination.
```sql
UPDATE jobs
SET lease_until = now() + interval '30 seconds'
WHERE id = ANY($1) AND worker_id = $2 AND status = 'running';
```
A reaper moves timed-out jobs back to queued:
```sql
UPDATE jobs
SET status = 'queued', worker_id = NULL, lease_until = NULL
WHERE status = 'running' AND lease_until < now()
RETURNING id;
```
Success, retry, and dead-letter
Delete on success if you don’t need audit history; update if you do.
```sql
-- ack
DELETE FROM jobs WHERE id = $1 AND worker_id = $2;

-- retry with exponential backoff and capped growth
UPDATE jobs
SET attempts    = attempts + 1,
    status      = 'queued',
    run_at      = now() + (power(2, LEAST(attempts, 12)) || ' seconds')::interval,
    last_error  = $3,
    worker_id   = NULL,
    lease_until = NULL
WHERE id = $1;

-- dead-letter when exhausted
UPDATE jobs
SET status = 'dead', finished_at = now()
WHERE id = $1 AND attempts >= max_attempts;
```
Idempotent handlers make retries safe; use unique_key to collapse duplicates.
At integration boundaries, don't fire side effects without an idempotency key; retries will otherwise duplicate them.
Per-entity serialization with advisory locks
Stop concurrent work on the same tenant or resource without heavyweight locks.
```sql
-- take a per-entity lock; released automatically when the handler's transaction ends
SELECT pg_try_advisory_xact_lock(
  hashtext($1::text || ':' || coalesce($2, ''))  -- queue + unique_key
) AS taken;
```
If taken=false, requeue with a short delay to avoid live-lock.
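A hedged sketch of that requeue; the 2-second delay is an arbitrary choice:

```sql
-- not our turn: release the job with a small delay instead of spinning
UPDATE jobs
SET status = 'queued', run_at = now() + interval '2 seconds',
    worker_id = NULL, lease_until = NULL
WHERE id = $1 AND worker_id = $2;
```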
Worker structure (Go)
Tight loop, explicit batch size, bounded concurrency, and periodic heartbeats.
```go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"sync"
	"time"
)

type Job struct {
	ID      int64
	Payload json.RawMessage
}

// Thin wrappers around the SQL shown above; bodies elided here.
func claim(db *sql.DB, queue string, n int, workerID string) ([]Job, error) { /* claim CTE+UPDATE */ return nil, nil }
func heartbeat(db *sql.DB, ids []int64, workerID string) error { /* lease extension */ return nil }
func ack(db *sql.DB, id int64, workerID string) error { /* DELETE */ return nil }
func retry(db *sql.DB, id int64, workerID, errMsg string) error { /* UPDATE with backoff */ return nil }
func handle(job Job) error { /* your business logic */ return nil }

func worker(ctx context.Context, db *sql.DB, queue, workerID string, parallel, batch int) error {
	sem := make(chan struct{}, parallel)   // bounds concurrent handlers
	hb := time.NewTicker(10 * time.Second) // heartbeat cadence
	defer hb.Stop()

	var mu sync.Mutex
	inFlight := map[int64]struct{}{} // leases we must keep extending

	for ctx.Err() == nil {
		jobs, err := claim(db, queue, batch, workerID)
		if err != nil || len(jobs) == 0 {
			time.Sleep(50 * time.Millisecond) // no continue: keep heartbeating while idle
		}
		for _, j := range jobs {
			sem <- struct{}{}
			mu.Lock()
			inFlight[j.ID] = struct{}{}
			mu.Unlock()
			go func(job Job) {
				defer func() {
					mu.Lock()
					delete(inFlight, job.ID) // stop heartbeating once acked or retried
					mu.Unlock()
					<-sem
				}()
				if err := handle(job); err != nil {
					_ = retry(db, job.ID, workerID, err.Error())
					return
				}
				_ = ack(db, job.ID, workerID)
			}(j)
		}
		select {
		case <-hb.C:
			mu.Lock()
			ids := make([]int64, 0, len(inFlight))
			for id := range inFlight {
				ids = append(ids, id)
			}
			mu.Unlock()
			_ = heartbeat(db, ids, workerID)
		default:
		}
	}
	return ctx.Err()
}
```
Keep handler logic deterministic and its side effects idempotent; isolate IO per integration.
Scheduling recurring jobs
Use the same table; insert future instances at execution time to avoid fan-out storms.
-- "enqueue next run" pattern at the end of a successful invocation INSERT INTO jobs(queue, run_at, unique_key, payload) VALUES ('reports', now() + interval '5 minutes', 'reports:tenant:123', '{"period":"5m"}') ON CONFLICT DO NOTHING;
Avoid a global ticker that inserts thousands at once; distribute by unique_key hash.
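One way to spread the load, sketched here by letting the hash pick each key's offset inside the window (the 60-second spread is an arbitrary choice):

```sql
-- phase each recurring key by its hash so next runs don't all land on the same second
INSERT INTO jobs (queue, run_at, unique_key, payload)
VALUES (
  'reports',
  date_trunc('minute', now()) + interval '5 minutes'
    + (abs(hashtext('reports:tenant:123')) % 60) * interval '1 second',
  'reports:tenant:123',
  '{"period":"5m"}'
)
ON CONFLICT DO NOTHING;
```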
Observability with pure SQL
Track backlog, aging, and failure spikes without a new metrics stack.
```sql
-- backlog by queue
SELECT queue,
       count(*) FILTER (WHERE status = 'queued')  AS ready,
       count(*) FILTER (WHERE status = 'running') AS running
FROM jobs
GROUP BY queue
ORDER BY ready DESC;

-- long-runners
SELECT id, queue, now() - started_at AS runtime, attempts, worker_id
FROM jobs
WHERE status = 'running' AND lease_until > now()
ORDER BY runtime DESC
LIMIT 50;

-- aging ready work
SELECT queue,
       percentile_disc(0.99) WITHIN GROUP (ORDER BY now() - run_at) AS p99_wait
FROM jobs
WHERE status = 'queued'
GROUP BY queue;
```
Export these as gauges; page on p99_wait growth, not raw counts.
Throughput envelope
Throughput ≈ workers * (batch_size / (claim_rtt + avg_handle_time)).
Latency floor ≈ claim_period + contention_cost + handler_time.
Lower claim_rtt by keeping the claim UPDATE tiny and indexed.
Right-size batch_size to fill CPU without starving fairness.
Use connection pools sized for (workers + reaper + producers); stop before max_connections punishes you.
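Plugging made-up numbers into the formula (4 worker processes, batch 25, 5 ms claim round-trip, 20 ms average handler, batches handled concurrently as in the Go loop above):

```sql
-- back-of-envelope only; every number here is illustrative
SELECT 4 * (25 / (0.005 + 0.020)) AS approx_jobs_per_second;  -- 4000
```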
Bloat and autovacuum realities
Frequent deletes create dead tuples; Postgres cleans them eventually, not instantly.
Use DELETE on success for minimal write amplification; avoid “update status then archive” churn.
Partition by month once the table is “hot”; detach old partitions instead of mass-deleting.
Add fillfactor=90 if you must update many rows in-place; HOT updates reduce index churn.
Keep vacuum_cost_limit and autovacuum_vacuum_cost_limit sane; starved autovacuum is self-harm.
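A hedged example of those per-table knobs; the scale factor and cost limit values are illustrative, not a recommendation:

```sql
-- leave room for HOT updates and let autovacuum visit this hot table more often
ALTER TABLE jobs SET (
  fillfactor = 90,
  autovacuum_vacuum_scale_factor = 0.01,
  autovacuum_vacuum_cost_limit   = 2000
);
```

Note that fillfactor only affects newly written pages, so set it before the table fills up.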
Fairness and priorities
Sorting by (priority DESC, run_at, id) prevents queue starvation without per-queue shards.
If you need strict per-tenant fairness, add a tenant_id column and claim round-robin by tenant, taking LIMIT 1 per tenant in a loop (sketched below).
Don’t over-engineer until p95 wait time proves bias.
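A sketch of that per-tenant claim, assuming a hypothetical tenant_id column that is not in the schema above:

```sql
-- lock at most one ready job per tenant; locked candidates are skipped, not waited on
SELECT picked.id
FROM (
  SELECT DISTINCT tenant_id
  FROM jobs
  WHERE queue = $1 AND status = 'queued' AND run_at <= now()
) t
CROSS JOIN LATERAL (
  SELECT id
  FROM jobs
  WHERE queue = $1 AND status = 'queued' AND run_at <= now()
    AND tenant_id = t.tenant_id
  ORDER BY priority DESC, run_at, id
  LIMIT 1
  FOR UPDATE SKIP LOCKED
) picked;
```

Feed the returned ids into the same claiming UPDATE as before, inside one transaction.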
When not to do this
- You need fan-out streams or replayable history -> use Kafka, NATS, or Redpanda.
- You need >100k jobs/sec sustained writes from many producers -> use a real queue.
- You require multi-datacenter active-active with independent ordering guarantees -> this pattern won’t save you.
- You don’t control the database or your team lacks SQL/ops depth -> sovereignty beats cleverness.
Cutover plan that won’t page you
- Dual-write jobs to Redis and Postgres for a week and drain with both consumers.
- Shadow-consume from Postgres and compare ack counts and handler outcomes.
- Flip producers to Postgres-only behind a feature flag; keep the Redis consumer idle but running.
- Delete the Redis stream after a full retention window; remove the dependency last.
Failure modes we hit
Long claim transactions block vacuum; keep them <10 ms.
A missing partial index turned the ready-job ORDER BY into a sequential scan plus sort; verify with EXPLAIN (ANALYZE, BUFFERS).
Advisory locks without *_xact_* leaked across code paths; always prefer transaction-scoped locks.
Retries without jitter synchronized storms; add random() * interval '1s' to backoff.
Lease durations shorter than handler cold-start times caused flapping; measure startup and add 2× margin.
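For the jitter point above, a sketch of the earlier retry UPDATE with randomness folded into the backoff:

```sql
UPDATE jobs
SET attempts    = attempts + 1,
    status      = 'queued',
    run_at      = now()
                  + (power(2, LEAST(attempts, 12)) || ' seconds')::interval
                  + random() * interval '1 second',  -- jitter de-synchronizes retry storms
    last_error  = $3,
    worker_id   = NULL,
    lease_until = NULL
WHERE id = $1;
```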
Minimal reproducible benchmark harness
Drive the system with a single SQL file and one Go binary; avoid synthetic hero numbers.
```sql
-- seed 100k jobs
INSERT INTO jobs (queue, priority, payload)
SELECT 'bench', (random() * 10)::int, jsonb_build_object('n', g)
FROM generate_series(1, 100000) g;
```
Run workers at parallelism 32, batch 50, handler sleep 2–5 ms random.
Watch p99_wait and % of reaped leases; throughput follows the formula, not vibes.
Closing stance
If your workload fits the envelope and you respect the vacuum, it’s fewer servers, fewer dashboards, and fewer nights on-call. If it doesn’t, use the right tool and don’t argue with tail latencies.