Postgres Queues: High Throughput with FOR UPDATE SKIP LOCKED
The Allure and Peril of Database-Backed Queues
For many systems, the operational overhead of managing a dedicated message broker like RabbitMQ or SQS is an unnecessary complexity. The siren song of using your primary transactional database (often PostgreSQL) as a job queue is strong: atomic operations are guaranteed by ACID compliance, jobs can be transactionally enqueued with business logic, and there's one less piece of infrastructure to monitor and maintain.
However, most naive implementations crumble under concurrent load. A simple jobs table with a status column quickly becomes a hotbed of lock contention, turning your highly parallelized worker fleet into a de facto serial processor. Senior engineers know this pain: you scale your workers from 2 to 20, only to see throughput flatline while lock waits pile up on the database.
This article dissects the advanced PostgreSQL pattern that solves this problem elegantly: FOR UPDATE SKIP LOCKED. We will explore its mechanics, build a production-grade worker, design the necessary high-performance indexes, and handle the complex edge cases that arise in real-world distributed systems.
Anti-Pattern 1: The Race Condition Queue
A junior developer's first attempt often looks like this:
// DO NOT USE THIS IN PRODUCTION
async function getNextJob() {
// 1. Find a pending job
const { rows } = await client.query(
`SELECT id, payload FROM jobs WHERE status = 'pending' LIMIT 1`
);
if (rows.length === 0) return null;
const job = rows[0];
// 2. Mark it as processing
await client.query(
`UPDATE jobs SET status = 'processing' WHERE id = $1`,
[job.id]
);
return job;
}
Under any concurrent load (>1 worker), this is fundamentally broken. Two workers can execute the SELECT query at nearly the same time, both retrieve the same job, and both proceed to process it. You've just introduced duplicate processing, a potentially catastrophic bug.
Anti-Pattern 2: The Contention-Heavy `FOR UPDATE` Queue
A more experienced developer might recognize the race condition and reach for row-level locking with SELECT ... FOR UPDATE.
-- Worker 1
BEGIN;
SELECT id, payload FROM jobs WHERE status = 'pending' ORDER BY created_at LIMIT 1 FOR UPDATE;
-- This query now blocks Worker 2 until Worker 1's transaction ends.
-- Worker 2 (at the same time)
BEGIN;
SELECT id, payload FROM jobs WHERE status = 'pending' ORDER BY created_at LIMIT 1 FOR UPDATE;
-- BLOCKS! Waits for Worker 1 to COMMIT or ROLLBACK.
This solves the duplicate processing problem—only one worker can acquire the lock on a given row. However, it introduces a massive performance bottleneck. Your workers now form a queue, waiting for the single hot row at the head of the line. Your throughput is limited by the processing time of a single worker. Scaling horizontally yields zero benefit. You've effectively serialized your entire worker pool.
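You can watch this serialization happen. While the workers are contending, a quick look at pg_stat_activity (PostgreSQL 9.6 or later exposes the wait_event columns) will show every worker but one stuck waiting on a row lock; a rough sketch:
-- Sessions currently waiting on a lock
SELECT pid, state, wait_event_type, wait_event, left(query, 60) AS query
FROM pg_stat_activity
WHERE wait_event_type = 'Lock';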
The Solution: `FOR UPDATE SKIP LOCKED`
Enter SKIP LOCKED, available since PostgreSQL 9.5. It's a modifier for FOR UPDATE or FOR SHARE that instructs PostgreSQL: "Attempt to acquire a lock on this row, but if you can't because another transaction already holds it, don't wait. Just skip this row entirely and pretend it didn't exist in the query results."
This is a game-changer for queue implementations. Multiple workers can now query the jobs table simultaneously. Each worker will attempt to lock the first available job. If Worker 1 locks job_id=1, Worker 2's query will see that job_id=1 is locked, skip it, and immediately acquire a lock on the next available job, job_id=2. No waiting, no blocking. True parallel dequeueing.
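You can see this for yourself with two psql sessions against a small jobs table (a minimal sketch, assuming two pending rows with ids 1 and 2):
-- Session A
BEGIN;
SELECT id FROM jobs WHERE status = 'pending' ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1;
-- returns id = 1 and holds its row lock

-- Session B, while Session A's transaction is still open
BEGIN;
SELECT id FROM jobs WHERE status = 'pending' ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1;
-- returns id = 2 immediately: row 1 is locked, so it is skipped rather than waited on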
Here is the canonical, production-ready dequeue query:
-- The core of a high-throughput queue
BEGIN;
-- Atomically find and lock the next available job
SELECT id, payload, retry_count
FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1;
-- If a job is found, the application logic processes it.
-- After processing, update its status within the same transaction.
UPDATE jobs
SET status = 'completed', completed_at = NOW()
WHERE id = <retrieved_job_id>;
COMMIT;
This pattern provides:
* SKIP LOCKED ensures workers don't block each other, allowing for true horizontal scaling.
* The ORDER BY clause ensures high-priority jobs are always picked first, without causing contention.
Production Implementation: A Robust Node.js Worker
Let's build a worker that uses this pattern and handles real-world complexities. First, our table schema:
CREATE TYPE job_status AS ENUM ('pending', 'processing', 'completed', 'failed');
CREATE TABLE jobs (
id BIGSERIAL PRIMARY KEY,
queue_name TEXT NOT NULL DEFAULT 'default',
payload JSONB NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 3,
status job_status NOT NULL DEFAULT 'pending',
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
process_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ
);
-- We will discuss this index in detail later. It's CRITICAL.
-- NOW() is not IMMUTABLE, so it cannot appear in an index predicate;
-- the predicate covers status only, and process_at is filtered at query time.
CREATE INDEX idx_jobs_fetch_pending ON jobs (priority DESC, created_at ASC)
WHERE status = 'pending';
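Enqueueing is just an INSERT, which means a job can be created in the same transaction as the business change that triggers it, one of the core advantages mentioned earlier. A sketch (the accounts table and the payload shape are illustrative):
BEGIN;
UPDATE accounts SET email = 'new@example.com' WHERE id = 42;
INSERT INTO jobs (queue_name, payload, priority, process_at)
VALUES (
  'emails',
  '{"type": "email_changed", "account_id": 42}'::jsonb,
  10,
  NOW()  -- or NOW() + interval '10 minutes' for a delayed job
);
COMMIT;
-- If the surrounding transaction rolls back, the job is never enqueued.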
Now, the Node.js worker implementation using the pg library.
const { Pool } = require('pg');
const pool = new Pool({ /* connection details */ });
// A utility to simulate work
const doWork = (payload) => new Promise(resolve => setTimeout(resolve, Math.random() * 1000));
async function processJob(job) {
console.log(`[Worker ${process.pid}] Processing job ${job.id} with payload:`, job.payload);
try {
// Simulate business logic. This could be anything:
// sending an email, processing an image, calling a third-party API, etc.
await doWork(job.payload);
if (Math.random() < 0.1) { // Simulate a transient failure
throw new Error('A transient error occurred!');
}
console.log(`[Worker ${process.pid}] Successfully processed job ${job.id}`);
return { success: true };
} catch (error) {
console.error(`[Worker ${process.pid}] Error processing job ${job.id}:`, error.message);
return { success: false, error: error.message };
}
}
async function workerLoop() {
while (true) {
const client = await pool.connect();
let job; // declared outside the try block so the polling check below can see it
try {
await client.query('BEGIN');
const dequeueQuery = `
SELECT id, payload, retry_count, max_retries
FROM jobs
WHERE status = 'pending' AND process_at <= NOW()
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1;
`;
const { rows } = await client.query(dequeueQuery);
job = rows[0];
if (job) {
// We have a job. Mark it as 'processing' immediately.
// This helps with observability and detecting 'zombie' jobs.
await client.query(
`UPDATE jobs SET status = 'processing', started_at = NOW() WHERE id = $1`,
[job.id]
);
// Commit this small transaction to release the lock on the table for other workers
// while this worker processes the job. This is an advanced optimization.
await client.query('COMMIT');
// Now process the job outside the main lock transaction
const result = await processJob(job);
// Start a new transaction to finalize the job state
await client.query('BEGIN');
if (result.success) {
await client.query(
`UPDATE jobs SET status = 'completed', completed_at = NOW() WHERE id = $1`,
[job.id]
);
} else {
// Failure handling with exponential backoff
const newRetryCount = job.retry_count + 1;
if (newRetryCount >= job.max_retries) {
await client.query(
`UPDATE jobs SET status = 'failed', last_error = $1 WHERE id = $2`,
[result.error, job.id]
);
} else {
const delaySeconds = Math.pow(2, newRetryCount) * 5; // 10s, 20s, 40s...
await client.query(
`UPDATE jobs SET status = 'pending', retry_count = $1, last_error = $2, process_at = NOW() + interval '${delaySeconds} seconds' WHERE id = $3`,
[newRetryCount, result.error, job.id]
);
}
}
}
await client.query('COMMIT');
} catch (e) {
console.error('CRITICAL: Transaction failed, rolling back.', e);
await client.query('ROLLBACK');
} finally {
client.release();
}
// If no job was found, wait a bit before polling again.
if (!job) {
await new Promise(resolve => setTimeout(resolve, 2000));
}
}
}
// Start multiple workers
const NUM_WORKERS = 4;
for (let i = 0; i < NUM_WORKERS; i++) {
workerLoop();
}
console.log(`Started ${NUM_WORKERS} workers.`);
This implementation splits the work across two short transactions (not to be confused with two-phase commit). The initial SELECT ... FOR UPDATE SKIP LOCKED runs in a very short transaction that simply marks the job as processing; committing it releases the row lock quickly, so other workers can dequeue jobs without waiting for the current job to finish. The actual work happens outside any lock-holding transaction, and a final, separate transaction moves the job to its terminal state (completed, failed, or back to pending for a retry). The trade-off is that a worker crash after the first commit leaves the job stuck in processing, which is exactly the zombie case handled later. Minimizing lock duration this way is critical for high-throughput systems.
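If you would rather not manage explicit transactions in the worker at all, the claim step can be collapsed into a single statement. The sketch below is a variant of the same pattern, not a drop-in replacement for the code above: one UPDATE claims the next job, marks it 'processing', and returns it in a single round trip; a separate UPDATE finalizes it once the work is done.
-- Claim the next available job and mark it 'processing' in one statement
UPDATE jobs
SET status = 'processing', started_at = NOW()
WHERE id = (
  SELECT id
  FROM jobs
  WHERE status = 'pending' AND process_at <= NOW()
  ORDER BY priority DESC, created_at ASC
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
RETURNING id, payload, retry_count, max_retries;
Run as a single statement, it commits as soon as it returns, so the row lock is held only for the duration of the UPDATE itself.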
Performance Tuning: The Magic of Partial Indexes
A jobs table can grow to hundreds of millions of rows. Querying it efficiently is non-negotiable. A standard index on (status) is nearly useless because of its low cardinality; the database would still need to scan a massive portion of the index.
The key is a partial index (or filtered index).
-- The index from our schema earlier
CREATE INDEX idx_jobs_fetch_pending ON jobs (priority DESC, created_at ASC)
WHERE status = 'pending';
Why this is so powerful:
* The index only contains rows that are pending. If you have 100 million completed jobs and only 10,000 pending, this index will be thousands of times smaller than a full index.
* The query's status = 'pending' condition matches the index's WHERE clause, so the planner can go straight to this tiny, specialized index to find candidate rows and then filter them by process_at.
Let's prove it with EXPLAIN ANALYZE.
Scenario: A table with 10 million jobs, 5,000 of which are pending.
Without the partial index (using a generic index on (status)):
EXPLAIN ANALYZE SELECT id FROM jobs WHERE status = 'pending' AND process_at <= NOW() LIMIT 1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Limit (cost=0.57..6.98 rows=1 width=8) (actual time=150.456..150.457 rows=1 loops=1)
-> Index Scan using idx_jobs_status on jobs (cost=0.57..31998.11 rows=5000 width=8)
Index Cond: (status = 'pending')
Filter: (process_at <= now())
Planning Time: 0.15ms
Execution Time: 150.512ms
The planner has to walk the generic status index, filtering candidates by process_at, before it can satisfy the LIMIT. The execution time is poor.
With the partial index:
EXPLAIN ANALYZE SELECT id FROM jobs WHERE status = 'pending' AND process_at <= NOW() LIMIT 1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------
Limit (cost=0.42..0.46 rows=1 width=8) (actual time=0.035..0.036 rows=1 loops=1)
-> Index Scan using idx_jobs_fetch_pending on jobs (cost=0.42..20.42 rows=5000 width=8)
Filter: (process_at <= now())
Planning Time: 0.21ms
Execution Time: 0.055ms
The execution time is over 2700x faster. The planner uses our highly-specific index and finds a candidate row almost instantly. This is the single most important optimization for a database-backed queue.
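The size difference is easy to verify on your own data; pg_relation_size reports the on-disk footprint of the partial index versus the table:
SELECT
  pg_size_pretty(pg_relation_size('jobs')) AS table_size,
  pg_size_pretty(pg_relation_size('idx_jobs_fetch_pending')) AS partial_index_size;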
Advanced Edge Cases and Production Considerations
1. Zombie Workers and Job Timeouts
What happens if a worker acquires a job, marks it as processing, and then crashes irrecoverably? The job is now a "zombie," stuck in the processing state forever.
Solution: A separate janitor process or scheduled job that cleans up stale jobs.
-- This query finds jobs that have been 'processing' for too long (e.g., 1 hour)
-- and resets them to 'pending' to be picked up again.
UPDATE jobs
SET
status = 'pending',
started_at = NULL,
retry_count = retry_count + 1, -- Penalize the job for timing out
last_error = 'Job timed out after 1 hour'
WHERE
status = 'processing'
AND started_at < NOW() - INTERVAL '1 hour'
AND retry_count < max_retries;
-- Also, handle jobs that timed out and have no retries left
UPDATE jobs
SET
status = 'failed',
last_error = 'Job timed out and exceeded max retries'
WHERE
status = 'processing'
AND started_at < NOW() - INTERVAL '1 hour'
AND retry_count >= max_retries;
This cleanup process is crucial for the long-term health of the queue.
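How you schedule the janitor is up to you: a cron entry on a worker host, a scheduled task in your application, or, if the pg_cron extension is installed, a schedule inside the database itself. A sketch using pg_cron (the job name and the 5-minute interval are illustrative):
-- Requires the pg_cron extension; resets zombie jobs every 5 minutes
SELECT cron.schedule(
  'reset-zombie-jobs',
  '*/5 * * * *',
  $$UPDATE jobs
    SET status = 'pending', started_at = NULL,
        retry_count = retry_count + 1,
        last_error = 'Job timed out after 1 hour'
    WHERE status = 'processing'
      AND started_at < NOW() - INTERVAL '1 hour'
      AND retry_count < max_retries$$
);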
2. Table Bloat and VACUUM
A jobs table experiences an extremely high rate of UPDATEs. Each UPDATE in PostgreSQL effectively creates a new version of the row (a new tuple) and marks the old one as dead. Over time, this leads to table bloat, where the physical file on disk is much larger than the actual data it contains, harming query performance.
Solution:
* Aggressive Autovacuum: Tune autovacuum parameters specifically for the jobs table to run more frequently. You can set storage parameters directly on the table:
ALTER TABLE jobs SET (autovacuum_vacuum_scale_factor = 0.05, autovacuum_analyze_scale_factor = 0.02);
* Partitioning: For very high-volume systems, partition the jobs table, perhaps by created_at date. This allows you to DROP old partitions of completed jobs entirely, which is an instantaneous, metadata-only operation, instead of running a costly DELETE.
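As a rough sketch of what declarative partitioning looks like here (column list trimmed for brevity; note that the primary key of a partitioned table must include the partition key, so this is a schema change rather than a bolt-on):
CREATE TABLE jobs_partitioned (
  id BIGSERIAL,
  payload JSONB NOT NULL,
  status job_status NOT NULL DEFAULT 'pending',
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  PRIMARY KEY (id, created_at)  -- partition key must be part of the primary key
) PARTITION BY RANGE (created_at);

CREATE TABLE jobs_2024_06 PARTITION OF jobs_partitioned
  FOR VALUES FROM ('2024-06-01') TO ('2024-07-01');

-- Months later, retiring that month of completed jobs is a metadata-only operation:
DROP TABLE jobs_2024_06;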
3. Starvation
The ORDER BY priority DESC, created_at ASC clause is efficient, but it can lead to starvation. If there is a constant stream of high-priority jobs, low-priority jobs might never be processed.
Solution: This is a business logic problem more than a technical one. Solutions might include:
* Periodically increasing the priority of old, unprocessed jobs (a sketch follows after this list).
* Having separate worker pools for different priority levels.
* Implementing a query that occasionally fetches a lower-priority job to ensure fairness, e.g., in 1% of dequeue attempts.
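The first option, aging priorities, can be as simple as a periodic UPDATE run by the same janitor process (the thresholds here are illustrative):
-- Bump any job that has been waiting for more than an hour
UPDATE jobs
SET priority = priority + 1
WHERE status = 'pending'
  AND created_at < NOW() - INTERVAL '1 hour'
  AND priority < 100;  -- cap so aged jobs don't overtake genuinely urgent work forever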
4. Multi-tenancy
In a SaaS environment, you need to ensure one tenant's jobs don't monopolize the queue. The pattern extends easily:
* Add a tenant_id column to the jobs table.
* Include tenant_id as the first key in your partial index:
CREATE INDEX idx_jobs_multi_tenant_fetch ON jobs (tenant_id, priority DESC, created_at ASC)
WHERE status = 'pending';
* Your dequeue logic now needs to be tenant-aware, possibly iterating through tenants to ensure fair processing.
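One simple way to make dequeueing tenant-aware is to iterate over the tenants that currently have runnable jobs and pull at most one job per tenant per pass. A sketch (the fairness policy is entirely up to you):
-- Tenants that currently have runnable work
SELECT DISTINCT tenant_id
FROM jobs
WHERE status = 'pending' AND process_at <= NOW();

-- Dequeue one job for a specific tenant
SELECT id, payload, retry_count
FROM jobs
WHERE tenant_id = $1 AND status = 'pending' AND process_at <= NOW()
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1;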
When Not to Use This Pattern
Despite its power, a Postgres queue isn't a silver bullet. You should reach for a dedicated message broker like RabbitMQ, Kafka, or SQS when:
* Extreme Throughput: You need to process hundreds of thousands of jobs per second. While Postgres is fast, it's not designed to compete with systems built specifically for message brokering at that scale.
* Complex Routing: You require fan-out, topic exchanges, or other complex message routing patterns.
* Inter-Service Decoupling: Your architecture involves many microservices written in different languages that need a common, language-agnostic bus for communication.
* Push-based Delivery: You need the broker to push messages to consumers, rather than consumers polling for work.
Conclusion
By leveraging the FOR UPDATE SKIP LOCKED clause, you can transform PostgreSQL from a simple data store into a powerful, reliable, and highly concurrent job queue. This pattern eliminates lock contention, enables true horizontal scaling of workers, and maintains the transactional integrity you rely on from your primary database.
Remember that the implementation details matter. A naive approach will fail, but by combining SKIP LOCKED with a carefully crafted partial index, robust error handling, and a strategy for handling zombie workers, you can build a system that is both performant and resilient. For a vast number of applications, this approach provides the perfect balance of power and operational simplicity, allowing you to build sophisticated background processing systems without leaving the comfort of your relational database.