PostgreSQL Advisory Locks for Resilient Distributed Task Scheduling

Goh Ling Yong

The Distributed Coordination Problem in Plain Sight

In any horizontally-scaled application architecture, a common and deceptively complex problem arises: how do you ensure a specific task runs exactly once across a fleet of identical instances? Whether it's a nightly data aggregation job, a periodic cache warmer, or a background task processor, preventing multiple workers from performing the same unit of work simultaneously is critical for correctness and efficiency.

The conventional playbook often involves reaching for external coordination services. Redis-based distributed locks (like Redlock), ZooKeeper latches, and etcd leases are all robust, well-understood solutions. However, they introduce significant operational overhead: another service to deploy, monitor, secure, and scale. For many systems, the primary database—often PostgreSQL—is already the single source of truth and the most resilient component. What if we could solve this complex distributed systems problem using the tools it already provides?

This is where PostgreSQL's advisory locks enter the picture. They are a powerful, lightweight, and database-native mechanism for building distributed mutexes. This article is not an introduction; it is a deep dive into a production-grade pattern for implementing a resilient distributed job queue using advisory locks. We will dissect the implementation details, confront difficult edge cases, and analyze the performance characteristics of this approach.

A Refresher: Why Advisory Locks Differ from MVCC

Before we build, it's crucial to solidify our understanding of what makes advisory locks unique. Unlike PostgreSQL's standard locking mechanisms (row-level, page-level, table-level locks) which are implicitly managed by the MVCC system to ensure transaction isolation and data integrity, advisory locks are explicit and cooperative.

  • Application-Defined Scope: They don't lock a physical database object like a row or table. They lock an arbitrary integer (a 64-bit signed integer, to be precise) that you, the application developer, define. This means you can create a lock that represents a conceptual resource, like "the-ability-to-run-nightly-billing" or "processing-job-42", without needing a corresponding table row.
  • Cooperative, Not Mandatory: The database does not enforce them automatically. All participating application instances must agree to use the same locking protocol (i.e., attempt to acquire a lock on the same integer key before proceeding).
  • Session vs. Transaction Scopes: This is the most critical distinction for our use case.
    - pg_advisory_xact_lock(key): The lock is held until the current transaction ends (either COMMIT or ROLLBACK). This is useful for short-lived, atomic operations but is unsuitable for long-running background jobs, as it would hold the transaction open for the entire duration.

    - pg_advisory_lock(key): The lock is held until the session (database connection) ends or it is explicitly released with pg_advisory_unlock(key). This is the cornerstone of our pattern, as it allows a worker to hold a lock for a long-running task without a long-lived transaction, and—most importantly—the lock is automatically released if the client connection dies. This property provides a built-in cleanup mechanism for crashed workers.
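
    For reference, here is a minimal psql sketch contrasting the two scopes (the key 42 is an arbitrary example value):

    sql
    -- Transaction scope: released automatically at COMMIT or ROLLBACK
    BEGIN;
    SELECT pg_advisory_xact_lock(42);
    -- ... short, atomic work ...
    COMMIT; -- lock released here

    -- Session scope: survives COMMIT, held until unlock or disconnect
    SELECT pg_advisory_lock(42);
    -- ... long-running work, possibly spanning many transactions ...
    SELECT pg_advisory_unlock(42);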

    Pattern 1: The Idempotent Singleton Cron Job

    Let's start with the simplest real-world scenario: a scheduled task that must run every hour, executed by a fleet of application servers. We need to elect a single leader for each run.

    We can define a unique integer key for our task. A common practice is to derive it from a stable hash of a descriptive string; truncating the hash to 64 bits makes collisions possible in principle, but vanishingly unlikely for a handful of task names.

    python
    import hashlib
    import struct
    
    # Use a well-known string for the task name
    TASK_NAME = "hourly_report_generation"
    
    def get_lock_key(task_name: str) -> int:
        """Generates a 64-bit signed integer key from a string."""
        # Use SHA-256 for a good distribution, then take the first 8 bytes
        hashed = hashlib.sha256(task_name.encode('utf-8')).digest()
        # Unpack as a signed 64-bit integer ('q' format code)
        key, = struct.unpack('q', hashed[:8])
        return key
    
    # Our specific lock key for this cron job
    HOURLY_REPORT_LOCK_KEY = get_lock_key(TASK_NAME)

    Now, let's implement the worker logic. We'll use the non-blocking pg_try_advisory_lock() function. It attempts to acquire the lock immediately and returns True on success or False if the lock is already held by another session.

    python
    import psycopg2
    import time
    import logging
    
    # --- (get_lock_key function from above) ---
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    DB_CONN_STRING = "dbname=mydb user=myuser password=mypass host=localhost"
    HOURLY_REPORT_LOCK_KEY = get_lock_key("hourly_report_generation")
    
    def run_hourly_report():
        """Simulates a long-running report generation task."""
        logging.info("Starting hourly report generation...")
        time.sleep(30) # Simulate 30 seconds of work
        logging.info("Hourly report generation complete.")
    
    def attempt_to_run_task():
        conn = None
        try:
            conn = psycopg2.connect(DB_CONN_STRING)
            # Autocommit avoids holding a transaction open while the long task runs;
            # the session-level advisory lock does not depend on a transaction.
            conn.autocommit = True
            cursor = conn.cursor()
    
            # Attempt to acquire the session-level lock
            cursor.execute("SELECT pg_try_advisory_lock(%s)", (HOURLY_REPORT_LOCK_KEY,))
            lock_acquired = cursor.fetchone()[0]
    
            if lock_acquired:
                logging.info(f"Lock {HOURLY_REPORT_LOCK_KEY} acquired. I am the leader.")
                try:
                    run_hourly_report()
                finally:
                    # CRITICAL: Always release the lock
                    logging.info(f"Releasing lock {HOURLY_REPORT_LOCK_KEY}.")
                    cursor.execute("SELECT pg_advisory_unlock(%s)", (HOURLY_REPORT_LOCK_KEY,))
            else:
                logging.info(f"Could not acquire lock {HOURLY_REPORT_LOCK_KEY}. Another instance is running.")
    
        except Exception as e:
            logging.error(f"An error occurred: {e}")
        finally:
            if conn:
                conn.close()
    
    if __name__ == "__main__":
        # In a real system, this would be triggered by a scheduler like cron or systemd timer
        attempt_to_run_task()

    If you run this script in two separate terminals simultaneously, you'll see one acquire the lock and perform the work, while the other immediately logs that the lock is held and exits gracefully. This simple pattern effectively elects a leader for the duration of the task.

    Pattern 2: A Resilient Distributed Job Queue

    The singleton cron job is a good start, but most real-world systems need to process a queue of discrete jobs. This introduces more complexity: multiple workers should be able to process different jobs concurrently, but any single job must be processed by only one worker.

    First, let's define our jobs table schema:

    sql
    CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'failed');
    
    CREATE TABLE jobs (
        id BIGSERIAL PRIMARY KEY,
        task_name TEXT NOT NULL,
        payload JSONB DEFAULT '{}',
        status job_status NOT NULL DEFAULT 'pending',
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        scheduled_for TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        started_at TIMESTAMPTZ,
        completed_at TIMESTAMPTZ,
        worker_id TEXT
    );
    
    CREATE INDEX idx_jobs_pending ON jobs (scheduled_for, id) WHERE status = 'pending';

    The naive approach would be to have workers run SELECT id FROM jobs WHERE status = 'pending' LIMIT 1, and then try to UPDATE that job to running. This is a classic race condition waiting to happen. The standard solution is SELECT ... FOR UPDATE SKIP LOCKED, which is an excellent and highly performant pattern.
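
    For comparison, here is a minimal sketch of that standard approach against the jobs table above; it claims one pending job atomically in a single statement (the worker_id value is a placeholder):

    sql
    UPDATE jobs
    SET status = 'running', started_at = NOW(), worker_id = 'worker-1'
    WHERE id = (
        SELECT id
        FROM jobs
        WHERE status = 'pending' AND scheduled_for <= NOW()
        ORDER BY scheduled_for, id
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING id, payload;

    Note that the row lock taken by FOR UPDATE lasts until the surrounding transaction commits, which is precisely the trade-off discussed in the performance section below.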

    However, we will implement this using advisory locks to demonstrate a different set of trade-offs and capabilities. This pattern particularly shines when the "lock" needs to outlive a transaction or when the resource being locked is more abstract than a single row.

    Our strategy will be:

  • A worker queries for a batch of pending jobs.
  • It iterates through the candidate jobs in-memory.
  • For each candidate job ID, it attempts to acquire an advisory lock using the job's primary key (id).
  • The Double-Check: If the lock is acquired, it must re-query the database to confirm the job's status is still pending. This is the crucial step to prevent a specific race condition.
  • If the double-check passes, the worker updates the job row to running, associating its own worker_id.
  • The worker processes the job.
  • Finally, it updates the job to completed or failed and releases the advisory lock.

    The Race Condition We're Preventing

    Why is the double-check necessary? Consider this sequence of events:

  • Worker A: SELECT id, status FROM jobs WHERE status = 'pending' LIMIT 10; -> Gets jobs [1, 2, 3].
  • Worker B: SELECT id, status FROM jobs WHERE status = 'pending' LIMIT 10; -> Also gets jobs [1, 2, 3].
  • Worker B: Iterates to job 1, calls pg_try_advisory_lock(1) and succeeds.
  • Worker B: Updates job 1 to running, processes it, updates it to completed, and releases the lock.
  • Worker A: Now iterates to job 1 (from its stale, in-memory list). It calls pg_try_advisory_lock(1) and succeeds, because Worker B has already released it.

    Without the double-check, Worker A would now proceed to process a job that is already completed, leading to redundant work and potential data corruption.

    Production-Grade Worker Implementation

    Here is a more complete, asynchronous implementation using Python's asyncio and asyncpg for better I/O performance.

    python
    import asyncio
    import asyncpg
    import logging
    import uuid
    import json
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(worker_id)s - %(message)s')
    
    DB_POOL_CONFIG = {
        'dsn': "postgres://myuser:mypass@localhost/mydb"
    }
    
    class JobWorker:
        def __init__(self, pool, worker_id):
            self.pool = pool
            self.worker_id = worker_id
            self.logger = logging.LoggerAdapter(logging.getLogger(), {'worker_id': self.worker_id})
    
        async def process_job_payload(self, job_id, payload):
            self.logger.info(f"Starting processing for job {job_id} with payload: {payload}")
            # Simulate I/O bound work
            await asyncio.sleep(5)
            if payload.get("should_fail", False):
                raise ValueError("This job was configured to fail.")
            self.logger.info(f"Finished processing for job {job_id}")
    
        async def find_and_process_job(self):
            async with self.pool.acquire() as conn:
                # 1. Find candidate jobs
                candidate_jobs = await conn.fetch("""
                    SELECT id FROM jobs
                    WHERE status = 'pending' AND scheduled_for <= NOW()
                    ORDER BY scheduled_for, id
                    LIMIT 10
                """)
    
                if not candidate_jobs:
                    return False # No jobs found
    
                self.logger.info(f"Found {len(candidate_jobs)} candidate jobs.")
    
                for job_record in candidate_jobs:
                    job_id = job_record['id']
                    lock_acquired = False
                    try:
                        # 2. Attempt to acquire the lock for this specific job ID
                        lock_acquired = await conn.fetchval("SELECT pg_try_advisory_lock($1)", job_id)
    
                        if not lock_acquired:
                            self.logger.info(f"Could not acquire lock for job {job_id}, skipping.")
                            continue
    
                        self.logger.info(f"Acquired lock for job {job_id}.")
    
                        # 3. The CRITICAL Double-Check
                        job_status = await conn.fetchval("SELECT status FROM jobs WHERE id = $1", job_id)
                        if job_status != 'pending':
                            self.logger.warning(f"Job {job_id} status changed to '{job_status}' after lock acquisition. Skipping.")
                            # The lock will be released in the finally block
                            continue
                        
                        # 4. We are the confirmed owner. Mark the job as running.
                        await conn.execute("""
                            UPDATE jobs
                            SET status = 'running', started_at = NOW(), worker_id = $1
                            WHERE id = $2
                        """, self.worker_id, job_id)
                        
                        job_payload = await conn.fetchval("SELECT payload FROM jobs WHERE id = $1", job_id)
    
                        # 5. Process the job
                        try:
                            await self.process_job_payload(job_id, json.loads(job_payload or '{}'))
                            # 6a. Mark as completed
                            await conn.execute("""
                                UPDATE jobs
                                SET status = 'completed', completed_at = NOW()
                                WHERE id = $1
                            """, job_id)
                            self.logger.info(f"Job {job_id} marked as completed.")
                        except Exception as e:
                            self.logger.error(f"Error processing job {job_id}: {e}")
                            # 6b. Mark as failed
                            await conn.execute("""
                                UPDATE jobs
                                SET status = 'failed', completed_at = NOW()
                                WHERE id = $1
                            """, job_id)
                        
                        return True # Processed one job, exit the loop
    
                    finally:
                        if lock_acquired:
                            await conn.execute("SELECT pg_advisory_unlock($1)", job_id)
                            self.logger.info(f"Released lock for job {job_id}.")
            return False # No job was successfully locked and processed
    
        async def run(self):
            self.logger.info("Worker started. Polling for jobs...")
            while True:
                try:
                    processed = await self.find_and_process_job()
                    if not processed:
                        await asyncio.sleep(2) # Poll interval
                except Exception as e:
                    self.logger.error(f"Unhandled error in worker loop: {e}")
                    await asyncio.sleep(5) # Longer backoff on major error
    
    async def main():
        pool = await asyncpg.create_pool(**DB_POOL_CONFIG)
        
        # Simulate adding some jobs to the queue
        async with pool.acquire() as conn:
            await conn.execute("TRUNCATE jobs RESTART IDENTITY")
            await conn.execute("INSERT INTO jobs (task_name, payload) VALUES ($1, $2)", 'send_email', '{"to": "user@example.com"}')
            await conn.execute("INSERT INTO jobs (task_name, payload) VALUES ($1, $2)", 'generate_report', '{"report_id": 123}')
            await conn.execute("INSERT INTO jobs (task_name, payload) VALUES ($1, $2)", 'process_payment', '{"should_fail": true}')
    
        # Run multiple workers concurrently
        workers = [JobWorker(pool, f"worker-{uuid.uuid4().hex[:6]}") for _ in range(3)]
        await asyncio.gather(*(w.run() for w in workers))
    
    if __name__ == "__main__":
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print("\nShutting down workers.")

    Edge Cases and Production Hardening

    A working implementation is one thing; a production-ready one requires handling the inevitable failures.

    Stale Locks and Dead Workers

    This is where session-level advisory locks are truly superior to application-level solutions. If a worker process acquires a lock and then crashes, segfaults, or is terminated by a SIGKILL, it never gets to run its finally block to release the lock.

    Because we used pg_advisory_lock, the lock is tied to the database connection. When the worker process dies, the PostgreSQL backend serving its connection will eventually detect the broken socket (or hit a keepalive timeout), terminate, and, as part of that cleanup, release every session-level advisory lock held on that connection.

    The key word here is "eventually." The speed of this detection depends on OS-level TCP keepalive settings and PostgreSQL's own timeout parameters (like tcp_keepalives_idle, tcp_keepalives_interval). It's crucial to tune these to a value that balances network overhead with the desired speed of stale lock cleanup. A timeout of 60-120 seconds is a common starting point.
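
    As an illustration, here is one possible server-side starting point (the specific values are example assumptions, not universal recommendations; equivalent keepalive settings also exist on the client/libpq side):

    sql
    -- Detect dead client connections within roughly two minutes
    ALTER SYSTEM SET tcp_keepalives_idle = 60;      -- seconds of idle before the first probe
    ALTER SYSTEM SET tcp_keepalives_interval = 10;  -- seconds between probes
    ALTER SYSTEM SET tcp_keepalives_count = 6;      -- failed probes before the connection is dropped
    SELECT pg_reload_conf();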

    This gives us a robust, automatic cleanup mechanism for stale running jobs. You can build a separate reaper process that periodically looks for jobs that have been in the running state for too long (e.g., started_at < NOW() - INTERVAL '1 hour') and requeues them by setting their status back to pending.
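
    A minimal sketch of such a reaper query (the one-hour threshold is an example; the NOT EXISTS clause skips jobs whose worker is still alive and holding its advisory lock):

    sql
    UPDATE jobs j
    SET status = 'pending', worker_id = NULL, started_at = NULL
    WHERE j.status = 'running'
      AND j.started_at < NOW() - INTERVAL '1 hour'
      AND NOT EXISTS (
          SELECT 1
          FROM pg_locks l
          WHERE l.locktype = 'advisory'
            AND ((l.classid::bigint << 32) | l.objid::bigint) = j.id
      );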

    Monitoring and Observability

    How do you debug a system like this? PostgreSQL provides an indispensable view: pg_locks.

    You can query this view to see exactly which advisory locks are currently being held, and by which process.

    sql
    SELECT 
        locktype, 
        (classid::bigint << 32) | objid::bigint AS lock_key, 
        pid AS process_id,
        mode,
        granted,
        waitstart
    FROM pg_locks 
    WHERE locktype = 'advisory';

    This query is your primary tool for answering questions like:

  • "Which jobs are currently being processed?" (The lock_key will correspond to the id of the job).
  • "Is a worker stuck holding a lock?" (You can join pg_locks with pg_stat_activity using the pid to see the query and state of the connection holding the lock, as sketched below).
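
    A sketch of that join, using only columns available in the standard catalog views:

    sql
    SELECT
        (l.classid::bigint << 32) | l.objid::bigint AS lock_key,
        a.pid,
        a.application_name,
        a.state,
        a.query,
        a.state_change
    FROM pg_locks l
    JOIN pg_stat_activity a ON a.pid = l.pid
    WHERE l.locktype = 'advisory' AND l.granted;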

    Lock Key Space Management

    Advisory locks can be a single bigint or a pair of integers. Using the two-part version, pg_try_advisory_lock(class_id, object_id), is an excellent pattern for namespacing your locks. For example, you could use a class_id to represent the type of resource being locked (e.g., 1 for 'jobs', 2 for 'reports') and the object_id for the specific instance. This virtually eliminates the risk of key collisions between different subsystems in your application.
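
    A brief sketch of that namespacing convention (the class IDs 1 and 2 are arbitrary choices for this example):

    sql
    -- Namespace 1 = jobs, namespace 2 = reports (an application-defined convention)
    SELECT pg_try_advisory_lock(1, 42);   -- lock job 42
    SELECT pg_try_advisory_lock(2, 42);   -- lock report 42; does not conflict with the job lock
    -- Release uses the same two-part key
    SELECT pg_advisory_unlock(1, 42);
    SELECT pg_advisory_unlock(2, 42);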

    Performance: Advisory Locks vs. `SELECT ... FOR UPDATE SKIP LOCKED`

    It's important to be objective about where this pattern fits. For a high-throughput, homogeneous job queue (many workers, many available jobs), SELECT ... FOR UPDATE SKIP LOCKED is often the more performant choice. It's a single, atomic, index-driven operation that finds and locks available rows directly in the database engine. It is purpose-built for this exact problem.

    The advisory lock pattern involves more round trips and application-side logic (fetch candidates, loop, try-lock, double-check). So where does it excel?

  • Long-Running Jobs: SKIP LOCKED takes row locks that are held until the transaction ends. For a job that runs for minutes or hours, holding a transaction open is highly problematic (it prevents VACUUM from reclaiming dead tuples and leads to bloat). The session-level advisory lock decouples the lock's lifetime from the transaction's lifetime, which is a massive advantage.
  • Heterogeneous or Contended Tasks: In the singleton cron job example, where many workers are contending for a single resource, the pg_try_advisory_lock approach is extremely low-contention. Workers that fail to get the lock return immediately without waiting or blocking others.
  • Abstract Resource Locking: When the resource you need to lock doesn't map cleanly to a single database row, advisory locks are the perfect tool. Imagine needing to ensure only one worker is performing a complex data sync with a third-party API; an advisory lock on a key representing that API endpoint is a clean and effective solution.

    Conclusion: A Pragmatic Tool for Distributed Coordination

    PostgreSQL advisory locks are not a silver bullet, but they are a remarkably powerful and pragmatic tool in the senior engineer's arsenal. By leveraging the database you already depend on, you can build surprisingly resilient and robust distributed systems—from singleton task schedulers to complex job queues—without introducing the complexity and operational burden of external coordination services.

    The key is to move beyond the basic function call and deeply understand the implementation patterns and trade-offs. By embracing session-level locks for their automatic cleanup properties, implementing the critical double-check pattern to prevent race conditions, and using pg_locks for observability, you can confidently deploy this pattern to solve real-world distributed coordination problems in production.
