High-Throughput Job Queues with Postgres `FOR UPDATE SKIP LOCKED`

Goh Ling Yong

The Senior Engineer's Dilemma: The In-Database Job Queue

As systems scale, the need for asynchronous background processing becomes non-negotiable. The default path for many is to reach for a dedicated message queueing system like RabbitMQ, SQS, or Kafka. While powerful, these introduce significant operational overhead: new infrastructure to manage, monitor, and secure, along with another point of failure. For many use cases, especially those where jobs are tightly coupled with primary application data, a robust job queue can be built directly within PostgreSQL.

The challenge, however, is concurrency. A naive implementation quickly falls apart under the pressure of multiple competing workers, leading to race conditions, deadlocks, and severe lock contention. This article is a deep dive into the definitive pattern for solving this problem: leveraging PostgreSQL's FOR UPDATE SKIP LOCKED clause to build a highly concurrent, scalable, and resilient job queue that avoids the common pitfalls.

We will not cover the basics of SELECT or UPDATE. We assume you understand database transactions, row-level locking, and the fundamental problems of concurrent data access. Our focus is on the production-grade implementation details that separate a toy queue from a system that can reliably process millions of jobs per day.

The Failure of Naive Approaches

Before dissecting the SKIP LOCKED solution, it's critical to understand why simpler approaches fail catastrophically under load.

1. The UPDATE ... RETURNING Race Condition

A common first attempt is a single UPDATE that selects its own target job via a subquery (or the equivalent non-transactional, two-step SELECT-then-UPDATE).

sql
-- ANTI-PATTERN: This is prone to race conditions
UPDATE jobs
SET status = 'running', locked_by = 'worker-123'
WHERE id = (
    SELECT id
    FROM jobs
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at ASC
    LIMIT 1
)
RETURNING *;

Under low concurrency, this might appear to work. But with dozens of workers, the subquery (SELECT id ... LIMIT 1) can be evaluated by multiple workers at effectively the same time, so they all identify the same pending job. The first UPDATE to reach the row takes its row lock; the others block behind it. Under the default READ COMMITTED isolation level, once the first transaction commits, the blocked UPDATEs re-check only their WHERE id = ... condition against the new row version. That condition still matches, so they "claim" the job again, overwriting locked_by, and the same job can be processed more than once. The result is duplicate work, unnecessary blocking, and unpredictable behavior.

2. The FOR UPDATE Contention Bottleneck

A more robust approach introduces a transaction with FOR UPDATE.

sql
-- ANTI-PATTERN for high concurrency
BEGIN;

SELECT id, payload
FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
FOR UPDATE;

-- If a row is returned, update it
UPDATE jobs SET status = 'running', locked_by = 'worker-123' WHERE id = <retrieved_id>;

COMMIT;

This solves the race condition. FOR UPDATE places an exclusive lock on the row returned by the SELECT, preventing any other transaction from modifying or locking it until the current transaction commits or rolls back. However, it introduces a massive performance bottleneck.

Consider 50 workers running this query. Worker 1 selects and locks Job A. Workers 2 through 50 execute the same SELECT and identify Job A as the next candidate. Instead of getting a result, they are all placed in a queue, waiting for Worker 1 to release its lock on Job A. They are effectively serialized, forming a single-file line. Your concurrency is 1, and your expensive cluster of 50 workers is mostly idle, waiting on locks. The system does not scale.

The Solution: Non-Blocking Dequeuing with `SKIP LOCKED`

Introduced in PostgreSQL 9.5, SKIP LOCKED is a modifier for FOR UPDATE and FOR SHARE that transforms the locking behavior. When a row is selected that is already locked by another transaction, SKIP LOCKED instructs Postgres to simply ignore it and move on to the next row that satisfies the WHERE clause.

This is the key to true concurrent dequeuing. When our 50 workers execute their query, Worker 1 locks Job A. Workers 2 through 50 also attempt to lock Job A, but instead of waiting, SKIP LOCKED causes them to immediately bypass it and evaluate the next available job in the ordered set—Job B, Job C, and so on. In an instant, 50 workers can claim 50 different jobs without blocking each other. This allows for near-linear scaling of throughput with the number of workers.
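
A minimal two-session sketch illustrates the behavior. It assumes the jobs table defined in the next section; run each block in a separate psql session, leaving Session A's transaction open.

sql
-- Session A
BEGIN;
SELECT id FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;  -- returns and locks, say, Job A

-- Session B, while Session A's transaction is still open
BEGIN;
SELECT id FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;  -- does not wait: skips Job A and returns Job B immediately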

Production-Grade Schema and Indexing

Our implementation starts with a well-designed jobs table.

sql
CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'failed');

CREATE TABLE jobs (
    id BIGSERIAL PRIMARY KEY,
    queue_name TEXT NOT NULL DEFAULT 'default',
    payload JSONB NOT NULL,
    status job_status NOT NULL DEFAULT 'pending',
    priority INTEGER NOT NULL DEFAULT 0, -- Higher is more important
    max_retries INTEGER NOT NULL DEFAULT 3,
    attempts INTEGER NOT NULL DEFAULT 0,
    run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    locked_by TEXT, -- Identifier for the worker holding the lock
    locked_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    failed_at TIMESTAMPTZ,
    error_message TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- The CRITICAL index for the dequeue query
CREATE INDEX idx_jobs_dequeue ON jobs (priority DESC, run_at ASC, created_at ASC) WHERE status = 'pending';

-- Optional: Index for janitor process to find stalled jobs
CREATE INDEX idx_jobs_stalled ON jobs (locked_at) WHERE status = 'running';

Schema Breakdown:

* status: An ENUM for clear state management.

* priority: Allows more important jobs to be processed first.

* run_at: Enables scheduled jobs.

* attempts/max_retries: Essential for building a robust retry mechanism.

* locked_by/locked_at: Crucial for monitoring and recovering from worker crashes (the "janitor" process).

* error_message: Stores the reason for the last failure.
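
Enqueuing work is then just an INSERT. For example (the queue name, payload shape, and values are illustrative):

sql
-- Enqueue a job on the 'emails' queue with elevated priority, delayed by 10 minutes
INSERT INTO jobs (queue_name, payload, priority, run_at)
VALUES ('emails', '{"type": "welcome_email", "user_id": 42}'::jsonb, 10, NOW() + INTERVAL '10 minutes')
RETURNING id;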

The Dequeue Index: The Secret to Performance

The most critical piece here is idx_jobs_dequeue. This is a partial composite index. Let's break down why it's structured this way:

  • Partial Index (WHERE status = 'pending'): The vast majority of our dequeue queries will only ever look for pending jobs. By making this a partial index, we keep its size dramatically smaller. It only contains entries for jobs that are candidates for processing, making scans incredibly fast and efficient. As jobs are completed or fail, they are removed from this index.
  • Composite Key Order (priority DESC, run_at ASC, created_at ASC): The order of columns in a composite index is paramount. It must match the ORDER BY clause of our dequeue query. This allows Postgres to read the job candidates directly from the index in the correct order, without needing to perform a separate, expensive sort operation on the results. It finds the highest priority, earliest run_at job almost instantly.
  • Without this exact index, PostgreSQL would have to perform a full table scan or a much larger index scan, then sort the results in memory, which would become a severe bottleneck.
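
You can verify the index is doing its job with EXPLAIN: the plan for the dequeue candidate query should show a scan of idx_jobs_dequeue rather than a sequential scan followed by a sort (exact plan output varies with data distribution and PostgreSQL version).

sql
EXPLAIN (ANALYZE, BUFFERS)
SELECT id
FROM jobs
WHERE status = 'pending' AND run_at <= NOW()
ORDER BY priority DESC, run_at ASC, created_at ASC
LIMIT 1;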

    The Core Worker Logic: A Python Implementation

    Here is a complete worker implementation in Python using the psycopg (version 3) library; the resilience pieces (the janitor and graceful shutdown) are covered in the next section. Note the careful transaction management.

    python
    import os
    import time
    import random
    import psycopg
    import logging
    import socket
    
    # --- Configuration ---
    DB_CONN_STRING = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost:5432/jobs_db")
    WORKER_ID = f"worker-{socket.gethostname()}-{os.getpid()}"
    POLL_INTERVAL_SECONDS = 2
    JOB_TIMEOUT_SECONDS = 300  # 5 minutes
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    # --- The Dequeue Query ---
    DEQUEUE_JOB_SQL = """
        UPDATE jobs
        SET
            status = 'running',
            locked_by = %(worker_id)s,
            locked_at = NOW()
        WHERE id = (
            SELECT id
            FROM jobs
            WHERE status = 'pending' AND run_at <= NOW()
            ORDER BY priority DESC, run_at ASC, created_at ASC
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        RETURNING id, payload, attempts, max_retries;
    """
    
    # --- Job Outcome Queries ---
    COMPLETE_JOB_SQL = """
        UPDATE jobs
        SET status = 'completed', completed_at = NOW(), locked_by = NULL, locked_at = NULL
        WHERE id = %(job_id)s;
    """
    
    FAIL_JOB_PERMANENTLY_SQL = """
        UPDATE jobs
        SET status = 'failed', failed_at = NOW(), error_message = %(error)s, locked_by = NULL, locked_at = NULL
        WHERE id = %(job_id)s;
    """
    
    RETRY_JOB_SQL = """
        UPDATE jobs
        SET
            status = 'pending',
            attempts = attempts + 1,
            run_at = NOW() + (%(delay)s * INTERVAL '1 second'),
            locked_by = NULL,
            locked_at = NULL,
            error_message = %(error)s
        WHERE id = %(job_id)s;
    """
    
    def process_job(job_id, payload):
        """Simulates the actual work being done."""
        logging.info(f"[{WORKER_ID}] Processing job {job_id} with payload: {payload}")
        # Replace this with your actual business logic
        # Example: send an email, process an image, etc.
        if payload.get("should_fail", False):
            raise ValueError("This job was configured to fail.")
        time.sleep(5) # Simulate work
        logging.info(f"[{WORKER_ID}] Finished processing job {job_id}")
    
    def calculate_backoff(attempts):
        """Calculates exponential backoff with random jitter."""
        base_delay = 2 ** attempts
        # Random jitter spreads retries out so failing jobs don't all come back at once
        jitter = random.uniform(0, base_delay / 2)
        return base_delay + jitter
    
    def main_loop():
        logging.info(f"[{WORKER_ID}] Worker started.")
        with psycopg.connect(DB_CONN_STRING) as conn:
            while True:
                job = None
                try:
                    # 1. Atomically dequeue and lock a job
                    with conn.cursor() as cur:
                        cur.execute(DEQUEUE_JOB_SQL, {"worker_id": WORKER_ID})
                        job_record = cur.fetchone()
                        conn.commit() # Keep this transaction short!
    
                    if job_record:
                        job_id, payload, attempts, max_retries = job_record
                        job = {"id": job_id, "payload": payload, "attempts": attempts, "max_retries": max_retries}
    
                        # 2. Process the job (outside the dequeue transaction)
                        process_job(job_id, payload)
    
                        # 3. Mark job as complete in a new transaction
                        with conn.cursor() as cur:
                            cur.execute(COMPLETE_JOB_SQL, {"job_id": job_id})
                            conn.commit()
                        logging.info(f"[{WORKER_ID}] Job {job_id} marked as completed.")
    
                    else:
                        # No jobs found, wait before polling again
                        logging.info(f"[{WORKER_ID}] No jobs found. Sleeping for {POLL_INTERVAL_SECONDS}s.")
                        time.sleep(POLL_INTERVAL_SECONDS)
    
                except Exception as e:
                    logging.error(f"[{WORKER_ID}] An error occurred: {e}")
                    # Clear any aborted transaction (e.g. if the dequeue statement itself
                    # failed) so the outcome UPDATE below can run.
                    conn.rollback()
                    if job:
                        job_id = job['id']
                        attempts = job['attempts']
                        max_retries = job['max_retries']
                        error_message = str(e)

                        # 4. Handle failures and retries
                        with conn.cursor() as cur:
                            if attempts + 1 >= max_retries:
                                # Max retries reached, fail permanently
                                logging.warning(f"[{WORKER_ID}] Job {job_id} reached max retries. Failing permanently.")
                                cur.execute(FAIL_JOB_PERMANENTLY_SQL, {"job_id": job_id, "error": error_message})
                            else:
                                # Retry with exponential backoff
                                delay = calculate_backoff(attempts + 1)
                                logging.info(f"[{WORKER_ID}] Job {job_id} failed. Retrying in {delay:.2f} seconds.")
                                cur.execute(RETRY_JOB_SQL, {"job_id": job_id, "error": error_message, "delay": delay})
                            conn.commit()
                    # Note: psycopg does not reconnect automatically. If the connection itself
                    # is broken, the error will propagate and the process should be restarted
                    # (e.g. by a supervisor); the janitor described below will eventually
                    # requeue any job this worker had locked.
                    time.sleep(POLL_INTERVAL_SECONDS)
    
    if __name__ == "__main__":
        main_loop()

    Key Implementation Details:

  • Combined UPDATE-SELECT: We use an UPDATE ... WHERE id = (SELECT ...) pattern. This is now safe because the subquery contains the FOR UPDATE SKIP LOCKED clause. The entire statement is atomic: it finds a job, locks it, and updates its status to running in a single operation.
  • Short-Lived Transactions: The transaction that dequeues the job is committed immediately after the UPDATE ... RETURNING statement. This is critical. The long-running business logic (process_job) happens outside of any transaction. Holding a transaction open for the duration of a job is an anti-pattern that can lead to long-lived locks, connection pool exhaustion, and VACUUM problems.
  • Separate Outcome Transactions: The final status update (completed or failed/retry) is performed in a new, short-lived transaction. This ensures the job's state is durably recorded.
  • Exponential Backoff: The calculate_backoff function implements exponential backoff with jitter. This prevents waves of failing jobs from all retrying at the exact same time (a thundering herd problem).

    Handling Edge Cases: Building for Resilience

    A simple worker loop is not enough. Production systems must be resilient to failure.

    Edge Case 1: The Dead Worker

    What happens if a worker process crashes or the server it's running on is terminated after it has locked a job but before it can complete it? The job's status will be 'running' indefinitely, and it will never be picked up again. This is a job leak.

    Solution: The Janitor Process

    We solve this with a 'janitor' process that periodically cleans up stale jobs. It finds jobs that have been in the 'running' state for longer than a defined timeout.

    sql
    -- The Janitor Query
    UPDATE jobs
    SET
        status = 'pending', -- Reset to be picked up again
        attempts = attempts + 1,
        locked_by = NULL,
        locked_at = NULL,
        error_message = 'Job timed out after being locked for too long.'
    WHERE
        status = 'running'
        AND locked_at < NOW() - INTERVAL '5 minutes' -- Use your configured job timeout
        AND attempts < max_retries;
    
    -- Query to permanently fail jobs that have timed out too many times
    UPDATE jobs
    SET
        status = 'failed',
        failed_at = NOW(),
        locked_by = NULL,
        locked_at = NULL,
        error_message = 'Job timed out after max retries.'
    WHERE
        status = 'running'
        AND locked_at < NOW() - INTERVAL '5 minutes'
        AND attempts >= max_retries;

    This can be a simple cron job or a dedicated process that runs these queries every minute. The idx_jobs_stalled index we created earlier makes this query efficient.
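
    A minimal sketch of such a dedicated janitor process, reusing the two queries above (the connection string default, interval, and timeout value are illustrative and should match your worker configuration):

    python
    import os
    import time
    import logging
    import psycopg

    DB_CONN_STRING = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost:5432/jobs_db")
    JANITOR_INTERVAL_SECONDS = 60
    JOB_TIMEOUT = "5 minutes"  # keep in sync with the worker's job timeout

    REQUEUE_STALLED_SQL = """
        UPDATE jobs
        SET status = 'pending', attempts = attempts + 1, locked_by = NULL, locked_at = NULL,
            error_message = 'Job timed out after being locked for too long.'
        WHERE status = 'running' AND locked_at < NOW() - %(timeout)s::interval AND attempts < max_retries;
    """

    FAIL_STALLED_SQL = """
        UPDATE jobs
        SET status = 'failed', failed_at = NOW(), locked_by = NULL, locked_at = NULL,
            error_message = 'Job timed out after max retries.'
        WHERE status = 'running' AND locked_at < NOW() - %(timeout)s::interval AND attempts >= max_retries;
    """

    def janitor_loop():
        with psycopg.connect(DB_CONN_STRING) as conn:
            while True:
                with conn.cursor() as cur:
                    cur.execute(REQUEUE_STALLED_SQL, {"timeout": JOB_TIMEOUT})
                    requeued = cur.rowcount
                    cur.execute(FAIL_STALLED_SQL, {"timeout": JOB_TIMEOUT})
                    failed = cur.rowcount
                    conn.commit()
                logging.info("janitor: requeued=%s, permanently failed=%s", requeued, failed)
                time.sleep(JANITOR_INTERVAL_SECONDS)

    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        janitor_loop()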

    Edge Case 2: Graceful Shutdown

    When deploying new code, you need to shut down old workers. If you simply kill -9 a worker, you create a dead worker scenario. Workers should trap signals like SIGINT and SIGTERM to perform a graceful shutdown.

    A graceful shutdown sequence:

    • Stop polling for new jobs.
    • Finish the currently executing job, if any.
    • Update the job's status correctly.
    • Exit cleanly.

    This requires more sophisticated application logic (e.g., using Python's signal module) to set a flag that the main loop checks.
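
    A minimal sketch of that flag, intended to be combined with the main_loop above (the handler only records the request; the loop finishes its current job and then exits):

    python
    import signal

    shutdown_requested = False

    def _request_shutdown(signum, frame):
        # Only set a flag; never interrupt the job that is currently running.
        global shutdown_requested
        shutdown_requested = True

    signal.signal(signal.SIGTERM, _request_shutdown)
    signal.signal(signal.SIGINT, _request_shutdown)

    # In main_loop(), change `while True:` to `while not shutdown_requested:` so the
    # worker stops polling for new jobs and exits cleanly after the current one completes.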

    Performance, Benchmarks, and Scalability

    The theoretical benefit of SKIP LOCKED is clear, but what does it look like in practice?

    Benchmark Scenario:

    * Table: jobs table with 1,000,000 pending jobs.

    * Database: PostgreSQL 15 on a moderately powerful server.

    * Workers: Python workers running the dequeue logic.

    * Test 1: Using FOR UPDATE (the blocking anti-pattern).

    * Test 2: Using FOR UPDATE SKIP LOCKED.

    Expected Results:

    * FOR UPDATE: Throughput will increase slightly from 1 to ~5 workers, but then it will plateau completely. As you add more workers, the total number of jobs processed per second remains flat. pg_stat_activity will show dozens of workers in Lock wait states.

    * FOR UPDATE SKIP LOCKED: Throughput will scale almost linearly with the number of workers, up to the point where the database CPU, I/O, or connection limit is saturated. pg_stat_activity will show most workers as active and very few, if any, in a Lock wait state.
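
    One way to observe this directly is to watch wait events while the workers run. This query groups all client backends; filter further to match however you identify your worker connections (available in PostgreSQL 10+):

    sql
    SELECT wait_event_type, wait_event, state, COUNT(*)
    FROM pg_stat_activity
    WHERE backend_type = 'client backend'
    GROUP BY wait_event_type, wait_event, state
    ORDER BY count DESC;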

    Visualizing the Difference:

    | # Workers | FOR UPDATE (jobs/sec)  | FOR UPDATE SKIP LOCKED (jobs/sec) |
    |-----------|------------------------|-----------------------------------|
    | 1         | 20                     | 20                                |
    | 10        | 25 (severe contention) | 195 (near-linear scaling)         |
    | 50        | 26 (bottlenecked)      | 950 (scaling continues)           |
    | 100       | 26 (maxed out)         | 1800 (approaching CPU/IO limit)   |

    (These are illustrative numbers, but the performance curve is representative of real-world behavior.)
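
    For a quick local benchmark, the worker count can be varied by spawning multiple processes. This sketch assumes the worker code above lives in a hypothetical module named worker:

    python
    import multiprocessing

    from worker import main_loop  # hypothetical module containing the worker code above

    if __name__ == "__main__":
        # Use 'spawn' so each child re-imports the module and computes its own WORKER_ID
        multiprocessing.set_start_method("spawn")
        n_workers = 10  # vary this to reproduce the scaling curve
        processes = [multiprocessing.Process(target=main_loop) for _ in range(n_workers)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()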

    Scaling Further: Batch Dequeuing

    For very high-throughput scenarios where individual job processing is extremely fast, the network round-trip and transaction overhead for dequeuing one job at a time can become the bottleneck. You can easily adapt the pattern to grab a batch of jobs.

    sql
    -- Dequeue a batch of up to 10 jobs
    UPDATE jobs
    SET status = 'running', locked_by = %(worker_id)s, locked_at = NOW()
    WHERE id IN (
        SELECT id
        FROM jobs
        WHERE status = 'pending' AND run_at <= NOW()
        ORDER BY priority DESC, run_at ASC, created_at ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 10 -- Grab a batch
    )
    RETURNING id, payload; -- Returns up to 10 rows

    A worker would then iterate through the returned batch, processing each job. This significantly reduces the number of queries and transactions per job processed.
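
    On the worker side, only the dequeue step changes. A sketch against the earlier worker, where DEQUEUE_BATCH_SQL is assumed to hold the batch query above:

    python
    # Sketch: batch variant of the dequeue step in main_loop()
    with conn.cursor() as cur:
        cur.execute(DEQUEUE_BATCH_SQL, {"worker_id": WORKER_ID})
        batch = cur.fetchall()  # up to 10 (id, payload) rows
        conn.commit()  # commit promptly; the rows are now 'running', so other workers skip them

    for job_id, payload in batch:
        try:
            process_job(job_id, payload)
            # ... mark completed, as in the single-job worker
        except Exception as exc:
            # ... per-job retry/fail handling, as in the single-job worker
            logging.error(f"[{WORKER_ID}] Job {job_id} failed: {exc}")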

    When to Avoid This Pattern

    While powerful, the Postgres-as-a-queue pattern isn't a silver bullet. You should consider a dedicated system like RabbitMQ or SQS when:

  • Extreme Throughput is Required: If you need to process hundreds of thousands of jobs per second, a dedicated, optimized system will likely outperform a general-purpose database.
  • Complex Routing and Fan-out: If you need pub/sub, topic exchanges, or complex message routing rules, message brokers are built for this. Emulating it in Postgres is cumbersome.
  • Decoupling from the Database: If your job processors are entirely separate services that shouldn't have direct access to your primary application database, a message queue provides a clean decoupling layer.
  • Long-term Job Retention: If you need to store millions of completed or failed jobs for extended periods for audit purposes, this can cause significant table bloat in your primary database. An external system or archiving strategy is better.

    However, for a vast number of applications where jobs are closely tied to the application's data, this pattern provides an elegant, robust, and operationally simple solution that leverages the transactional guarantees and power of the database you're already running.

    By understanding the nuances of FOR UPDATE SKIP LOCKED, designing the correct schema and indexes, and implementing resilient worker logic with proper failure handling, you can build a background job system that rivals the reliability of many external solutions, without the added complexity.
