PostgreSQL LISTEN/NOTIFY for Resilient Real-Time Architectures
Beyond The Basics: LISTEN/NOTIFY in Production
As senior engineers, we often face architectural decisions that balance complexity, performance, and reliability. When event-driven capabilities are required, the default reaction is often to reach for a dedicated message broker like Kafka or RabbitMQ. While powerful, these tools introduce significant operational overhead. For systems where the event source is the primary database, a compelling, often overlooked alternative exists directly within PostgreSQL: LISTEN and NOTIFY.
This is not an introduction. We will not cover the basic syntax. Instead, we'll dissect the advanced patterns required to use this feature in a high-stakes production environment. We'll tackle the hard problems: What happens if a listener service crashes? How do you guarantee an event is processed exactly once? How do you scale past a single listener process? And how do you work around the infamous 8000-byte payload limit?
The core thesis is this: When combined with the transactional outbox pattern and row-level locking, LISTEN/NOTIFY provides a remarkably simple and resilient foundation for real-time features that demand transactional consistency with their source data. We'll explore cache invalidation, background job triggers, and real-time UI updates as our guiding use cases.
The Cornerstone: Transactional Guarantees
The single most compelling feature of pg_notify is its transactional awareness. Unlike a dual-write pattern where you write to your database and then make a separate call to a message broker (a call that could fail, leaving your systems in an inconsistent state), a NOTIFY is bound to the database transaction that issues it. The notification is only delivered to listening clients after the transaction successfully commits.
If the transaction rolls back for any reason—a constraint violation, a serialization failure, or an explicit ROLLBACK—the notification is discarded. It never happened. This atomicity eliminates an entire class of distributed state consistency problems.
Let's model this with a concrete example: a product inventory system where an update must trigger a cache invalidation event.
Schema and Trigger Setup
First, our basic table:
CREATE TABLE products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sku TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
stock_count INT NOT NULL CHECK (stock_count >= 0),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Now, instead of having our application logic call pg_notify, we can encapsulate this behavior in a trigger. This ensures that any update to the table, regardless of the source, will generate the notification, reducing the chance of developer error.
-- A function to be called by the trigger
CREATE OR REPLACE FUNCTION notify_product_update()
RETURNS TRIGGER AS $$
DECLARE
payload JSON;
BEGIN
-- Construct a minimal payload. We'll discuss why it's minimal later.
payload = json_build_object(
'table', TG_TABLE_NAME,
'id', NEW.id,
'sku', NEW.sku,
'timestamp', NOW()
);
-- Notify the 'product_updates' channel.
-- The second argument is the payload.
PERFORM pg_notify('product_updates', payload::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- The trigger itself, fires after an update
CREATE TRIGGER product_update_trigger
AFTER UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION notify_product_update();
Demonstrating Transactional Integrity
Now, let's observe the behavior in a psql session. First, start a listener in one terminal:
-- Terminal 1: The Listener
LISTEN product_updates;
In a second terminal, perform an update within a transaction that we will ultimately roll back.
-- Terminal 2: The Producer
BEGIN;
-- Update a product (assuming one exists with this SKU)
UPDATE products SET stock_count = 99 WHERE sku = 'TSHIRT-RED-L';
-- At this point, the trigger has fired internally, but the notification
-- is queued and has NOT been sent to the listener in Terminal 1.
-- Now, roll back the transaction
ROLLBACK;
Observe Terminal 1. Nothing happens. The notification was discarded along with the transaction. Now, let's try a successful transaction.
-- Terminal 2: The Producer
BEGIN;
UPDATE products SET stock_count = 98 WHERE sku = 'TSHIRT-RED-L';
COMMIT;
-- Immediately upon COMMIT, the notification is sent.
Terminal 1 will now display the notification (note that psql only checks for incoming notifications when it executes a command, so if nothing appears right away, issue any statement, even a bare ;, to see it):
Asynchronous notification "product_updates" with payload "{"table": "products", "id": "...", "sku": "TSHIRT-RED-L", "timestamp": "..."}" received from server process with PID 12345.
This transactional guarantee is the foundation upon which we can build a reliable system. It's something that requires significant engineering effort to replicate with external message brokers.
Architecting a Resilient Listener Service: The Outbox Pattern
While transactional delivery is a huge win, it doesn't solve a critical failure mode: what if the listener service is down? If your cache invalidation service is offline for an hour due to a deployment or crash, all notifications sent during that time are lost forever. LISTEN/NOTIFY is a fire-and-forget mechanism.
To solve this, we introduce the Transactional Outbox pattern. The core idea is to persist the event to a database table within the same transaction as the business logic change. The NOTIFY call then becomes a simple, ephemeral trigger to tell the listener, "Hey, there's something new in the outbox for you to check."
This elegantly decouples the event creation from the event delivery, providing a durable log of events that must be processed.
Evolving the Schema for an Outbox
Let's create an events_outbox table.
CREATE TABLE events_outbox (
id BIGSERIAL PRIMARY KEY,
channel TEXT NOT NULL,
payload JSONB NOT NULL,
processed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Index for efficiently finding unprocessed events
CREATE INDEX idx_unprocessed_events ON events_outbox (id) WHERE (processed_at IS NULL);
Now, we modify our trigger to write to this table instead of directly calling pg_notify. The notification will be triggered by an INSERT on the outbox table itself.
-- 1. Modify the product update function to write to the outbox
CREATE OR REPLACE FUNCTION create_product_update_event()
RETURNS TRIGGER AS $$
DECLARE
event_payload JSONB;
BEGIN
event_payload = jsonb_build_object(
'source_table', TG_TABLE_NAME,
'source_id', NEW.id,
'new_stock_count', NEW.stock_count
-- Include any other relevant data from the NEW row
);
INSERT INTO events_outbox (channel, payload)
VALUES ('product_updates', event_payload);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Replace the old trigger
DROP TRIGGER IF EXISTS product_update_trigger ON products;
CREATE TRIGGER product_update_outbox_trigger
AFTER UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION create_product_update_event();
-- 2. Create a new, generic trigger on the outbox table to send the NOTIFY
CREATE OR REPLACE FUNCTION notify_outbox_insert()
RETURNS TRIGGER AS $$
BEGIN
-- We notify on the channel specified in the row itself.
-- The payload is just the ID of the outbox event, keeping it small.
PERFORM pg_notify(NEW.channel, NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER outbox_insert_trigger
AFTER INSERT ON events_outbox
FOR EACH ROW
EXECUTE FUNCTION notify_outbox_insert();
With this setup:
1. An UPDATE on products occurs within a transaction.
2. product_update_outbox_trigger fires, inserting a record into events_outbox within the same transaction.
3. outbox_insert_trigger fires, queueing a pg_notify call, also within the same transaction.
4. On COMMIT, the products change and the events_outbox record are atomically saved, and only then is the notification sent.
Node.js/TypeScript Listener Implementation
Now, let's build a listener service that consumes from this outbox. This service needs to be idempotent and capable of catching up on missed events.
We'll use the pg library in Node.js.
// listener.ts
import { Client } from 'pg';
const DB_CONFIG = {
user: 'your_user',
host: 'localhost',
database: 'your_db',
password: 'your_password',
port: 5432,
};
// A dedicated client for listening. This connection must not be used for
// other queries, especially not inside transactions.
const listenerClient = new Client(DB_CONFIG);
// A separate client for processing events.
const processorClient = new Client(DB_CONFIG);
async function processEvent(eventId: string): Promise<void> {
console.log(`[Processor] Received event ID: ${eventId}. Processing...`);
const client = processorClient;
try {
// Begin a transaction for processing
await client.query('BEGIN');
// Fetch the event payload and lock the row to prevent concurrent processing
const res = await client.query(
'SELECT * FROM events_outbox WHERE id = $1 AND processed_at IS NULL FOR UPDATE',
[eventId]
);
if (res.rows.length === 0) {
// This can happen if another processor grabbed it first, or it's already processed.
console.log(`[Processor] Event ${eventId} already processed or does not exist. Skipping.`);
await client.query('ROLLBACK');
return;
}
const event = res.rows[0];
console.log(`[Processor] Processing payload:`, event.payload);
// --- YOUR BUSINESS LOGIC HERE ---
// Example: Invalidate a Redis cache key
// await redis.del(`product:${event.payload.source_id}`);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 100));
// --------------------------------
// Mark the event as processed
await client.query(
'UPDATE events_outbox SET processed_at = NOW() WHERE id = $1',
[eventId]
);
// Commit the transaction
await client.query('COMMIT');
console.log(`[Processor] Successfully processed event ID: ${eventId}.`);
} catch (error) {
console.error(`[Processor] Error processing event ${eventId}:`, error);
await client.query('ROLLBACK');
// Implement retry logic or move to a dead-letter queue here
}
}
// Function to process any missed events on startup
async function processMissedEvents() {
console.log('[Catchup] Checking for missed events on startup...');
const res = await processorClient.query(
'SELECT id FROM events_outbox WHERE processed_at IS NULL ORDER BY id ASC'
);
if (res.rows.length > 0) {
console.log(`[Catchup] Found ${res.rows.length} missed events. Processing...`);
for (const row of res.rows) {
await processEvent(row.id);
}
} else {
console.log('[Catchup] No missed events found.');
}
}
async function main() {
await listenerClient.connect();
await processorClient.connect();
// Process any events that were missed while the service was down
await processMissedEvents();
listenerClient.on('notification', (msg) => {
console.log(`[Listener] Received notification on channel '${msg.channel}'`);
if (msg.payload) {
// Don't await this; let it run in the background to keep the listener
// responsive. Caveat: node-postgres serializes queries on a single client,
// so overlapping processEvent calls would interleave their transactions.
// The competing-consumer version below sidesteps this with batching and a semaphore.
processEvent(msg.payload).catch((err) =>
console.error(`[Listener] Unhandled error processing event ${msg.payload}:`, err)
);
}
});
await listenerClient.query('LISTEN product_updates');
console.log('Listener service started. Waiting for notifications...');
}
main().catch(console.error);
This architecture is now resilient. If the listener service is down for an hour, the events_outbox table accumulates unprocessed events. When the service restarts, the processMissedEvents function immediately queries the table and processes everything it missed, ensuring no data is lost.
Payload Management: The 8000-Byte Constraint
A hard limit of pg_notify is that the payload cannot exceed 8000 bytes. Attempting to send a larger payload will result in an error. This is a common pitfall for developers who try to serialize large JSON objects directly into the notification.
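If you haven't hit it before, the failure is easy to reproduce in psql (a minimal sketch; the exact error wording may differ slightly between versions):
-- A payload at or above the limit is rejected with an error
SELECT pg_notify('product_updates', repeat('x', 9000));
-- ERROR:  payload string too long

-- A tiny payload, such as an outbox row ID, always fits comfortably
SELECT pg_notify('product_updates', '42');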
The outbox pattern we've just implemented elegantly solves this. Notice in our notify_outbox_insert trigger, the payload is minimal:
PERFORM pg_notify(NEW.channel, NEW.id::text);
We are only sending the primary key of the events_outbox record. This is a tiny string, guaranteed to be well under the 8KB limit. The listener receives this ID and then uses its separate processing connection to fetch the full, rich payload JSONB object from the table. This pattern is not only safer but also more flexible, as you can store multi-megabyte JSONB objects in your outbox table without issue.
Scaling Listeners: The Competing Consumer Pattern
Our current listener service is resilient but not scalable. If we run two instances of listener.ts, both will connect, both will LISTEN product_updates, and both will receive every single notification. They will then race to process the same event, leading to duplicated work and potential database contention.
We need a way for multiple listeners to cooperate, ensuring each event from the outbox is processed by exactly one worker. This is the "competing consumer" pattern, and we can implement it with a powerful, often underutilized feature of PostgreSQL: FOR UPDATE SKIP LOCKED.
SELECT ... FOR UPDATE places a lock on the rows it selects, preventing other transactions from modifying or locking them until the current transaction completes. The SKIP LOCKED clause is the magic ingredient: if a row is already locked by another transaction, this query will simply ignore it and move on to the next available unlocked row.
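The effect is easy to see in two psql sessions before wiring it into the service (a minimal sketch, assuming a few unprocessed rows already exist in events_outbox):
-- Terminal 1: claim a couple of rows and hold their locks
BEGIN;
SELECT id FROM events_outbox
WHERE processed_at IS NULL
ORDER BY id ASC
LIMIT 2
FOR UPDATE SKIP LOCKED;
-- Returns, say, ids 1 and 2; the locks are held until COMMIT or ROLLBACK.

-- Terminal 2: run the same query while Terminal 1's transaction is still open
BEGIN;
SELECT id FROM events_outbox
WHERE processed_at IS NULL
ORDER BY id ASC
LIMIT 2
FOR UPDATE SKIP LOCKED;
-- Skips the rows locked by Terminal 1 and returns the next unlocked ones
-- (or no rows at all) instead of blocking.
COMMIT;
Without SKIP LOCKED, the second query would block until the first transaction finished; with it, each worker simply claims whatever is free and moves on.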
Implementing Scalable Event Processing
We can modify our processEvent logic to use a batch-processing approach with SKIP LOCKED. Instead of processing one event per notification, a notification will signal to all listeners that there's work to be done, and they will all attempt to claim a batch of available work from the outbox table.
Here is the refined, scalable processor logic:
// In listener.ts
// ... (imports and config are the same)
const BATCH_SIZE = 10;
async function processEventBatch(): Promise<number> {
const client = processorClient;
let processedCount = 0;
await client.query('BEGIN');
try {
// The core of the competing consumer pattern
const res = await client.query(
`SELECT id, payload FROM events_outbox
WHERE processed_at IS NULL
ORDER BY id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED`,
[BATCH_SIZE]
);
if (res.rows.length === 0) {
await client.query('COMMIT'); // Nothing to do, just commit empty tx
return 0;
}
const eventIdsToProcess = res.rows.map(r => r.id);
console.log(`[Processor] Claimed batch of ${eventIdsToProcess.length} events:`, eventIdsToProcess);
for (const event of res.rows) {
console.log(`[Processor] Processing payload:`, event.payload);
// --- YOUR BUSINESS LOGIC HERE ---
// await redis.del(`product:${event.payload.source_id}`);
await new Promise(resolve => setTimeout(resolve, 50)); // Simulate work
// --------------------------------
}
// Mark the entire batch as processed
await client.query(
'UPDATE events_outbox SET processed_at = NOW() WHERE id = ANY($1::bigint[])',
[eventIdsToProcess]
);
await client.query('COMMIT');
processedCount = eventIdsToProcess.length;
console.log(`[Processor] Successfully processed batch of ${processedCount} events.`);
} catch (error) {
console.error(`[Processor] Error processing batch:`, error);
await client.query('ROLLBACK');
// Consider more granular error handling for individual events in a batch
}
return processedCount;
}
// A simple semaphore to prevent concurrent processing runs in the same process
let isProcessing = false;
async function handleNotification() {
if (isProcessing) {
console.log('[Handler] Already processing, skipping new notification trigger.');
return;
}
isProcessing = true;
try {
// Keep processing batches until no work is left
let processedInLastBatch;
do {
processedInLastBatch = await processEventBatch();
} while (processedInLastBatch > 0);
} finally {
isProcessing = false;
}
}
async function main() {
await listenerClient.connect();
await processorClient.connect();
// Initial catch-up on startup
await handleNotification();
listenerClient.on('notification', (msg) => {
console.log(`[Listener] Received notification on channel '${msg.channel}'. Triggering processing.`);
// The payload itself is less important now; it's just a signal.
handleNotification().catch(console.error);
});
await listenerClient.query('LISTEN product_updates');
console.log('Scalable listener service started. Waiting for notifications...');
}
main().catch(console.error);
Now you can run this listener.ts script on multiple servers or in multiple containers. When a notification arrives, all instances will wake up and try to run processEventBatch. Thanks to SKIP LOCKED, they will each grab a different set of up to 10 events from the outbox. They will work in parallel, efficiently and safely draining the queue without stepping on each other's toes. This provides horizontal scalability for your event processors.
When NOT to Use LISTEN/NOTIFY
This architecture is powerful, but it's not a silver bullet. A senior engineer knows the boundaries of a pattern. Do not use LISTEN/NOTIFY when:
- You need extreme throughput. LISTEN/NOTIFY is fast, but it's not designed for tens or hundreds of thousands of messages per second. Notifications pass through a single shared queue (8GB in a standard installation), and if listeners fall behind and it fills up, transactions issuing NOTIFY start to fail at commit (the outbox mitigates the data-loss risk, but the performance ceiling is real). If you're building a firehose, use a tool built for it.
- You need complex routing, filtering, or fan-out. LISTEN/NOTIFY is a simple pub/sub mechanism. You can use multiple channels, but any routing logic beyond that must be built into your consumers.
Conclusion
PostgreSQL's LISTEN/NOTIFY is far more than a toy for simple demos. When architected correctly with the transactional outbox pattern, it provides a robust, reliable, and transactionally consistent mechanism for eventing. By leveraging FOR UPDATE SKIP LOCKED, you can build horizontally scalable, competing consumers that drain a shared work queue without losing events and without two workers claiming the same event.
This approach dramatically reduces operational complexity compared to deploying and maintaining a separate message broker cluster. For a vast number of common use cases—cache invalidation, real-time notifications, search index updates, and background job triggering—it hits a sweet spot of simplicity, performance, and transactional safety that is difficult to achieve otherwise. It's a powerful tool that every senior engineer working with PostgreSQL should have in their arsenal.