PostgreSQL Advisory Locks for Resilient Distributed Task Scheduling
The Distributed Coordination Problem in Plain Sight
In any horizontally-scaled application architecture, a common and deceptively complex problem arises: how do you ensure a specific task runs exactly once across a fleet of identical instances? Whether it's a nightly data aggregation job, a periodic cache warmer, or a background task processor, preventing multiple workers from performing the same unit of work simultaneously is critical for correctness and efficiency.
The conventional playbook often involves reaching for external coordination services. Redis-based distributed locks (such as Redlock), ZooKeeper latches, and etcd leases are all robust, well-understood solutions. However, they introduce significant operational overhead: another service to deploy, monitor, secure, and scale. For many systems, the primary database—often PostgreSQL—is already the single source of truth and the most resilient component. What if we could solve this complex distributed systems problem using the tools it already provides?
This is where PostgreSQL's advisory locks enter the picture. They are a powerful, lightweight, and database-native mechanism for building distributed mutexes. This article is not an introduction; it is a deep dive into a production-grade pattern for implementing a resilient distributed job queue using advisory locks. We will dissect the implementation details, confront difficult edge cases, and analyze the performance characteristics of this approach.
A Refresher: Why Advisory Locks Differ from MVCC
Before we build, it's crucial to solidify our understanding of what makes advisory locks unique. Unlike PostgreSQL's standard locking mechanisms (row-level, page-level, and table-level locks), which the server acquires implicitly during query execution to ensure transaction isolation and data integrity, advisory locks are explicit and cooperative: PostgreSQL attaches no meaning to the lock key, and the lock only constrains sessions that ask for the same key.
- pg_advisory_xact_lock(key): The lock is held until the current transaction ends (either COMMIT or ROLLBACK). This is useful for short-lived, atomic operations but is unsuitable for long-running background jobs, as it would hold the transaction open for the entire duration.
- pg_advisory_lock(key): The lock is held until the session (database connection) ends or it is explicitly released with pg_advisory_unlock(key). This is the cornerstone of our pattern, as it allows a worker to hold a lock for a long-running task without a long-lived transaction, and—most importantly—the lock is automatically released if the client connection dies. This property provides a built-in cleanup mechanism for crashed workers.
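To make the distinction concrete, here is a minimal psql-style sketch (using an arbitrary key of 42) showing how the two scopes behave:
-- Transaction-scoped: released automatically at COMMIT/ROLLBACK;
-- there is deliberately no manual unlock function for this variant.
BEGIN;
SELECT pg_advisory_xact_lock(42);
-- ... short, atomic work ...
COMMIT;  -- the lock is gone here

-- Session-scoped: survives COMMIT/ROLLBACK; released explicitly,
-- or automatically when the connection closes or dies.
SELECT pg_advisory_lock(42);
-- ... long-running work, possibly spanning many transactions ...
SELECT pg_advisory_unlock(42);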
Pattern 1: The Idempotent Singleton Cron Job
Let's start with the simplest real-world scenario: a scheduled task that must run every hour, executed by a fleet of application servers. We need to elect a single leader for each run.
We can define a unique integer key for our task. A common practice is to derive it from a stable hash of a descriptive string; with a 64-bit key space, accidental collisions are extremely unlikely, though not strictly impossible.
import hashlib
import struct
# Use a well-known string for the task name
TASK_NAME = "hourly_report_generation"
def get_lock_key(task_name: str) -> int:
"""Generates a 64-bit signed integer key from a string."""
# Use SHA-256 for a good distribution, then take the first 8 bytes
hashed = hashlib.sha256(task_name.encode('utf-8')).digest()
# Unpack as a signed 64-bit integer ('q' format code)
key, = struct.unpack('q', hashed[:8])
return key
# Our specific lock key for this cron job
HOURLY_REPORT_LOCK_KEY = get_lock_key(TASK_NAME)
Now, let's implement the worker logic. We'll use the non-blocking pg_try_advisory_lock() function. It attempts to acquire the lock immediately and returns True on success or False if the lock is already held by another session.
import psycopg2
import time
import logging
# --- (get_lock_key function from above) ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
DB_CONN_STRING = "dbname=mydb user=myuser password=mypass host=localhost"
HOURLY_REPORT_LOCK_KEY = get_lock_key("hourly_report_generation")
def run_hourly_report():
"""Simulates a long-running report generation task."""
logging.info("Starting hourly report generation...")
time.sleep(30) # Simulate 30 seconds of work
logging.info("Hourly report generation complete.")
def attempt_to_run_task():
conn = None
try:
        conn = psycopg2.connect(DB_CONN_STRING)
        # Autocommit keeps the connection out of an idle-in-transaction state while the
        # long-running job executes; the session-level advisory lock does not need a transaction.
        conn.autocommit = True
        cursor = conn.cursor()
# Attempt to acquire the session-level lock
cursor.execute("SELECT pg_try_advisory_lock(%s)", (HOURLY_REPORT_LOCK_KEY,))
lock_acquired = cursor.fetchone()[0]
if lock_acquired:
logging.info(f"Lock {HOURLY_REPORT_LOCK_KEY} acquired. I am the leader.")
try:
run_hourly_report()
finally:
# CRITICAL: Always release the lock
logging.info(f"Releasing lock {HOURLY_REPORT_LOCK_KEY}.")
                cursor.execute("SELECT pg_advisory_unlock(%s)", (HOURLY_REPORT_LOCK_KEY,))
                # No commit needed: advisory unlocks take effect immediately and are not
                # tied to a transaction, and the connection is in autocommit mode.
else:
logging.info(f"Could not acquire lock {HOURLY_REPORT_LOCK_KEY}. Another instance is running.")
except Exception as e:
logging.error(f"An error occurred: {e}")
finally:
if conn:
conn.close()
if __name__ == "__main__":
# In a real system, this would be triggered by a scheduler like cron or systemd timer
attempt_to_run_task()
If you run this script in two separate terminals simultaneously, you'll see one acquire the lock and perform the work, while the other immediately logs that the lock is held and exits gracefully. This simple pattern effectively elects a leader for the duration of the task.
Pattern 2: A Resilient Distributed Job Queue
The singleton cron job is a good start, but most real-world systems need to process a queue of discrete jobs. This introduces more complexity: multiple workers should be able to process different jobs concurrently, but any single job must be processed by only one worker.
First, let's define our jobs table schema:
CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'failed');
CREATE TABLE jobs (
id BIGSERIAL PRIMARY KEY,
task_name TEXT NOT NULL,
payload JSONB DEFAULT '{}',
status job_status NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
scheduled_for TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
worker_id TEXT
);
CREATE INDEX idx_jobs_pending ON jobs (scheduled_for, id) WHERE status = 'pending';
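With this schema, enqueueing work is just an INSERT. As an illustrative sketch (values are made up), a delayed job could be added like this:
-- Enqueue a job that becomes eligible ten minutes from now (illustrative values)
INSERT INTO jobs (task_name, payload, scheduled_for)
VALUES ('send_email', '{"to": "[email protected]"}', NOW() + INTERVAL '10 minutes');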
The naive approach would be to have workers run SELECT id FROM jobs WHERE status = 'pending' LIMIT 1, and then try to UPDATE that job to running. This is a classic race condition waiting to happen. The standard solution is SELECT ... FOR UPDATE SKIP LOCKED, which is an excellent and highly performant pattern.
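For reference, a minimal sketch of that SKIP LOCKED approach against the schema above might look like the following; it is the baseline we compare against later, not the pattern built in this article:
-- One common form: claim a single pending row; the row lock from FOR UPDATE
-- lasts until the claiming transaction commits or rolls back.
UPDATE jobs
SET status = 'running', started_at = NOW()
WHERE id = (
    SELECT id
    FROM jobs
    WHERE status = 'pending' AND scheduled_for <= NOW()
    ORDER BY scheduled_for, id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, payload;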
However, we will implement this using advisory locks to demonstrate a different set of trade-offs and capabilities. This pattern particularly shines when the "lock" needs to outlive a transaction or when the resource being locked is more abstract than a single row.
Our strategy will be:
- Each worker polls for a small batch of candidate pending jobs.
- It iterates through the candidate jobs in-memory.
- For each job, it attempts to acquire a non-blocking, session-level advisory lock keyed on the job's primary key (its id).
- If the lock is acquired, it re-checks that the job is still pending. This is the crucial step to prevent a specific race condition.
- It then marks the job as running, associating its own worker_id.
- The worker processes the job.
- Finally, it marks the job as completed or failed and releases the advisory lock.
The Race Condition We're Preventing
Why is the double-check necessary? Consider this sequence of events:
1. Worker A runs SELECT id, status FROM jobs WHERE status = 'pending' LIMIT 10; -> Gets jobs [1, 2, 3].
2. Worker B runs SELECT id, status FROM jobs WHERE status = 'pending' LIMIT 10; -> Also gets jobs [1, 2, 3].
3. Worker B calls pg_try_advisory_lock(1) and succeeds.
4. Worker B marks job 1 as running, processes it, updates it to completed, and releases the lock.
5. Worker A, running a moment behind, calls pg_try_advisory_lock(1) and succeeds, because Worker B has already released it.
Without the double-check, Worker A would now proceed to process a job that is already completed, leading to redundant work and potential data corruption.
Production-Grade Worker Implementation
Here is a more complete, asynchronous implementation using Python's asyncio and asyncpg for better I/O performance.
import asyncio
import asyncpg
import logging
import uuid
import json
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(worker_id)s - %(message)s')
DB_POOL_CONFIG = {
'dsn': "postgres://myuser:mypass@localhost/mydb"
}
class JobWorker:
def __init__(self, pool, worker_id):
self.pool = pool
self.worker_id = worker_id
self.logger = logging.LoggerAdapter(logging.getLogger(), {'worker_id': self.worker_id})
async def process_job_payload(self, job_id, payload):
self.logger.info(f"Starting processing for job {job_id} with payload: {payload}")
# Simulate I/O bound work
await asyncio.sleep(5)
if payload.get("should_fail", False):
raise ValueError("This job was configured to fail.")
self.logger.info(f"Finished processing for job {job_id}")
async def find_and_process_job(self):
async with self.pool.acquire() as conn:
# 1. Find candidate jobs
candidate_jobs = await conn.fetch("""
SELECT id FROM jobs
WHERE status = 'pending' AND scheduled_for <= NOW()
ORDER BY scheduled_for, id
LIMIT 10
""")
if not candidate_jobs:
return False # No jobs found
self.logger.info(f"Found {len(candidate_jobs)} candidate jobs.")
for job_record in candidate_jobs:
job_id = job_record['id']
lock_acquired = False
try:
# 2. Attempt to acquire the lock for this specific job ID
lock_acquired = await conn.fetchval("SELECT pg_try_advisory_lock($1)", job_id)
if not lock_acquired:
self.logger.info(f"Could not acquire lock for job {job_id}, skipping.")
continue
self.logger.info(f"Acquired lock for job {job_id}.")
# 3. The CRITICAL Double-Check
job_status = await conn.fetchval("SELECT status FROM jobs WHERE id = $1", job_id)
if job_status != 'pending':
self.logger.warning(f"Job {job_id} status changed to '{job_status}' after lock acquisition. Skipping.")
# The lock will be released in the finally block
continue
# 4. We are the confirmed owner. Mark the job as running.
await conn.execute("""
UPDATE jobs
SET status = 'running', started_at = NOW(), worker_id = $1
WHERE id = $2
""", self.worker_id, job_id)
job_payload = await conn.fetchval("SELECT payload FROM jobs WHERE id = $1", job_id)
# 5. Process the job
try:
await self.process_job_payload(job_id, json.loads(job_payload or '{}'))
# 6a. Mark as completed
await conn.execute("""
UPDATE jobs
SET status = 'completed', completed_at = NOW()
WHERE id = $1
""", job_id)
self.logger.info(f"Job {job_id} marked as completed.")
except Exception as e:
self.logger.error(f"Error processing job {job_id}: {e}")
# 6b. Mark as failed
await conn.execute("""
UPDATE jobs
SET status = 'failed', completed_at = NOW()
WHERE id = $1
""", job_id)
return True # Processed one job, exit the loop
finally:
if lock_acquired:
await conn.execute("SELECT pg_advisory_unlock($1)", job_id)
self.logger.info(f"Released lock for job {job_id}.")
return False # No job was successfully locked and processed
async def run(self):
self.logger.info("Worker started. Polling for jobs...")
while True:
try:
processed = await self.find_and_process_job()
if not processed:
await asyncio.sleep(2) # Poll interval
except Exception as e:
self.logger.error(f"Unhandled error in worker loop: {e}")
await asyncio.sleep(5) # Longer backoff on major error
async def main():
pool = await asyncpg.create_pool(**DB_POOL_CONFIG)
# Simulate adding some jobs to the queue
async with pool.acquire() as conn:
await conn.execute("TRUNCATE jobs RESTART IDENTITY")
await conn.execute("INSERT INTO jobs (task_name, payload) VALUES ($1, $2)", 'send_email', '{"to": "[email protected]"}')
await conn.execute("INSERT INTO jobs (task_name, payload) VALUES ($1, $2)", 'generate_report', '{"report_id": 123}')
await conn.execute("INSERT INTO jobs (task_name, payload) VALUES ($1, $2)", 'process_payment', '{"should_fail": true}')
# Run multiple workers concurrently
workers = [JobWorker(pool, f"worker-{uuid.uuid4().hex[:6]}") for _ in range(3)]
await asyncio.gather(*(w.run() for w in workers))
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutting down workers.")
Edge Cases and Production Hardening
A working implementation is one thing; a production-ready one requires handling the inevitable failures.
Stale Locks and Dead Workers
This is where session-level advisory locks are truly superior to application-level solutions. If a worker process acquires a lock and then crashes, segfaults, or is terminated by a SIGKILL, it never gets to run its finally block to release the lock.
Because we used pg_advisory_lock, the lock is tied to the database connection. When the worker process dies, its TCP connection to the PostgreSQL server will eventually be detected as dead. The backend process serving that connection then exits, and as part of its cleanup it releases every session-level advisory lock that session held.
The key word here is "eventually." The speed of this detection depends on OS-level TCP keepalive settings and PostgreSQL's own timeout parameters (like tcp_keepalives_idle, tcp_keepalives_interval). It's crucial to tune these to a value that balances network overhead with the desired speed of stale lock cleanup. A timeout of 60-120 seconds is a common starting point.
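As a sketch, these parameters can be set server-side; the numbers below are illustrative starting points, not recommendations for every environment:
-- Illustrative values: first probe after 60s idle, then every 10s, up to 6 probes
ALTER SYSTEM SET tcp_keepalives_idle = 60;
ALTER SYSTEM SET tcp_keepalives_interval = 10;
ALTER SYSTEM SET tcp_keepalives_count = 6;
SELECT pg_reload_conf();  -- these settings do not require a server restart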
This gives us a robust, automatic cleanup mechanism for stale running jobs. You can build a separate reaper process that periodically looks for jobs that have been in the running state for too long (e.g., started_at < NOW() - INTERVAL '1 hour') and requeues them by setting their status back to pending.
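A sketch of such a reaper query, assuming the advisory lock key is the job's id exactly as in the worker above, and skipping any job whose lock is still held by a live connection (the interval is illustrative):
-- Requeue 'running' jobs that look abandoned: old started_at and no advisory lock held.
-- The key reconstruction mirrors the pg_locks query shown in the next section.
UPDATE jobs j
SET status = 'pending', worker_id = NULL, started_at = NULL
WHERE j.status = 'running'
  AND j.started_at < NOW() - INTERVAL '1 hour'
  AND NOT EXISTS (
      SELECT 1
      FROM pg_locks l
      WHERE l.locktype = 'advisory'
        AND l.granted
        AND ((l.classid::bigint << 32) | l.objid::bigint) = j.id
  );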
Monitoring and Observability
How do you debug a system like this? PostgreSQL provides an indispensable view: pg_locks.
You can query this view to see exactly which advisory locks are currently being held, and by which process.
SELECT
locktype,
(classid::bigint << 32) | objid::bigint AS lock_key,
pid AS process_id,
mode,
granted,
waitstart
FROM pg_locks
WHERE locktype = 'advisory';
This query is your primary tool for answering questions like:
- Which jobs are currently locked by a worker? (The lock_key will correspond to the id of the job.)
- Which worker is holding a given lock, and what is it doing? (Join pg_locks with pg_stat_activity using the pid to see the query and state of the connection holding the lock.)
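For the second question, a minimal sketch of that join, using standard pg_stat_activity columns:
-- Which session holds each advisory lock, and what is it doing right now?
SELECT
    (l.classid::bigint << 32) | l.objid::bigint AS lock_key,
    l.pid,
    a.usename,
    a.application_name,
    a.client_addr,
    a.state,
    a.query_start,
    a.query
FROM pg_locks l
JOIN pg_stat_activity a ON a.pid = l.pid
WHERE l.locktype = 'advisory'
  AND l.granted;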
Lock Key Space Management
Advisory lock functions accept either a single 64-bit bigint key or a pair of 32-bit integer keys. Using the two-part version, pg_try_advisory_lock(class_id, object_id), is an excellent pattern for namespacing your locks. For example, you could use a class_id to represent the type of resource being locked (e.g., 1 for 'jobs', 2 for 'reports') and the object_id for the specific instance. This virtually eliminates the risk of key collisions between different subsystems in your application.
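A quick sketch of the two-key form, with hypothetical namespace constants (1 for jobs, 2 for reports):
-- class_id namespaces the lock; object_id identifies the specific resource.
SELECT pg_try_advisory_lock(1, 42);   -- job 42 in the hypothetical 'jobs' namespace
SELECT pg_try_advisory_lock(2, 42);   -- independent lock: same object id, different namespace
SELECT pg_advisory_unlock(1, 42);
SELECT pg_advisory_unlock(2, 42);
-- In pg_locks, the two keys appear as classid and objid respectively.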
Performance: Advisory Locks vs. `SELECT ... FOR UPDATE SKIP LOCKED`
It's important to be objective about where this pattern fits. For a high-throughput, homogeneous job queue (many workers, many available jobs), SELECT ... FOR UPDATE SKIP LOCKED is often the more performant choice. It's a single, atomic, index-driven operation that finds and locks available rows directly in the database engine. It is purpose-built for this exact problem.
The advisory lock pattern involves more round trips and application-side logic (fetch candidates, loop, try-lock, double-check). So where does it excel?
- Long-running jobs: SKIP LOCKED places a row lock that is typically tied to a transaction. For a job that runs for minutes or hours, holding a transaction open is highly problematic (it can cause VACUUM issues and bloat). The session-level advisory lock decouples the lock's lifetime from the transaction's lifetime, which is a massive advantage.
- Low-contention polling: The pg_try_advisory_lock approach is extremely low-contention. Workers that fail to get the lock return immediately without waiting or blocking others.
Conclusion: A Pragmatic Tool for Distributed Coordination
PostgreSQL advisory locks are not a silver bullet, but they are a remarkably powerful and pragmatic tool in the senior engineer's arsenal. By leveraging the database you already depend on, you can build surprisingly resilient and robust distributed systems—from singleton task schedulers to complex job queues—without introducing the complexity and operational burden of external coordination services.
The key is to move beyond the basic function call and deeply understand the implementation patterns and trade-offs. By embracing session-level locks for their automatic cleanup properties, implementing the critical double-check pattern to prevent race conditions, and using pg_locks for observability, you can confidently deploy this pattern to solve real-world distributed coordination problems in production.