Advanced Locking Strategies in PostgreSQL for High-Concurrency Systems

Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

The Inevitable Concurrency Wall

As systems scale, the comforting simplicity of standard ACID transactions and isolation levels like READ COMMITTED begins to fray at the edges. You hit a wall where concurrent operations on shared resources lead to subtle but critical business logic failures: race conditions, double-bookings, and inconsistent state. While Optimistic Concurrency Control (OCC) using version columns is a valid strategy, it can lead to high transaction abort rates under heavy write contention, pushing complexity back into the application layer for retries.

This is where senior engineers must reach deeper into the database's toolkit. PostgreSQL offers a powerful, albeit sharp, set of explicit locking primitives that provide direct, granular control over concurrent execution. Mastering these mechanisms—pessimistic locking, selective row acquisition, and application-level locks—is the difference between a system that scales gracefully and one that collapses into a quagmire of deadlocks and performance bottlenecks.

This article is a deep dive into three such advanced strategies. We will bypass introductory concepts and focus directly on production patterns, performance implications, and the subtle edge cases that often surface only under heavy load.


1. The Foundation: Pessimistic Locking with `SELECT ... FOR UPDATE`

The most direct approach to preventing race conditions is pessimistic locking. Instead of optimistically hoping for the best, you explicitly lock the rows you intend to modify for the duration of your transaction. Other transactions attempting to acquire a conflicting lock on the same rows are forced to wait.

Problem Scenario: The Inventory Race Condition

Consider a classic e-commerce scenario: managing product inventory. A naive implementation might look like this:

python
# WARNING: This code contains a race condition!
def process_order(db_conn, product_id, quantity_to_purchase):
    with db_conn.cursor() as cursor:
        # 1. Check current stock
        cursor.execute("SELECT quantity_on_hand FROM products WHERE id = %s", (product_id,))
        current_stock = cursor.fetchone()[0]

        if current_stock >= quantity_to_purchase:
            # 2. Update stock
            new_stock = current_stock - quantity_to_purchase
            cursor.execute(
                "UPDATE products SET quantity_on_hand = %s WHERE id = %s",
                (new_stock, product_id)
            )
            db_conn.commit()
            return {"status": "success"}
        else:
            db_conn.rollback()
            return {"status": "insufficient_stock"}

Under concurrent load, two requests for the last remaining item (quantity_on_hand = 1) could both execute the SELECT statement, both see 1, both pass the if check, and both proceed to the UPDATE. Each computes new_stock = 0 and writes it, so the second write silently overwrites the first: the stock reads 0, both orders report success, and you've sold two units of a product you had one of. (Had the update been written as quantity_on_hand = quantity_on_hand - 1, the stock would drop to -1 instead.) Wrapping this in a transaction (BEGIN; ... COMMIT;) does not solve the problem, because the default READ COMMITTED isolation level doesn't prevent other transactions from reading the same data before the first one commits its update.

The `FOR UPDATE` Solution

By appending FOR UPDATE to the SELECT query, we instruct PostgreSQL to acquire an exclusive lock on the returned rows. This lock is held until the transaction is either committed or rolled back.

Production-Grade Implementation:

python
import psycopg2
import psycopg2.errors  # explicit import so psycopg2.errors.LockNotAvailable resolves
import threading
import time

# Database connection string
DB_CONN_STR = "dbname=test user=postgres password=secret host=localhost"

def setup_schema(conn):
    with conn.cursor() as cur:
        cur.execute("DROP TABLE IF EXISTS products;")
        cur.execute("CREATE TABLE products (id SERIAL PRIMARY KEY, name TEXT, quantity_on_hand INT NOT NULL);")
        cur.execute("INSERT INTO products (name, quantity_on_hand) VALUES ('Flux Capacitor', 1);")
        conn.commit()

def process_order_with_locking(thread_id, product_id, quantity_to_purchase):
    conn = psycopg2.connect(DB_CONN_STR)
    conn.autocommit = False # Ensure we are in a transaction
    
    try:
        with conn.cursor() as cursor:
            print(f"[Thread {thread_id}] Attempting to lock product {product_id}...")
            
            # Set a timeout for acquiring the lock to avoid indefinite waits
            cursor.execute("SET LOCAL lock_timeout = '3s';")

            # Acquire an exclusive lock on the row
            cursor.execute(
                "SELECT quantity_on_hand FROM products WHERE id = %s FOR UPDATE;",
                (product_id,)
            )
            print(f"[Thread {thread_id}] Lock acquired.")
            current_stock = cursor.fetchone()[0]

            # Simulate some business logic processing time
            time.sleep(1)

            if current_stock >= quantity_to_purchase:
                new_stock = current_stock - quantity_to_purchase
                cursor.execute(
                    "UPDATE products SET quantity_on_hand = %s WHERE id = %s",
                    (new_stock, product_id)
                )
                conn.commit()
                print(f"[Thread {thread_id}] Order successful. New stock: {new_stock}")
            else:
                conn.rollback()
                print(f"[Thread {thread_id}] Insufficient stock. Rolling back.")

    except psycopg2.errors.LockNotAvailable:
        print(f"[Thread {thread_id}] Could not acquire lock in time. Order failed.")
        conn.rollback()
    except Exception as e:
        print(f"[Thread {thread_id}] An error occurred: {e}")
        conn.rollback()
    finally:
        conn.close()

# --- Simulation ---
if __name__ == "__main__":
    # Setup
    conn = psycopg2.connect(DB_CONN_STR)
    setup_schema(conn)
    conn.close()

    # Simulate two concurrent requests for the last item
    thread1 = threading.Thread(target=process_order_with_locking, args=(1, 1, 1))
    thread2 = threading.Thread(target=process_order_with_locking, args=(2, 1, 1))

    thread1.start()
    time.sleep(0.1) # Ensure thread1 starts first
    thread2.start()

    thread1.join()
    thread2.join()

    # Verify final state
    conn = psycopg2.connect(DB_CONN_STR)
    with conn.cursor() as cur:
        cur.execute("SELECT quantity_on_hand FROM products WHERE id = 1;")
        final_stock = cur.fetchone()[0]
        print(f"\nFinal stock in DB: {final_stock}")
        assert final_stock == 0
    conn.close()

Execution Analysis:

  • Thread 1 starts, begins a transaction, and executes SELECT ... FOR UPDATE. It acquires a FOR UPDATE lock on the row for product id=1.
  • Thread 2 starts, begins its own transaction, and executes the same query.
  • Because Thread 1 holds the lock, Thread 2's query blocks. It will wait until Thread 1 commits or rolls back.
  • Thread 1 completes its logic, updates the stock to 0, and commits. The lock is released.
  • Thread 2 immediately unblocks. Its SELECT ... FOR UPDATE query now executes and reads the new quantity_on_hand, which is 0. Its logic correctly determines there is insufficient stock and rolls back.
  • Data integrity is preserved.

    Advanced Considerations & Edge Cases

    * Deadlocks: The most significant danger with explicit locking is deadlocks. A deadlock occurs when two (or more) transactions are waiting for each other to release locks.

    * Scenario: Transaction A locks product_id=1 and then tries to lock product_id=2. Simultaneously, Transaction B locks product_id=2 and then tries to lock product_id=1. Neither can proceed.

    * Solution: The canonical solution is to enforce a consistent lock acquisition order across the entire application. For example, always lock rows by their primary key in ascending order. If you need to update products 1 and 2, always lock 1 first, then 2.

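    * Detection: PostgreSQL does not scan for deadlocks continuously. When a transaction has waited on a lock for longer than deadlock_timeout (1 second by default), the server checks for a cycle; if it finds one, it aborts one of the transactions with a deadlock_detected error (SQLSTATE 40P01), allowing the others to proceed. The application should catch that error and retry the aborted transaction.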

    * Performance and lock_timeout: Waiting for a lock can be a major performance bottleneck. A long-running transaction holding a lock on a hot row can bring a critical part of your system to a halt.

    * It is crucial to set a lock_timeout. The statement SET LOCAL lock_timeout = '3s';, used in the example above, configures the timeout for the current transaction only. If a lock cannot be acquired within 3 seconds, the statement fails with a lock_not_available error (SQLSTATE 55P03, surfaced by psycopg2 as LockNotAvailable), which your application must be prepared to handle gracefully (e.g., by returning an error to the user or retrying).

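    * Lock Strength: FOR UPDATE is the strongest row-level lock. PostgreSQL also offers FOR NO KEY UPDATE, FOR SHARE, and FOR KEY SHARE for finer-grained control. For example, FOR NO KEY UPDATE is enough when you are not changing key columns, and unlike FOR UPDATE it does not block the FOR KEY SHARE locks taken by foreign-key checks, which can improve concurrency if you don't need the strongest lock.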
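
The consistent-ordering rule from the deadlock discussion can be sketched as a small helper. This is a minimal illustration rather than a definitive implementation: the function name lock_products_in_order is hypothetical, the cursor is assumed to be a psycopg2 cursor already inside a transaction, and it reuses the products table from the example above.

```python
def lock_products_in_order(cursor, product_ids):
    """Lock the given product rows one at a time, in ascending id order.

    Hypothetical helper: `cursor` is assumed to be a psycopg2 (DB-API)
    cursor inside an open transaction. Returns {product_id: quantity_on_hand}
    for the rows that exist.
    """
    stock_by_id = {}
    # Sorting (and de-duplicating) the ids means every transaction that uses
    # this helper requests its row locks in the same global order.
    for product_id in sorted(set(product_ids)):
        cursor.execute(
            "SELECT quantity_on_hand FROM products WHERE id = %s FOR UPDATE;",
            (product_id,),
        )
        row = cursor.fetchone()
        if row is not None:  # a missing product simply yields no lock
            stock_by_id[product_id] = row[0]
    return stock_by_id
```

Whether it is called with [2, 1] or [1, 2], the helper requests row 1 before row 2, so a second transaction waits for the first instead of forming a cycle. This only helps if every code path that locks several products goes through the same ordering.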


    2. The Work Queue Pattern with `FOR UPDATE SKIP LOCKED`

    While FOR UPDATE serializes access to a row, SKIP LOCKED does the opposite: it allows you to bypass locked rows entirely. This makes it an incredibly powerful tool for building high-performance, concurrent work queues directly in the database.

    Problem Scenario: A Bottlenecked Job Processor

    Imagine a jobs table where multiple background workers need to fetch and process pending tasks.

    sql
    CREATE TABLE jobs (
        id BIGSERIAL PRIMARY KEY,
        payload JSONB NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending', -- 'pending', 'processing', 'completed', 'failed'
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        processed_at TIMESTAMPTZ
    );
    CREATE INDEX idx_jobs_pending_created_at ON jobs (created_at) WHERE status = 'pending';

    A naive worker might do:

    sql
    BEGIN;
    SELECT id, payload FROM jobs WHERE status = 'pending' ORDER BY created_at LIMIT 1 FOR UPDATE;
    -- ... process job ...
    UPDATE jobs SET status = 'completed' WHERE id = ...;
    COMMIT;

    This creates a massive bottleneck. If 10 workers start simultaneously, one gets the lock, and the other 9 wait patiently for it to finish. The throughput is effectively that of a single worker.

    The `SKIP LOCKED` Solution

    SKIP LOCKED tells PostgreSQL: "Find me the first row that matches my criteria that isn't already locked, and lock it." If Worker 1 locks job #1, Worker 2's query will simply skip job #1 and immediately lock job #2. This enables true parallel processing.

    Production-Grade Implementation:

    This pattern is most efficient when the claim happens in a single, atomic UPDATE statement that picks its target row through a common table expression (CTE).

    python
    import psycopg2
    import threading
    import time
    import uuid
    
    DB_CONN_STR = "dbname=test user=postgres password=secret host=localhost"
    
    def setup_queue_schema(conn):
        with conn.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS jobs;")
            cur.execute("""
                CREATE TABLE jobs (
                    id BIGSERIAL PRIMARY KEY,
                    payload JSONB NOT NULL,
                    status TEXT NOT NULL DEFAULT 'pending',
                    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    processed_at TIMESTAMPTZ
                );
            """)
            cur.execute("CREATE INDEX idx_jobs_pending ON jobs (created_at) WHERE status = 'pending';")
            # Add some jobs
            for i in range(20):
                cur.execute("INSERT INTO jobs (payload) VALUES (%s);", (f'{{"task_id": "{uuid.uuid4()}"}}',))
            conn.commit()
    
    # The core query for a worker to fetch and claim a job
    FETCH_AND_LOCK_JOB_SQL = """
    WITH next_job AS (
        SELECT id
        FROM jobs
        WHERE status = 'pending'
        ORDER BY created_at
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    UPDATE jobs
    SET status = 'processing', processed_at = NOW()
    WHERE id = (SELECT id FROM next_job)
    RETURNING id, payload;
    """
    
    def worker_process(worker_id):
        conn = psycopg2.connect(DB_CONN_STR)
        conn.autocommit = True # Use autocommit for single-statement transactions
    
        while True:
            with conn.cursor() as cursor:
                cursor.execute(FETCH_AND_LOCK_JOB_SQL)
                job = cursor.fetchone()
    
                if job:
                    job_id, payload = job
                    print(f"[Worker {worker_id}] Processing job {job_id} with payload: {payload}")
                    # Simulate work
                    time.sleep(0.5)
                    # In a real scenario, you'd update the job to 'completed' or 'failed' here
                else:
                    print(f"[Worker {worker_id}] No jobs left to process. Exiting.")
                    break
        conn.close()
    
    # --- Simulation ---
    if __name__ == "__main__":
        conn = psycopg2.connect(DB_CONN_STR)
        setup_queue_schema(conn)
        conn.close()
    
        num_workers = 5
        threads = []
        for i in range(num_workers):
            thread = threading.Thread(target=worker_process, args=(i + 1,))
            threads.append(thread)
            thread.start()
    
        for thread in threads:
            thread.join()
    
        print("\nAll workers finished.")

    Execution Analysis:

  • All 5 workers execute the FETCH_AND_LOCK_JOB_SQL query concurrently.
  • The database engine executes the CTE. Thanks to SKIP LOCKED, each worker's query will find and lock a different available row where status = 'pending'.
  • The outer UPDATE statement then atomically updates the status of the specific job that worker locked to 'processing'. The RETURNING clause sends the job data back to the application.
  • The result is near-perfect parallel execution. The workers do not block each other, and throughput scales roughly linearly with the number of workers until the database itself (CPU, I/O, or contention on the index) becomes the bottleneck.
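
The analysis above covers claiming a job; the worker loop leaves the final status update as a comment. One possible shape for that step is sketched below. The helper name finish_job is hypothetical, conn is assumed to be a psycopg2 connection in autocommit mode as in worker_process, and the status values come from the comment in the jobs table definition.

```python
def finish_job(conn, job_id, succeeded):
    """Record the outcome of a job previously claimed by this worker.

    Hypothetical helper. Returns True if exactly one row was updated,
    False if the job was no longer in the 'processing' state.
    """
    new_status = "completed" if succeeded else "failed"
    with conn.cursor() as cursor:
        # The status guard keeps a slow worker from overwriting a job that
        # has since been reset or finished by someone else.
        cursor.execute(
            "UPDATE jobs SET status = %s, processed_at = NOW() "
            "WHERE id = %s AND status = 'processing';",
            (new_status, job_id),
        )
        return cursor.rowcount == 1
```

Inside worker_process this might be called as finish_job(conn, job_id, succeeded=True) after the simulated work, and with succeeded=False from an exception handler.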

    Advanced Considerations & Performance Tuning

    * Indexing is Critical: For SKIP LOCKED to be performant, PostgreSQL must be able to efficiently find the next unlocked row. The partial index ON jobs (created_at) WHERE status = 'pending' is crucial. It's a small, highly efficient index containing only the jobs that workers care about. Without it, Postgres might have to do a full table scan, which would be disastrous for performance.

    * When to use a DB Queue: This pattern is incredibly useful when your jobs are tightly coupled with your primary application data. It avoids the need for separate infrastructure (like RabbitMQ or SQS) and allows you to process jobs and update business data within the same transaction. However, for extremely high-throughput, decoupled messaging, a dedicated message queue is usually the better choice.

    * Job Timeouts and Retries: This simple example doesn't handle worker crashes. A worker could claim a job (setting status to 'processing') and then die. A robust implementation would require a separate "janitor" process that periodically scans for jobs that have been in the 'processing' state for too long and resets them to 'pending'. You could add a lease_expires_at timestamp column to manage this more explicitly.
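
A sketch of such a janitor follows. It is an illustration under stated assumptions, not a complete recovery scheme: the function name requeue_stale_jobs and its timeout parameter are hypothetical, and it treats processed_at as the claim time because the claim query above sets processed_at = NOW() when it marks a job 'processing'. A dedicated lease_expires_at column would make that intent clearer.

```python
def requeue_stale_jobs(conn, max_processing_seconds=300):
    """Reset jobs stuck in 'processing' for too long back to 'pending'.

    Hypothetical helper: `conn` is assumed to be a psycopg2 connection.
    Returns the ids of the jobs that were reset.
    """
    with conn.cursor() as cursor:
        cursor.execute(
            """
            UPDATE jobs
            SET status = 'pending', processed_at = NULL
            WHERE status = 'processing'
              AND processed_at < NOW() - (%s * INTERVAL '1 second')
            RETURNING id;
            """,
            (max_processing_seconds,),
        )
        stale_ids = [row[0] for row in cursor.fetchall()]
    conn.commit()  # harmless in autocommit mode, required otherwise
    return stale_ids
```

The timeout has to be comfortably longer than the longest legitimate job. Otherwise a job that is still running is handed to a second worker, which is why jobs processed this way should be idempotent.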


    3. Application-Level Semaphores with Advisory Locks

    Sometimes, the resource you need to protect isn't a row in a table. It might be an external API, a file on disk, or a complex, long-running business process that shouldn't be executed concurrently for a given entity (like a customer or a tenant).

    This is where Advisory Locks excel. They are a cooperative locking mechanism managed entirely by the application. They are not tied to any table or row; instead, you lock against an arbitrary integer.

    Problem Scenario: Idempotent Data Aggregation

    Imagine a multi-tenant SaaS where you run a resource-intensive nightly report for each tenant. If, due to some scheduler glitch, two instances of the report job for the same tenant (tenant_id = 123) start at the same time, you waste significant resources and could potentially corrupt the aggregated data.

    The Advisory Lock Solution

    We can use the tenant_id as the key for an advisory lock. Before starting the heavy lifting, the job will attempt to acquire a lock for its tenant_id. If it succeeds, it proceeds. If it fails, it means another process is already running the report for that tenant, and it can exit gracefully.

    Production-Grade Implementation:

    PostgreSQL provides two main types of advisory locks: session-level and transaction-level. Transaction-level locks (pg_advisory_xact_lock) are generally safer, as they are automatically released when the transaction ends (commits or rolls back), preventing locks from being stuck if a client crashes.

    python
    import psycopg2
    import threading
    import time
    import hashlib
    
    DB_CONN_STR = "dbname=test user=postgres password=secret host=localhost"
    
    # Advisory locks work on 64-bit integers. If your key is not an integer
    # (e.g., a UUID or string), hash it consistently.
    def get_lock_key_from_string(key_str):
        # Use a non-cryptographic hash for speed, like xxhash, or sha1 for consistency.
        # We only need the first 8 bytes for a 64-bit integer.
        return int.from_bytes(hashlib.sha1(key_str.encode('utf-8')).digest()[:8], 'big', signed=True)
    
    def run_nightly_report(worker_id, tenant_id_str):
        conn = psycopg2.connect(DB_CONN_STR)
        conn.autocommit = False # Must be in a transaction for xact locks
    
        lock_key = get_lock_key_from_string(tenant_id_str)
    
        try:
            with conn.cursor() as cursor:
                print(f"[Worker {worker_id}] Attempting to acquire lock for tenant '{tenant_id_str}' (key: {lock_key})")
                
                # pg_try_advisory_xact_lock is non-blocking. It returns true if the lock
                # was acquired, false otherwise.
                cursor.execute("SELECT pg_try_advisory_xact_lock(%s);", (lock_key,))
                lock_acquired = cursor.fetchone()[0]
    
                if lock_acquired:
                    print(f"[Worker {worker_id}] Lock acquired. Running intensive report...")
                    # Simulate a long-running, complex task
                    time.sleep(3)
                    print(f"[Worker {worker_id}] Report for tenant '{tenant_id_str}' complete.")
                    conn.commit() # This commit also releases the advisory lock
                else:
                    print(f"[Worker {worker_id}] Could not acquire lock. Another process is running. Exiting.")
                    # No need to rollback if we didn't do anything, but it's good practice
                    conn.rollback()
    
        except Exception as e:
            print(f"[Worker {worker_id}] An error occurred: {e}")
            conn.rollback()
        finally:
            conn.close()
    
    # --- Simulation ---
    if __name__ == "__main__":
        tenant_to_process = "tenant-abc-123"
    
        # Simulate two jobs starting at the same time for the same tenant
        thread1 = threading.Thread(target=run_nightly_report, args=(1, tenant_to_process))
        thread2 = threading.Thread(target=run_nightly_report, args=(2, tenant_to_process))
    
        thread1.start()
        time.sleep(0.1)
        thread2.start()
    
        thread1.join()
        thread2.join()
    
        print("\nSimulation finished.")

    Execution Analysis:

  • Worker 1 starts, begins a transaction, and calls pg_try_advisory_xact_lock with the hashed tenant ID. It succeeds and receives true.
  • Worker 1 proceeds to run the long report.
  • Worker 2 starts and calls pg_try_advisory_xact_lock with the same key. Because the lock is already held by Worker 1's transaction, the function immediately returns false.
  • Worker 2 sees the false result, logs a message, and exits gracefully without doing any work.
  • Worker 1 eventually finishes its report and commits the transaction. The commit automatically releases the advisory lock.

    This pattern provides a simple and robust distributed mutex without any external dependencies like Redis or Zookeeper.

    Advanced Patterns & Caveats

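    * Key Space Management: The 64-bit integer key space is vast but finite, and it is shared by every advisory lock user in the database. Two unrelated resources that map to the same key will contend for the same lock, so you need a deterministic strategy for mapping logical resources (like tenant IDs) to lock keys. Hashing a namespaced string (for example 'nightly_report:tenant-abc-123' rather than the bare tenant ID) is a good practice; the two-argument form of the advisory lock functions, which takes a pair of 32-bit integers, is another way to separate lock classes.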

    * Session vs. Transaction Locks: pg_advisory_lock (a session-level lock) persists until explicitly unlocked with pg_advisory_unlock or until the session ends. This is dangerous. If the worker process dies, the server releases the lock only once it notices the connection is gone, and behind a connection pool a code path that forgets to unlock leaves the lock held for as long as the pooled connection stays open, blocking every other worker. Prefer transaction-level locks (pg_advisory_xact_lock) for most use cases, as they are automatically released at commit or rollback.

    * Blocking vs. Non-Blocking: The examples use pg_try_advisory... which is non-blocking. The alternative, pg_advisory..., will block and wait for the lock to become available, similar to FOR UPDATE. The try variant is often better for application-level locks, as it allows the application to react immediately if the resource is busy.
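
The key-space advice above can be made concrete with a namespaced variant of get_lock_key_from_string. This is a sketch under assumptions: the name advisory_lock_key and the "namespace:resource" string format are illustrative conventions, not anything PostgreSQL prescribes, and namespaces are assumed not to contain the ":" separator.

```python
import hashlib

def advisory_lock_key(namespace, resource_id):
    """Map (namespace, resource id) to a signed 64-bit advisory lock key.

    Hypothetical helper. The namespace separates lock classes so that, for
    example, a report lock and an export lock for the same tenant differ.
    """
    material = f"{namespace}:{resource_id}".encode("utf-8")
    digest = hashlib.sha1(material).digest()
    # PostgreSQL's bigint is signed, so read the first 8 bytes as signed.
    return int.from_bytes(digest[:8], "big", signed=True)
```

The result can be passed to pg_try_advisory_xact_lock exactly as lock_key is in run_nightly_report. Hash collisions remain possible in principle; their effect is that two unrelated jobs occasionally exclude each other, not that a lock is missed.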


    4. Monitoring and Debugging Locking in Production

    When your system is under load and requests are timing out, your first suspect should be lock contention. Debugging locks can be difficult because they are transient and internal to the database state. PostgreSQL provides two essential views for this.

    * pg_locks: Shows detailed information about all active locks.

    * pg_stat_activity: Shows all active connections and their current queries.

    By joining them, you can build a powerful query to find out exactly who is blocking whom.

    The Ultimate Lock Debugging Query:

    sql
    SELECT
        blocked_locks.pid     AS blocked_pid,
        blocked_activity.usename  AS blocked_user,
        blocked_activity.query    AS blocked_query,
        blocking_locks.pid     AS blocking_pid,
        blocking_activity.usename AS blocking_user,
        blocking_activity.query   AS blocking_query,
        age(now(), blocked_activity.query_start) AS blocked_duration
    FROM  pg_catalog.pg_locks         blocked_locks
    JOIN  pg_catalog.pg_stat_activity blocked_activity  ON blocked_locks.pid = blocked_activity.pid
    JOIN  pg_catalog.pg_locks         blocking_locks 
        ON blocking_locks.locktype = blocked_locks.locktype
        AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
        AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
        AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
        AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
        AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
        AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
        AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
        AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
        AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
        AND blocking_locks.pid != blocked_locks.pid
    JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_locks.pid = blocking_activity.pid
    WHERE NOT blocked_locks.granted;

    Running this query when your application is experiencing slowdowns will give you a clear picture:

    * blocked_query: The SQL statement that is currently stuck waiting for a lock.

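    * blocking_query: The current (or most recent) SQL statement of the session holding the lock. If that session is idle in a transaction, this may not be the statement that actually acquired the lock.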

    * blocked_duration: How long the query has been waiting.

    This information is invaluable. It can immediately point you to a long-running transaction in your application that needs to be optimized or a deadlock scenario that needs to be resolved by fixing the lock acquisition order.
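
On PostgreSQL 9.6 and later, the built-in function pg_blocking_pids(pid) answers the "who is blocking whom" question without the self-join on pg_locks. The wrapper below is a minimal sketch of how a monitoring script might use it; the names BLOCKED_SESSIONS_SQL and find_blocked_sessions are hypothetical, and conn is assumed to be a psycopg2 connection.

```python
BLOCKED_SESSIONS_SQL = """
SELECT pid,
       pg_blocking_pids(pid) AS blocked_by,
       state,
       query
FROM pg_stat_activity
WHERE cardinality(pg_blocking_pids(pid)) > 0;
"""

def find_blocked_sessions(conn):
    """Return one dict per session that is currently waiting on a lock."""
    with conn.cursor() as cursor:
        cursor.execute(BLOCKED_SESSIONS_SQL)
        # cursor.description holds one entry per column; index 0 is its name.
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
```

Each returned dict carries the waiting session's pid and a blocked_by list of the pids it is waiting on, which can then be looked up in pg_stat_activity to see what those sessions are doing.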

    Conclusion

    Explicit locking in PostgreSQL is a powerful feature set that moves beyond textbook transaction theory into the practical reality of building high-concurrency systems.

    * SELECT ... FOR UPDATE is your tool for ensuring atomic read-modify-write cycles on specific rows, preventing race conditions at the cost of serialization.

    * FOR UPDATE SKIP LOCKED is the key to unlocking massive parallelism for database-backed work queues, allowing you to build scalable, high-throughput background processing systems.

    * Advisory Locks provide a flexible, application-driven mutex for protecting logical resources that don't map cleanly to a table row.

    These are not tools to be used lightly. They require careful consideration of performance, deadlock avoidance, and robust application-level error handling. But for a senior engineer tasked with building systems that must remain correct and performant under pressure, they are an essential part of the arsenal.
