PostgreSQL SKIP LOCKED for High-Throughput Concurrent Job Queues

Goh Ling Yong

The Senior Engineer's Dilemma: The Overhead of a Dedicated Message Broker

In many architectures, the default solution for background job processing is to introduce a dedicated message broker like RabbitMQ, SQS, or Kafka. While powerful, these systems add significant operational overhead: another service to deploy, monitor, secure, and scale. For applications where jobs are tightly coupled with the primary relational data, a frequently overlooked and highly potent alternative exists directly within PostgreSQL: a database-backed job queue.

However, implementing a concurrent job queue naively is fraught with peril. A simple SELECT ... FOR UPDATE on a jobs table will serialize worker access, effectively turning your multi-worker setup into a single-threaded process, defeating the entire purpose of concurrency. This is where senior engineers must look deeper into the database's capabilities.

This article is a deep dive into the FOR UPDATE SKIP LOCKED clause in PostgreSQL, a mechanism that transforms the database from a concurrency bottleneck into a high-throughput job processing engine. We will dissect production-ready patterns, from schema design and transactional workers to failure handling and performance tuning, demonstrating how to build a robust system that rivals the reliability of external brokers for many common use cases.


1. Foundational Schema: Designing for Concurrency and State

A robust schema is the bedrock of our queue. It must not only store the job payload but also manage its state through the entire lifecycle, including locking, retries, and failures.

Here is a production-grade jobs table schema:

sql
CREATE TYPE job_status AS ENUM ('available', 'running', 'completed', 'failed');

CREATE TABLE jobs (
    id BIGSERIAL PRIMARY KEY,
    queue_name TEXT NOT NULL DEFAULT 'default',
    payload JSONB NOT NULL,
    status job_status NOT NULL DEFAULT 'available',
    priority INTEGER NOT NULL DEFAULT 0, -- Lower number means higher priority
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 5,
    run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    locked_at TIMESTAMPTZ,
    locked_by TEXT, -- Identifier for the worker holding the lock
    last_error TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index for fetching available jobs efficiently
CREATE INDEX idx_jobs_fetch ON jobs (priority, run_at) WHERE status = 'available';

-- Index for finding stuck jobs (reaper process)
CREATE INDEX idx_jobs_reap ON jobs (locked_at) WHERE status = 'running';

-- Optional: Index for looking up jobs by queue
CREATE INDEX idx_jobs_queue_name ON jobs (queue_name);

-- Trigger to update the updated_at column on every change
CREATE OR REPLACE FUNCTION set_updated_at() RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER jobs_updated_at_trigger
BEFORE UPDATE ON jobs
FOR EACH ROW EXECUTE FUNCTION set_updated_at();

Schema Dissection for Senior Engineers:

* job_status ENUM: Enforces state integrity at the database level. Far superior to using magic strings.

* queue_name: Enables multi-tenancy or categorization, allowing different worker pools to subscribe to specific queues.

* priority & run_at: The core drivers for job selection. run_at facilitates scheduled jobs and exponential backoff on retries.

* attempts & max_attempts: Essential for building resilient workers that can handle transient failures.

* locked_at & locked_by: These are *critical* for managing concurrency and solving the "stuck worker" problem. locked_by should be a unique identifier for each worker instance (e.g., a hostname and PID, or a UUID).

* idx_jobs_fetch: This is a Partial Index. It's a massive performance win because it only indexes rows that workers are actually interested in (status = 'available'). A full index on (priority, run_at) would be bloated with completed and failed jobs, slowing down scans.

* idx_jobs_reap: Another partial index, this one specifically for a "reaper" process that we'll discuss later, tasked with finding and releasing locks from dead workers.
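
To make the job lifecycle concrete, here is what enqueuing work against this schema might look like. This is a minimal sketch; the queue name and payload contents are illustrative, not something the schema prescribes.

sql
-- Enqueue an immediate job on the default queue
INSERT INTO jobs (payload)
VALUES ('{"type": "send_welcome_email", "user_id": 42}');

-- Enqueue a delayed, higher-priority job on a dedicated queue
-- (remember: a lower priority number means higher priority)
INSERT INTO jobs (queue_name, payload, priority, run_at)
VALUES ('billing', '{"type": "generate_invoice", "order_id": 1001}', -10, NOW() + INTERVAL '10 minutes');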

2. The Contention Problem: Why `FOR UPDATE` Alone Fails

Let's first illustrate the problem we are solving. A naive approach might use a simple SELECT ... FOR UPDATE query. Imagine 10 workers running this query concurrently:

sql
-- DO NOT USE THIS IN PRODUCTION FOR A CONCURRENT QUEUE
BEGIN;

SELECT id, payload
FROM jobs
WHERE status = 'available' AND run_at <= NOW()
ORDER BY priority, run_at
LIMIT 1
FOR UPDATE;

-- If a job is found, the worker would then:
-- UPDATE jobs SET status = 'running', locked_at = NOW(), ... WHERE id = ?;

COMMIT;

What happens?

  • Worker 1 starts a transaction and executes the SELECT. It finds the highest-priority job and places a row-level lock on it.
  • Workers 2-10 execute the same query simultaneously. They all identify the same highest-priority job.
  • Because Worker 1 holds the lock, Workers 2-10 will block. They will wait until Worker 1's transaction either commits or rolls back.
  • Once Worker 1 commits, Worker 2 acquires the lock, but the job's status has likely been changed. Worker 2's query might now return nothing, or it might re-evaluate and try to lock the next available job.
  • The net effect is serialization. Your workers spend most of their time waiting for locks instead of doing work. Throughput plummets and is effectively limited to that of a single worker.


3. The Solution: `SKIP LOCKED` for True Concurrency

This is where SKIP LOCKED changes the game. It instructs PostgreSQL: "Attempt to acquire a lock on the rows selected by the query, but if a row is already locked by another transaction, don't wait for it—just skip it and move on to the next one."

Here is the correct, high-concurrency query to fetch a job:

    sql
    -- The Production-Ready Fetch-and-Lock Query
    UPDATE jobs
    SET
        status = 'running',
        locked_at = NOW(),
        locked_by = 'worker-uuid-123',
        attempts = attempts + 1
    WHERE id = (
        SELECT id
        FROM jobs
        WHERE status = 'available' AND run_at <= NOW()
        ORDER BY priority, run_at
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING id, payload, attempts;

Dissecting the Atomic Fetch-and-Update:

* Atomicity: This is a single, atomic query. The SELECT subquery and the outer UPDATE happen as one operation. This is superior to a separate SELECT followed by an UPDATE as it avoids a race condition between the two statements.

* Subquery with FOR UPDATE SKIP LOCKED:

    * The inner SELECT identifies the best candidate job just like before.

    * FOR UPDATE signals the intent to lock the row.

    * SKIP LOCKED is the magic. If Worker 1 has already locked Job A, when Worker 2 runs this query, its SELECT subquery will simply pretend Job A doesn't exist and will evaluate the *next* available job (Job B).

* No Blocking: Worker 2 immediately gets Job B and can start processing it in parallel with Worker 1. Worker 3 gets Job C, and so on. Your workers are now working concurrently, limited only by system resources and the number of available jobs.

* RETURNING Clause: This is a PostgreSQL-specific feature that is incredibly efficient. Instead of running a subsequent SELECT to get the job details, the UPDATE statement returns the specified columns from the modified row, saving a database round-trip.
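
If you run dedicated worker pools per queue, as the queue_name column anticipates, the same pattern extends naturally. A sketch, assuming the worker passes its identifier and its queue name as parameters:

    sql
    UPDATE jobs
    SET
        status = 'running',
        locked_at = NOW(),
        locked_by = $1,
        attempts = attempts + 1
    WHERE id = (
        SELECT id
        FROM jobs
        WHERE status = 'available' AND run_at <= NOW() AND queue_name = $2
        ORDER BY priority, run_at
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING id, payload, attempts;

Note that idx_jobs_fetch does not cover queue_name; a per-queue deployment would likely want a partial index on (queue_name, priority, run_at) WHERE status = 'available' instead.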

4. A Production-Grade Go Worker Implementation

Let's translate this into a realistic worker implementation using Go and the pgx library, which is well-suited for high-performance PostgreSQL interaction.

    go
    package main
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    
    	"github.com/google/uuid"
    	"github.com/jackc/pgx/v5/pgxpool"
    )
    
    type Job struct {
    	ID       int64
    	Payload  json.RawMessage
    	Attempts int
    }
    
    type Worker struct {
    	ID   string
    	Pool *pgxpool.Pool
    }
    
    func NewWorker(pool *pgxpool.Pool) *Worker {
    	return &Worker{
    		ID:   fmt.Sprintf("worker-%s", uuid.New().String()),
    		Pool: pool,
    	}
    }
    
    const fetchJobQuery = `
    UPDATE jobs
    SET
        status = 'running',
        locked_at = NOW(),
        locked_by = $1,
        attempts = attempts + 1
    WHERE id = (
        SELECT id
        FROM jobs
        WHERE status = 'available' AND run_at <= NOW()
        ORDER BY priority, run_at
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING id, payload, attempts;`
    
    func (w *Worker) FetchAndProcessJob(ctx context.Context) (bool, error) {
    	job := &Job{}
    
    	// The entire fetch is a single atomic query, so no explicit transaction is needed here.
    	err := w.Pool.QueryRow(ctx, fetchJobQuery, w.ID).Scan(&job.ID, &job.Payload, &job.Attempts)
    	if err != nil {
    		if err.Error() == "no rows in result set" { // pgx.ErrNoRows is not exported in v5
    			return false, nil // No job found, not an error
    		}
    		return false, fmt.Errorf("failed to fetch job: %w", err)
    	}
    
    	fmt.Printf("[%s] Started processing job %d (attempt %d)\n", w.ID, job.ID, job.Attempts)
    
    	// Simulate processing the job
    	err = processPayload(job.Payload)
    
    	if err != nil {
    		fmt.Printf("[%s] Error processing job %d: %v\n", w.ID, job.ID, err)
    		// Handle failure: exponential backoff and potential move to DLQ
    		if err := w.handleJobFailure(ctx, job, err.Error()); err != nil {
    			return true, fmt.Errorf("failed to handle job failure for job %d: %w", job.ID, err)
    		}
    		return true, nil
    	}
    
    	// Handle success: mark job as completed
    	if err := w.handleJobSuccess(ctx, job); err != nil {
    		return true, fmt.Errorf("failed to mark job %d as completed: %w", job.ID, err)
    	}
    
    	fmt.Printf("[%s] Successfully completed job %d\n", w.ID, job.ID)
    	return true, nil
    }
    
    func (w *Worker) handleJobSuccess(ctx context.Context, job *Job) error {
    	_, err := w.Pool.Exec(ctx, "UPDATE jobs SET status = 'completed' WHERE id = $1", job.ID)
    	return err
    }
    
    const handleFailureQuery = `
    UPDATE jobs
    SET
        status = 'available',
        run_at = NOW() + ($1 * interval '1 second'),
        last_error = $2,
        locked_at = NULL,
        locked_by = NULL
    WHERE id = $3;`
    
    const moveToDLQQuery = `
    UPDATE jobs
    SET
        status = 'failed',
        last_error = $1,
        locked_at = NULL,
        locked_by = NULL
    WHERE id = $2;`
    
    func (w *Worker) handleJobFailure(ctx context.Context, job *Job, jobError string) error {
    	// Check if the job has exceeded max attempts
    	var maxAttempts int
    	err := w.Pool.QueryRow(ctx, "SELECT max_attempts FROM jobs WHERE id = $1", job.ID).Scan(&maxAttempts)
    	if err != nil {
    		return err
    	}
    
    	if job.Attempts >= maxAttempts {
    		fmt.Printf("[%s] Job %d reached max attempts. Moving to failed state.\n", w.ID, job.ID)
    		_, err := w.Pool.Exec(ctx, moveToDLQQuery, jobError, job.ID)
    		return err
    	}
    
    	// Exponential backoff: 2^attempts seconds
    	backoffSeconds := 1 << uint(job.Attempts)
    	fmt.Printf("[%s] Job %d failed. Retrying in %d seconds.\n", w.ID, job.ID, backoffSeconds)
    
    	_, err = w.Pool.Exec(ctx, handleFailureQuery, backoffSeconds, jobError, job.ID)
    	return err
    }
    
    // Dummy processing function
    func processPayload(payload json.RawMessage) error {
    	time.Sleep(1 * time.Second) // Simulate work
    	// Simulate a transient error for demonstration
    	if time.Now().Unix()%10 == 0 {
    		return fmt.Errorf("transient processing error")
    	}
    	return nil
    }
    
    func main() {
    	databaseUrl := "postgres://user:password@localhost:5432/jobs_db?sslmode=disable"
    	pool, err := pgxpool.New(context.Background(), databaseUrl)
    	if err != nil {
    		fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
    		os.Exit(1)
    	}
    	defer pool.Close()
    
    	worker := NewWorker(pool)
    	fmt.Printf("Starting worker %s\n", worker.ID)
    
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()
    
    	// Graceful shutdown handling
    	sigChan := make(chan os.Signal, 1)
    	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    	go func() {
    		<-sigChan
    		fmt.Println("Shutdown signal received, stopping worker...")
    		cancel()
    	}()
    
    	// Main worker loop
    	ticker := time.NewTicker(250 * time.Millisecond) // Poll for jobs
    	defer ticker.Stop()
    
    	for {
    		select {
    		case <-ctx.Done():
    			fmt.Println("Worker stopped.")
    			return
    		case <-ticker.C:
    			processed, err := worker.FetchAndProcessJob(ctx)
    			if err != nil {
    				fmt.Fprintf(os.Stderr, "Error during job processing cycle: %v\n", err)
    			}
    			// If we processed a job, don't wait for the ticker, try to get another one immediately.
    			if processed {
    				ticker.Reset(1 * time.Millisecond)
    			} else {
    				ticker.Reset(250 * time.Millisecond)
    			}
    		}
    	}
    }
    

5. Advanced Topic: The Stuck Worker and the Reaper Process

What happens if a worker process crashes, is forcefully terminated, or loses network connectivity after it has locked a job? The job's status will be 'running' indefinitely, and it will never be processed again. This is a critical failure mode.

This is why our schema includes locked_at and locked_by. We can implement a separate, periodic process—a "reaper"—to find and resurrect these orphaned jobs.

The Reaper's Logic:

* Periodically scan the jobs table for rows where status = 'running'.

* Check if locked_at is older than a defined timeout (e.g., 5 minutes). This timeout must be longer than the longest conceivable legitimate job processing time.

* If a job's lock has expired, reset its status to 'available', clear locked_at and locked_by, and potentially log the incident.

Here is the reaper's SQL query:

    sql
    -- Find and reset jobs with stale locks
    UPDATE jobs
    SET
        status = 'available',
        locked_at = NULL,
        locked_by = NULL,
        last_error = 'Job lock expired, reset by reaper.'
    WHERE id IN (
        SELECT id FROM jobs
        WHERE status = 'running' AND locked_at < NOW() - INTERVAL '5 minutes'
        FOR UPDATE SKIP LOCKED -- Use SKIP LOCKED here too to avoid races with other reapers!
    )
    RETURNING id, locked_by;

Critical Edge Case: The Reaper Race Condition

Imagine this scenario:

* A worker is taking a long time to process a job, exceeding the 5-minute timeout.

* The reaper process runs, sees that the job's lock has expired, resets the job to 'available', and commits.

* A second worker claims the job and starts processing it.

* The original worker eventually finishes and commits its own UPDATE to set the status to 'completed', even though it no longer owns the job. The job has now run twice, and the slow worker has overwritten state it no longer had any right to touch, potentially causing data corruption or other side effects.

Solution: A worker must never finalize a job it no longer owns. The completion and failure UPDATEs should also require that locked_by still equals the worker's own identifier and that status is still 'running', so a worker whose lock has been reaped cannot overwrite the job's new state. This is a simple but effective guard. A more robust solution involves a fencing token or generation number, but for most use cases, checking locked_by is sufficient. An even more advanced approach is to use PostgreSQL's advisory locks as a distributed lock for the reaper process itself, ensuring only one reaper can run at a time across your entire infrastructure. Both guards are sketched below.
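
A minimal sketch of both guards; the worker identifier parameter and the advisory lock key are illustrative choices, not fixed by the schema:

    sql
    -- Worker-side guard: only finalize the job if this worker still owns it.
    UPDATE jobs
    SET status = 'completed'
    WHERE id = $1 AND locked_by = $2 AND status = 'running';
    -- If zero rows are affected, the lock was reaped out from under us; the worker
    -- should treat the job as lost rather than assume its result was recorded.

    -- Reaper-side guard: a session-level advisory lock so only one reaper runs at a time.
    -- The key (42) is an arbitrary application-chosen constant; use the same
    -- connection for the lock, the reaper UPDATE, and the unlock.
    SELECT pg_try_advisory_lock(42);  -- returns true if this session acquired the lock
    -- ... run the reaper UPDATE only if the call above returned true ...
    SELECT pg_advisory_unlock(42);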

6. Performance Tuning and Benchmarking

Performance of this system hinges almost entirely on the efficiency of the job-fetching query.

Indexing is Non-Negotiable:

As mentioned, the partial index idx_jobs_fetch ON jobs (priority, run_at) WHERE status = 'available' is the most important optimization. Let's look at an EXPLAIN ANALYZE for our fetch query:

    sql
    EXPLAIN ANALYZE UPDATE jobs ...

You want to see an Index Scan or Index Only Scan on idx_jobs_fetch. If you see a Bitmap Heap Scan or a Sequential Scan, your index is not being used effectively, and performance will degrade dramatically as the jobs table grows. This usually happens if the WHERE clause of your query doesn't perfectly match the WHERE clause of your partial index.
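
A quick way to verify this is to EXPLAIN the candidate-selection subquery on its own; its predicate has to match the index's WHERE clause for the partial index to be eligible. A sketch of what to look for (the exact plan will vary with your data and settings):

    sql
    EXPLAIN (ANALYZE, BUFFERS)
    SELECT id
    FROM jobs
    WHERE status = 'available' AND run_at <= NOW()
    ORDER BY priority, run_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED;
    -- Healthy: an "Index Scan using idx_jobs_fetch on jobs" node under the Limit/LockRows nodes.
    -- A Seq Scan here means the planner is not using the partial index.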

Benchmark: FOR UPDATE vs. FOR UPDATE SKIP LOCKED

To demonstrate the impact, we can run a benchmark using pgbench.

* Setup: A jobs table with 1,000,000 available jobs.

* Test 1 (Naive FOR UPDATE): a pgbench script that runs the blocking SELECT ... FOR UPDATE followed by an UPDATE in a transaction.

* Test 2 (SKIP LOCKED): a pgbench script that runs our atomic UPDATE ... WHERE id = (SELECT ... FOR UPDATE SKIP LOCKED) query, sketched below.
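
A minimal version of the second script might look like this; the file name, worker identifier, and pgbench options are placeholders to adapt to your environment:

    sql
    -- skip_locked.sql: one job claim per pgbench transaction
    UPDATE jobs
    SET status = 'running', locked_at = NOW(), locked_by = 'bench', attempts = attempts + 1
    WHERE id = (
        SELECT id FROM jobs
        WHERE status = 'available' AND run_at <= NOW()
        ORDER BY priority, run_at
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING id;
    -- Run with, for example: pgbench -n -f skip_locked.sql -c 50 -j 8 -T 60 jobs_db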

Hypothetical Results:

| Concurrent Workers | Naive FOR UPDATE (Jobs/sec) | SKIP LOCKED (Jobs/sec) |
|---|---|---|
| 1 | 250 | 255 |
| 10 | 260 | 2450 |
| 50 | 265 (No improvement) | 9800 |
| 100 | 262 (Contention) | 15500 (CPU/IO Bound) |

The results are stark. The naive approach shows almost zero scalability. Its throughput is fixed, regardless of the number of workers. The SKIP LOCKED approach scales almost linearly with the number of workers until it becomes bound by other system resources like CPU, I/O, or database connection limits.
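
One common variation, once a single claim per round-trip becomes the limiting factor, is to claim a small batch of jobs per query. This is a sketch of the idea, not something the table above measured:

    sql
    -- Claim up to 10 jobs in one round-trip for a single worker
    UPDATE jobs
    SET status = 'running', locked_at = NOW(), locked_by = $1, attempts = attempts + 1
    WHERE id IN (
        SELECT id FROM jobs
        WHERE status = 'available' AND run_at <= NOW()
        ORDER BY priority, run_at
        LIMIT 10
        FOR UPDATE SKIP LOCKED
    )
    RETURNING id, payload, attempts;

The trade-off is that a worker holds more jobs at once, so a crash strands a larger batch until the reaper recovers it, and strict priority ordering across workers becomes looser.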

7. When *Not* to Use This Pattern

While powerful, a PostgreSQL-based queue is not a panacea. It's crucial to understand its limitations and when a dedicated message broker is the superior choice.

Choose a dedicated broker (RabbitMQ, SQS, Kafka) when you need:

* Complex Routing: Fan-out/fan-in patterns, topic-based routing, or routing based on message headers are strengths of systems like RabbitMQ.

* Extreme Scale: If you are processing hundreds of thousands of jobs per second, the write contention on the jobs table could become a bottleneck. Systems like Kafka are designed for this scale.

* Cross-Service Communication (Microservices): Using a database as a communication bus between services can create tight coupling. A message broker provides a clearer contract and decouples service lifecycles.

* Push-based Delivery: The pattern we've described is pull-based (workers poll for jobs). If you need jobs to be pushed to consumers the moment they are available, a broker is a better fit.

* Language/Platform Independence: Message brokers use standard protocols like AMQP or MQTT, making it easy for services written in different languages to communicate.

The sweet spot for the PostgreSQL queue is:

* Within a monolithic or service-oriented architecture where the job processing is tightly coupled to the application's primary data.

* For moderate throughput needs (from tens to thousands of jobs per second).

* When you want to avoid the operational complexity of adding another piece of infrastructure to your stack.

Conclusion

The FOR UPDATE SKIP LOCKED clause is a testament to the power and maturity of PostgreSQL. It provides a first-class primitive for building highly concurrent, reliable, and performant job queues directly within the database. By moving beyond naive locking strategies and embracing atomic operations, robust state management, and careful indexing, you can create a background processing system that is both simple to operate and powerful enough for a vast number of production workloads. It's a pattern that every senior engineer working with PostgreSQL should have in their toolkit, allowing them to make informed architectural decisions and avoid unnecessary complexity when the right tool for the job is the one they're already using.
