High-Throughput Job Queues with Postgres `FOR UPDATE SKIP LOCKED`
The Senior Engineer's Dilemma: The In-Database Job Queue
As systems scale, the need for asynchronous background processing becomes non-negotiable. The default path for many is to reach for a dedicated message queueing system like RabbitMQ, SQS, or Kafka. While powerful, these introduce significant operational overhead: new infrastructure to manage, monitor, and secure, along with another point of failure. For many use cases, especially those where jobs are tightly coupled with primary application data, a robust job queue can be built directly within PostgreSQL.
The challenge, however, is concurrency. A naive implementation quickly falls apart under the pressure of multiple competing workers, leading to race conditions, deadlocks, and severe lock contention. This article is a deep dive into the definitive pattern for solving this problem: leveraging PostgreSQL's `FOR UPDATE SKIP LOCKED` clause to build a highly concurrent, scalable, and resilient job queue that avoids the common pitfalls.
We will not cover the basics of `SELECT` or `UPDATE`. We assume you understand database transactions, row-level locking, and the fundamental problems of concurrent data access. Our focus is on the production-grade implementation details that separate a toy queue from a system that can reliably process millions of jobs per day.
The Failure of Naive Approaches
Before dissecting the `SKIP LOCKED` solution, it's critical to understand why simpler approaches fail catastrophically under load.
1. The `UPDATE ... RETURNING` Race Condition
A common first attempt involves a non-transactional, two-step process or a single atomic `UPDATE`.
-- ANTI-PATTERN: This is prone to race conditions
UPDATE jobs
SET status = 'running', locked_by = 'worker-123'
WHERE id = (
SELECT id
FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
)
RETURNING *;
Under low concurrency, this might appear to work. But with dozens of workers, the subquery (`SELECT id ... LIMIT 1`) can be executed by multiple workers simultaneously before any of them executes the outer `UPDATE`. They all see the same pending job, and all of them attempt to claim it. The losers block on the winner's row lock, and under the default READ COMMITTED isolation level they re-check only the outer `WHERE id = ...` condition once that lock is released; the id still matches, so they can claim a job that has already been handed out. At best the workers are serialized behind a single row; at worst the same job is processed more than once. Either way, the behavior is inefficient and unpredictable.
2. The `FOR UPDATE` Contention Bottleneck
A more robust approach introduces a transaction with `FOR UPDATE`.
-- ANTI-PATTERN for high concurrency
BEGIN;
SELECT id, payload
FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
FOR UPDATE;
-- If a row is returned, update it
UPDATE jobs SET status = 'running', locked_by = 'worker-123' WHERE id = <retrieved_id>;
COMMIT;
This solves the race condition. `FOR UPDATE` places an exclusive lock on the row returned by the `SELECT`, preventing any other transaction from modifying or locking it until the current transaction commits or rolls back. However, it introduces a massive performance bottleneck.
Consider 50 workers running this query. Worker 1 selects and locks Job A. Workers 2 through 50 execute the same `SELECT` and identify Job A as the next candidate. Instead of getting a result, they are all placed in a queue, waiting for Worker 1 to release its lock on Job A. They are effectively serialized, forming a single-file line. Your effective concurrency is 1, and your expensive cluster of 50 workers is mostly idle, waiting on locks. The system does not scale.
The Solution: Non-Blocking Dequeuing with `SKIP LOCKED`
Introduced in PostgreSQL 9.5, `SKIP LOCKED` is a modifier for `FOR UPDATE` and `FOR SHARE` that transforms the locking behavior. When a selected row is already locked by another transaction, `SKIP LOCKED` instructs Postgres to simply ignore it and move on to the next row that satisfies the `WHERE` clause.
This is the key to true concurrent dequeuing. When our 50 workers execute their query, Worker 1 locks Job A. Workers 2 through 50 also attempt to lock Job A, but instead of waiting, `SKIP LOCKED` causes them to immediately bypass it and evaluate the next available job in the ordered set (Job B, Job C, and so on). In an instant, 50 workers can claim 50 different jobs without blocking each other. This allows for near-linear scaling of throughput with the number of workers.
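You can see the behavior in isolation with two psql sessions (a minimal sketch against the same `jobs` queue used throughout this article):
-- Session 1: claim the first pending row and hold the transaction open
BEGIN;
SELECT id FROM jobs WHERE status = 'pending' ORDER BY id LIMIT 1 FOR UPDATE;

-- Session 2: without SKIP LOCKED this statement would block behind session 1;
-- with SKIP LOCKED it immediately returns the next unlocked candidate
BEGIN;
SELECT id FROM jobs WHERE status = 'pending' ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED;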
Production-Grade Schema and Indexing
Our implementation starts with a well-designed `jobs` table.
CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'failed');
CREATE TABLE jobs (
id BIGSERIAL PRIMARY KEY,
queue_name TEXT NOT NULL DEFAULT 'default',
payload JSONB NOT NULL,
status job_status NOT NULL DEFAULT 'pending',
priority INTEGER NOT NULL DEFAULT 0, -- Higher is more important
max_retries INTEGER NOT NULL DEFAULT 3,
attempts INTEGER NOT NULL DEFAULT 0,
run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
locked_by TEXT, -- Identifier for the worker holding the lock
locked_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
failed_at TIMESTAMPTZ,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- The CRITICAL index for the dequeue query
CREATE INDEX idx_jobs_dequeue ON jobs (priority DESC, run_at ASC, created_at ASC) WHERE status = 'pending';
-- Optional: Index for janitor process to find stalled jobs
CREATE INDEX idx_jobs_stalled ON jobs (locked_at) WHERE status = 'running';
Schema Breakdown:
* `status`: An `ENUM` for clear state management.
* `priority`: Allows more important jobs to be processed first.
* `run_at`: Enables scheduled jobs.
* `attempts` / `max_retries`: Essential for building a robust retry mechanism.
* `locked_by` / `locked_at`: Crucial for monitoring and recovering from worker crashes (the "janitor" process).
* `error_message`: Stores the reason for the last failure.
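Producers enqueue work with a plain `INSERT`. A minimal sketch against this schema (the payload shapes are illustrative; use whatever your workers expect):
-- An immediate, default-priority job
INSERT INTO jobs (payload) VALUES ('{"type": "send_email", "to": "user@example.com"}');

-- A high-priority job scheduled to run in one hour
INSERT INTO jobs (payload, priority, run_at)
VALUES ('{"type": "generate_report"}', 10, NOW() + INTERVAL '1 hour');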
The Dequeue Index: The Secret to Performance
The most critical piece here is `idx_jobs_dequeue`. This is a partial composite index. Let's break down why it's structured this way:
* The partial predicate (`WHERE status = 'pending'`): The vast majority of our dequeue queries will only ever look for pending jobs. By making this a partial index, we keep its size dramatically smaller. It only contains entries for jobs that are candidates for processing, making scans incredibly fast and efficient. As jobs are completed or fail, they are removed from this index.
* The column order (`priority DESC, run_at ASC, created_at ASC`): The order of columns in a composite index is paramount. It must match the `ORDER BY` clause of our dequeue query. This allows Postgres to read the job candidates directly from the index in the correct order, without needing to perform a separate, expensive sort operation on the results. It finds the highest-priority, earliest-`run_at` job almost instantly.
Without this exact index, PostgreSQL would have to perform a full table scan or a much larger index scan, then sort the results in memory, which would become a severe bottleneck.
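You can verify the index is being used with `EXPLAIN`; with the index in place, the plan should show an index scan on `idx_jobs_dequeue` and no separate sort step (exact output will vary with your data and settings):
EXPLAIN (ANALYZE, BUFFERS)
SELECT id
FROM jobs
WHERE status = 'pending' AND run_at <= NOW()
ORDER BY priority DESC, run_at ASC, created_at ASC
LIMIT 1;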
The Core Worker Logic: A Python Implementation
Here is a complete, production-ready worker implementation in Python using the `psycopg` library. Note the careful transaction management.
import os
import time
import json
import random
import psycopg
import logging
import socket
from datetime import datetime, timedelta, timezone

# --- Configuration ---
DB_CONN_STRING = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost:5432/jobs_db")
WORKER_ID = f"worker-{socket.gethostname()}-{os.getpid()}"
POLL_INTERVAL_SECONDS = 2
JOB_TIMEOUT_SECONDS = 300  # 5 minutes

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- The Dequeue Query ---
DEQUEUE_JOB_SQL = """
UPDATE jobs
SET
    status = 'running',
    locked_by = %(worker_id)s,
    locked_at = NOW()
WHERE id = (
    SELECT id
    FROM jobs
    WHERE status = 'pending' AND run_at <= NOW()
    ORDER BY priority DESC, run_at ASC, created_at ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING id, payload, attempts, max_retries;
"""

# --- Job Outcome Queries ---
COMPLETE_JOB_SQL = """
UPDATE jobs
SET status = 'completed', completed_at = NOW(), locked_by = NULL, locked_at = NULL
WHERE id = %(job_id)s;
"""

FAIL_JOB_PERMANENTLY_SQL = """
UPDATE jobs
SET status = 'failed', failed_at = NOW(), error_message = %(error)s, locked_by = NULL, locked_at = NULL
WHERE id = %(job_id)s;
"""

RETRY_JOB_SQL = """
UPDATE jobs
SET
    status = 'pending',
    attempts = attempts + 1,
    run_at = NOW() + (%(delay)s * INTERVAL '1 second'),
    locked_by = NULL,
    locked_at = NULL,
    error_message = %(error)s
WHERE id = %(job_id)s;
"""

def process_job(job_id, payload):
    """Simulates the actual work being done."""
    logging.info(f"[{WORKER_ID}] Processing job {job_id} with payload: {payload}")
    # Replace this with your actual business logic
    # Example: send an email, process an image, etc.
    if payload.get("should_fail", False):
        raise ValueError("This job was configured to fail.")
    time.sleep(5)  # Simulate work
    logging.info(f"[{WORKER_ID}] Finished processing job {job_id}")

def calculate_backoff(attempts):
    """Calculates exponential backoff with random jitter."""
    base_delay = 2 ** attempts
    # Random jitter (up to 50% of the base delay) spreads retries out in time
    jitter = random.uniform(0, base_delay / 2)
    return base_delay + jitter

def main_loop():
    logging.info(f"[{WORKER_ID}] Worker started.")
    with psycopg.connect(DB_CONN_STRING) as conn:
        while True:
            job = None
            try:
                # 1. Atomically dequeue and lock a job
                with conn.cursor() as cur:
                    cur.execute(DEQUEUE_JOB_SQL, {"worker_id": WORKER_ID})
                    job_record = cur.fetchone()
                conn.commit()  # Keep this transaction short!

                if job_record:
                    job_id, payload, attempts, max_retries = job_record
                    job = {"id": job_id, "payload": payload, "attempts": attempts, "max_retries": max_retries}

                    # 2. Process the job (outside the dequeue transaction)
                    process_job(job_id, payload)

                    # 3. Mark job as complete in a new transaction
                    with conn.cursor() as cur:
                        cur.execute(COMPLETE_JOB_SQL, {"job_id": job_id})
                    conn.commit()
                    logging.info(f"[{WORKER_ID}] Job {job_id} marked as completed.")
                else:
                    # No jobs found, wait before polling again
                    logging.info(f"[{WORKER_ID}] No jobs found. Sleeping for {POLL_INTERVAL_SECONDS}s.")
                    time.sleep(POLL_INTERVAL_SECONDS)

            except Exception as e:
                logging.error(f"[{WORKER_ID}] An error occurred: {e}")
                # If the error came from a database call, the open transaction is aborted;
                # roll it back so the failure-handling queries below can run.
                conn.rollback()
                if job:
                    job_id = job['id']
                    attempts = job['attempts']
                    max_retries = job['max_retries']
                    error_message = str(e)

                    # 4. Handle failures and retries
                    with conn.cursor() as cur:
                        if attempts + 1 >= max_retries:
                            # Max retries reached, fail permanently
                            logging.warning(f"[{WORKER_ID}] Job {job_id} reached max retries. Failing permanently.")
                            cur.execute(FAIL_JOB_PERMANENTLY_SQL, {"job_id": job_id, "error": error_message})
                        else:
                            # Retry with exponential backoff
                            delay = calculate_backoff(attempts + 1)
                            logging.info(f"[{WORKER_ID}] Job {job_id} failed. Retrying in {delay:.2f} seconds.")
                            cur.execute(RETRY_JOB_SQL, {"job_id": job_id, "error": error_message, "delay": delay})
                    conn.commit()

                # Note: psycopg does not reconnect automatically; if the connection itself is
                # broken, a production worker should detect that and re-establish the connection.
                time.sleep(POLL_INTERVAL_SECONDS)

if __name__ == "__main__":
    main_loop()
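With a worker running, you can exercise both the success path and the retry path by enqueuing a couple of jobs (a hypothetical smoke test; the payload keys are the ones `process_job` above looks at):
-- A job that should complete normally
INSERT INTO jobs (payload) VALUES ('{"task": "demo"}');

-- A job that process_job is written to reject, exercising retries and backoff
INSERT INTO jobs (payload, max_retries) VALUES ('{"should_fail": true}', 2);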
Key Implementation Details:
* Atomic dequeue: We use an `UPDATE ... WHERE id = (SELECT ...)` pattern. This is now safe because the subquery `SELECT` carries the `FOR UPDATE SKIP LOCKED` clause. The entire statement is atomic: it finds a job, locks it, and updates its status to `running` in a single, non-interruptible operation.
* Commit immediately: We commit right after the `UPDATE ... RETURNING` statement. This is critical. The long-running business logic (`process_job`) happens outside of any transaction. Holding a transaction open for the duration of a job is an anti-pattern that can lead to long-lived locks, connection pool exhaustion, and VACUUM problems.
* Short-lived outcome transactions: Each final state change (`completed`, or `failed`/retry) is performed in a new, short-lived transaction. This ensures the job's state is durably recorded.
* Backoff with jitter: The `calculate_backoff` function implements exponential backoff with jitter, which prevents waves of failing jobs from all retrying at the exact same time (the thundering herd problem).
Handling Edge Cases: Building for Resilience
A simple worker loop is not enough. Production systems must be resilient to failure.
Edge Case 1: The Dead Worker
What happens if a worker process crashes or the server it's running on is terminated after it has locked a job but before it can complete it? The job's status will be 'running' indefinitely, and it will never be picked up again. This is a job leak.
Solution: The Janitor Process
We solve this with a 'janitor' process that periodically cleans up stale jobs. It finds jobs that have been in the 'running' state for longer than a defined timeout.
-- The Janitor Query
UPDATE jobs
SET
status = 'pending', -- Reset to be picked up again
attempts = attempts + 1,
locked_by = NULL,
locked_at = NULL,
error_message = 'Job timed out after being locked for too long.'
WHERE
status = 'running'
AND locked_at < NOW() - INTERVAL '5 minutes' -- Use your configured job timeout
AND attempts < max_retries;
-- Query to permanently fail jobs that have timed out too many times
UPDATE jobs
SET
status = 'failed',
failed_at = NOW(),
locked_by = NULL,
locked_at = NULL,
error_message = 'Job timed out after max retries.'
WHERE
status = 'running'
AND locked_at < NOW() - INTERVAL '5 minutes'
AND attempts >= max_retries;
This can be a simple cron job or a dedicated process that runs these queries every minute. The `idx_jobs_stalled` index we created earlier makes this query efficient.
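If you prefer to keep the scheduling inside the database, the pg_cron extension can run the janitor every minute (a sketch, assuming pg_cron is installed and enabled; only the requeue query is shown):
-- Requeue stalled jobs once a minute via pg_cron
SELECT cron.schedule('* * * * *', $$
UPDATE jobs
SET status = 'pending', attempts = attempts + 1, locked_by = NULL, locked_at = NULL,
    error_message = 'Job timed out after being locked for too long.'
WHERE status = 'running'
  AND locked_at < NOW() - INTERVAL '5 minutes'
  AND attempts < max_retries
$$);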
Edge Case 2: Graceful Shutdown
When deploying new code, you need to shut down old workers. If you simply `kill -9` a worker, you create a dead-worker scenario. Workers should trap signals like `SIGINT` and `SIGTERM` to perform a graceful shutdown.
A graceful shutdown sequence:
- Stop polling for new jobs.
- Finish the currently executing job, if any.
- Update the job's status correctly.
- Exit cleanly.
This requires more sophisticated application logic (e.g., using Python's `signal` module) to set a flag that the main loop checks; a minimal sketch follows.
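One way to wire that up, assuming the `main_loop` worker above (the `shutdown_requested` name is illustrative):
import signal

shutdown_requested = False

def request_shutdown(signum, frame):
    """Signal handler: ask the main loop to stop after the current job."""
    global shutdown_requested
    shutdown_requested = True

signal.signal(signal.SIGINT, request_shutdown)
signal.signal(signal.SIGTERM, request_shutdown)

# In main_loop, replace `while True:` with `while not shutdown_requested:` so the
# worker finishes (and records the outcome of) the job in flight, then exits cleanly.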
Performance, Benchmarks, and Scalability
The theoretical benefit of `SKIP LOCKED` is clear, but what does it look like in practice?
Benchmark Scenario:
* Table: a `jobs` table with 1,000,000 pending jobs.
* Database: PostgreSQL 15 on a moderately powerful server.
* Workers: Python workers running the dequeue logic.
* Test 1: Using `FOR UPDATE` (the blocking anti-pattern).
* Test 2: Using `FOR UPDATE SKIP LOCKED`.
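Seeding a table like this for your own run is a one-liner (illustrative; the payload is a placeholder):
-- Seed 1,000,000 trivial pending jobs for the benchmark
INSERT INTO jobs (payload)
SELECT jsonb_build_object('n', g)
FROM generate_series(1, 1000000) AS g;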
Expected Results:
* `FOR UPDATE`: Throughput will increase slightly from 1 to ~5 workers, but then it plateaus completely. As you add more workers, the total number of jobs processed per second remains flat. `pg_stat_activity` will show dozens of workers in `Lock` wait states.
* `FOR UPDATE SKIP LOCKED`: Throughput will scale almost linearly with the number of workers, up to the point where the database CPU, I/O, or connection limit is saturated. `pg_stat_activity` will show most workers as `active` and very few, if any, in a `Lock` wait state.
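You can watch this live during a test with a query against `pg_stat_activity` (a sketch; filtering on `backend_type` keeps background processes out of the counts, and you can narrow further by `datname` or `application_name` on a shared server):
-- How many sessions are working vs. waiting on locks?
SELECT state, wait_event_type, wait_event, count(*)
FROM pg_stat_activity
WHERE backend_type = 'client backend'
GROUP BY 1, 2, 3
ORDER BY count(*) DESC;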
Visualizing the Difference:
| # Workers | FOR UPDATE (jobs/sec) | FOR UPDATE SKIP LOCKED (jobs/sec) |
|---|---|---|
| 1 | 20 | 20 |
| 10 | 25 (severe contention) | 195 (near-linear scaling) |
| 50 | 26 (bottlenecked) | 950 (scaling continues) |
| 100 | 26 (maxed out) | 1800 (approaching CPU/IO limit) |
(These are illustrative numbers, but the performance curve is representative of real-world behavior.)
Scaling Further: Batch Dequeuing
For very high-throughput scenarios where individual job processing is extremely fast, the network round-trip and transaction overhead for dequeuing one job at a time can become the bottleneck. You can easily adapt the pattern to grab a batch of jobs.
-- Dequeue a batch of up to 10 jobs
UPDATE jobs
SET status = 'running', locked_by = %(worker_id)s, locked_at = NOW()
WHERE id IN (
SELECT id
FROM jobs
WHERE status = 'pending' AND run_at <= NOW()
ORDER BY priority DESC, run_at ASC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 10 -- Grab a batch
)
RETURNING id, payload; -- Returns up to 10 rows
A worker would then iterate through the returned batch, processing each job. This significantly reduces the number of queries and transactions per job processed.
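A worker-side sketch of that loop, reusing the helpers from the single-job implementation above (`BATCH_DEQUEUE_SQL` is an assumed name for the batch query shown above; permanent-failure handling is elided):
def dequeue_batch(conn):
    """Claim up to a batch of jobs in one short transaction and return them."""
    with conn.cursor() as cur:
        cur.execute(BATCH_DEQUEUE_SQL, {"worker_id": WORKER_ID})
        rows = cur.fetchall()  # up to 10 (id, payload) tuples
    conn.commit()  # end the dequeue transaction; the jobs are now marked running
    return rows

def run_batch(conn):
    for job_id, payload in dequeue_batch(conn):
        try:
            process_job(job_id, payload)
            with conn.cursor() as cur:
                cur.execute(COMPLETE_JOB_SQL, {"job_id": job_id})
            conn.commit()
        except Exception as e:
            conn.rollback()
            logging.error(f"[{WORKER_ID}] Job {job_id} failed in batch: {e}")
            # Reuse the retry / permanent-failure handling from the single-job worker here.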
When to Avoid This Pattern
While powerful, the Postgres-as-a-queue pattern isn't a silver bullet. You should consider a dedicated system like RabbitMQ or SQS when, for example:
* Job volume is high enough that queue traffic competes with your primary transactional workload for database resources.
* You need fan-out, pub/sub, or streaming semantics rather than a simple work queue.
* Producers and consumers live in separate services that should not share a database.
However, for a vast number of applications where jobs are closely tied to the application's data, this pattern provides an elegant, robust, and operationally simple solution that leverages the transactional guarantees and power of the database you're already running.
By understanding the nuances of `FOR UPDATE SKIP LOCKED`, designing the correct schema and indexes, and implementing resilient worker logic with proper failure handling, you can build a background job system that rivals the reliability of many external solutions, without the added complexity.