Tuning PostgreSQL Logical Replication for High-Throughput CDC
The High-Stakes Game of High-Throughput Logical Replication
PostgreSQL's logical replication is a cornerstone of modern data architecture, enabling Change Data Capture (CDC) for microservices, data warehousing, and real-time analytics. While setting up a basic publication and subscription is straightforward, scaling it to handle tens of thousands of writes per second without compromising database stability is a different engineering challenge entirely. When misconfigured, a CDC pipeline can become the single greatest threat to your production database, leading to unbounded disk usage, cascading failures, and catastrophic data loss.
This is not a guide for beginners. We assume you understand what logical replication is, why you need it, and how to create a publication and a replication slot. Instead, we will dissect the internal mechanics and expose the tuning levers that separate a fragile implementation from a resilient, high-performance production system. We'll cover the publisher, the subscriber, and the treacherous ground in between, focusing on the patterns required to operate at scale.
Section 1: Publisher-Side Tuning: Stemming the WAL Flood at the Source
The entire logical replication process begins with the Write-Ahead Log (WAL). Every change decoded and sent to a subscriber originates here. Therefore, the first and most critical area for optimization is controlling the generation, retention, and processing of WAL on the primary database.
1.1. WAL Configuration: Your First Line of Defense
Your postgresql.conf settings are the foundation. Getting them wrong means any other optimization is merely a band-aid.
* wal_level = logical: This is the non-negotiable prerequisite. However, be aware of its performance impact. It adds more information to the WAL records, slightly increasing I/O overhead on every write. For a high-throughput system, this is a cost you must budget for in your performance planning.
* max_wal_senders: This parameter controls the maximum number of concurrent walsender processes, which stream WAL data to subscribers. It's not just for physical replicas; each logical replication subscription requires a walsender. A common mistake is setting this too low. A good starting point is number_of_physical_replicas + number_of_logical_subscriptions + buffer. For example, for a primary with 2 physical replicas and 3 logical consumers (e.g., Debezium, a custom analytics pipeline, an audit service), a value of 10 provides a safe margin.
* max_replication_slots: Each logical subscription needs a replication slot to track its progress. If a consumer disconnects, the slot ensures that the primary retains all the WAL files the consumer hasn't yet acknowledged. This is both a feature and a danger. An abandoned or lagging consumer with an active slot will prevent WAL cleanup, eventually filling your data disk and crashing the entire database. Size this carefully and monitor it relentlessly. max_replication_slots should be at least equal to max_wal_senders.
* wal_keep_size (Postgres 13+) / wal_keep_segments (older versions): This setting specifies a minimum amount of WAL to retain in the pg_wal directory, independent of replication slots. It's a safety net for physical replicas that fall behind. Do not rely on this for logical replication. A replication slot is the correct mechanism. Setting wal_keep_size too high wastes disk space. A modest value (e.g., 1-4 GB) is reasonable, but your primary WAL retention strategy for logical consumers must be the replication slots themselves.
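Putting these together, a starting point for the example above (two physical replicas, three logical consumers) might look like the snippet below; treat the numbers as an illustrative baseline to load-test against, not a universal recommendation. Note that wal_level, max_wal_senders, and max_replication_slots all require a server restart.
# postgresql.conf -- illustrative baseline, not a drop-in recommendation
wal_level = logical
max_wal_senders = 10          # 2 physical replicas + 3 logical consumers + headroom
max_replication_slots = 10    # sized alongside max_wal_senders
wal_keep_size = 2GB           # modest safety net; slots remain the real retention mechanism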
1.2. The Art of the Publication: Minimizing the Replication Stream
Sending every change from every table is inefficient and risky. The more data you push through the replication stream, the higher the network I/O, CPU usage for decoding, and potential for lag. Tailor your publications with surgical precision.
Bad Practice: The Firehose
CREATE PUBLICATION all_changes_pub FOR ALL TABLES;
This is simple but dangerous for a large database. You'll be replicating changes to session tables, temporary data, and internal application tables that downstream consumers don't need, creating unnecessary load.
Good Practice: Selective Publication
CREATE PUBLICATION orders_pub FOR TABLE orders, order_items;
This is far better. You explicitly define the data contract for your consumers. But we can go deeper.
Advanced Pattern: Row-Level Filtering (Postgres 15+)
PostgreSQL 15 introduced a game-changing feature: server-side row filtering for publications. This allows you to push filtering logic to the database, preventing unwanted rows from ever entering the replication stream.
Consider a scenario where you only want to stream orders that have been finalized (status = 'completed' or status = 'shipped').
-- On a high-volume 'orders' table, this can reduce replication traffic by over 80%
CREATE PUBLICATION finalized_orders_pub FOR TABLE orders
WHERE (status = 'completed' OR status = 'shipped');
Performance Impact Analysis:
Let's model this. Assume an orders table with 100 million rows and a workload of 1,000 TPS. 80% of transactions are updates to orders in a pending or processing state, while only 20% are transitions to a final state.
* Without WHERE clause: 1,000 transactions/sec are written to WAL and sent to the walsender for decoding and streaming.
* With WHERE clause: 1,000 transactions/sec are still written to WAL (this is unavoidable). However, during logical decoding, the walsender process evaluates the WHERE clause. Only the 200 transactions/sec that match the criteria are actually sent to the subscriber. This results in:
* 80% reduction in network traffic between the primary and the consumer.
* 80% reduction in the data that the consumer has to process, deserialize, and acknowledge.
* Slightly higher CPU on the primary's walsender process to evaluate the filter, but this is almost always a worthwhile trade-off.
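On PostgreSQL 15+ you can verify exactly which filter a publication applies, because pg_publication_tables exposes it in the rowfilter column. A quick sanity check against the publication defined above:
SELECT pubname, schemaname, tablename, rowfilter
FROM pg_publication_tables
WHERE pubname = 'finalized_orders_pub';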
1.3. REPLICA IDENTITY: The Hidden Cost of Updates and Deletes
For UPDATE and DELETE operations, PostgreSQL needs a way to uniquely identify the old row on the subscriber side. This is controlled by the REPLICA IDENTITY table property.
* DEFAULT: Uses the primary key. This is usually the best option.
* USING INDEX index_name: Uses a specific index, which must be unique, non-partial, non-deferrable, and built only on columns marked NOT NULL.
* FULL: Logs the previous values of all columns in the row. This is a performance killer for tables with many columns or large text/jsonb fields. It dramatically bloats the WAL, increasing I/O and network traffic. Avoid REPLICA IDENTITY FULL unless absolutely necessary.
* NOTHING: Does not log any old-row information. UPDATEs and DELETEs will fail for this table in the publication.
Production Scenario: An engineer adds a table to a publication but forgets to give it a primary key. The table keeps the default REPLICA IDENTITY DEFAULT, but with no primary key it effectively has no replica identity at all. Months later, a feature is added that deletes rows from this table. Those DELETEs now fail on the primary with an error stating the table does not have a replica identity and publishes deletes, breaking the feature in production. Always explicitly set a PRIMARY KEY and use REPLICA IDENTITY DEFAULT on any table you intend to replicate.
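It is worth auditing replica identity before a table ever enters a publication. A check along these lines (using the orders_pub publication from earlier; relreplident is 'd' for default, 'i' for index, 'f' for full, 'n' for nothing) catches the scenario above, and the ALTER statements sketch the fix with hypothetical key columns:
-- Which replica identity does each published table use?
SELECT pt.schemaname, pt.tablename, c.relreplident
FROM pg_publication_tables pt
JOIN pg_class c ON c.relname = pt.tablename
JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = pt.schemaname
WHERE pt.pubname = 'orders_pub';

-- Give the table a usable identity (key columns here are hypothetical).
ALTER TABLE order_items ADD PRIMARY KEY (order_id, item_id);
ALTER TABLE order_items REPLICA IDENTITY DEFAULT;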
Section 2: The Consumer: Building for Resilience and Backpressure
A poorly written consumer is just as dangerous as a misconfigured publisher. Its primary responsibilities are to process messages efficiently and, most importantly, to push back gracefully when it cannot keep up with the publisher's firehose of data (a technique known as backpressure).
We'll demonstrate a robust consumer pattern in Go using the pgconn and pglogrepl packages from the excellent pgx ecosystem.
2.1. Core Consumer Logic: Batching and Acknowledging
The fundamental loop of a consumer involves receiving a message, processing it, and then acknowledging the Log Sequence Number (LSN) to the primary. Acknowledging the LSN informs the primary that it can now purge the corresponding WAL files.
Here is a simplified, naive consumer. Do not use this in production.
// WARNING: Naive implementation for demonstration only.
func NaiveConsumer(conn *pgconn.PgConn) {
    var lastLSN pglogrepl.LSN
    standbyMessageTimeout := 10 * time.Second
    nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
    for {
        if time.Now().After(nextStandbyMessageDeadline) {
            // Must periodically report progress to the server.
            err := pglogrepl.SendStandbyStatusUpdate(context.Background(), conn,
                pglogrepl.StandbyStatusUpdate{WALWritePosition: lastLSN})
            if err != nil {
                log.Fatalf("SendStandbyStatusUpdate failed: %v", err)
            }
            nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
        }
        ctx, cancel := context.WithDeadline(context.Background(), nextStandbyMessageDeadline)
        rawMsg, err := conn.ReceiveMessage(ctx)
        cancel()
        if err != nil {
            if pgconn.Timeout(err) {
                continue
            }
            log.Fatalf("ReceiveMessage failed: %v", err)
        }
        if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
            log.Fatalf("Received Postgres WAL error: %+v", errMsg)
        }
        msg, ok := rawMsg.(*pgproto3.CopyData)
        if !ok || len(msg.Data) == 0 || msg.Data[0] != pglogrepl.XLogDataByteID {
            continue
        }
        xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
        if err != nil {
            log.Fatalf("ParseXLogData failed: %v", err)
        }
        // Process the message here...
        // ... this is the slow part
        // Acknowledge immediately
        lastLSN = xld.WALStart + pglogrepl.LSN(len(xld.WALData))
    }
}
The critical flaw here is that processing and acknowledging are synchronous. If the downstream system (e.g., writing to Kafka, updating a search index) is slow, the consumer will stop reading from the replication stream. The primary's walsender will block, replication lag will grow, and the pg_wal directory will fill up.
2.2. Production Pattern: Decoupled Processing with Bounded Channels for Backpressure
A robust consumer must decouple receiving/acknowledging from processing. We can achieve this using a bounded channel. This pattern allows us to absorb bursts of writes while applying backpressure gracefully if the downstream sink is chronically slow.
Architecture: a receiver goroutine reads from the replication stream and pushes parsed events into a bounded channel; a processor goroutine drains the channel and writes to the downstream sink; an acknowledger goroutine periodically reports the last fully processed LSN back to the primary.
Here's a more complete, production-ready implementation:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pglogrepl"
"github.com/jackc/pgx/v5/pgproto3"
)
const (
outputPlugin = "pgoutput"
standbyTimeout = 10 * time.Second
ackQueueCapacity = 1024 // Bounded channel size
)
// Represents a parsed WAL message to be processed
type WALEvent struct {
LSN pglogrepl.LSN
Data []byte // Or a more structured type after parsing
}
func main() {
// Database connection string for replication
connString := "postgres://user:password@host:5432/dbname?replication=database"
conn, err := pgconn.Connect(context.Background(), connString)
if err != nil {
log.Fatalf("failed to connect: %v", err)
}
defer conn.Close(context.Background())
slotName := "my_cdc_slot"
publicationName := "my_publication"
// 1. Identify the system and get the current LSN
ident, err := pglogrepl.IdentifySystem(context.Background(), conn)
if err != nil {
log.Fatalf("IdentifySystem failed: %v", err)
}
log.Println("SystemID:", ident.SystemID, "Timeline:", ident.Timeline, "XLogPos:", ident.XLogPos, "DBName:", ident.DBName)
// 2. Create replication slot if it doesn't exist
_, err = pglogrepl.CreateReplicationSlot(context.Background(), conn, slotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{Temporary: false})
if err != nil {
pgErr, ok := err.(*pgconn.PgError)
if ok && pgErr.Code == "42710" { // 42710 is duplicate_object
log.Println("Replication slot already exists, continuing")
} else {
log.Fatalf("CreateReplicationSlot failed: %v", err)
}
}
// 3. Start replication
// Passing LSN 0 lets the server resume from the slot's confirmed_flush_lsn,
// so a restarted consumer does not silently skip unacknowledged changes.
err = pglogrepl.StartReplication(context.Background(), conn, slotName, pglogrepl.LSN(0), pglogrepl.StartReplicationOptions{PluginArgs: []string{"proto_version '1'", fmt.Sprintf("publication_names '%s'", publicationName)}})
if err != nil {
log.Fatalf("StartReplication failed: %v", err)
}
log.Println("Logical replication started on slot", slotName)
var lastFlushedLSN, lastProcessedLSN atomic.Value
lastFlushedLSN.Store(pglogrepl.LSN(0))
lastProcessedLSN.Store(pglogrepl.LSN(0))
eventQueue := make(chan *WALEvent, ackQueueCapacity)
ctx, cancel := context.WithCancel(context.Background())
// Graceful shutdown handling
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-c
log.Println("Shutdown signal received, cleaning up...")
cancel()
}()
// Goroutine: Receiver (reads from Postgres, pushes to channel)
go func() {
defer close(eventQueue)
for ctx.Err() == nil {
rawMsg, err := conn.ReceiveMessage(ctx)
if err != nil {
if pgconn.Timeout(err) {
continue
}
if ctx.Err() != nil { // Expected error on shutdown
return
}
log.Printf("ReceiveMessage failed: %v, exiting receiver", err)
cancel() // Trigger shutdown on unexpected error
return
}
if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
log.Printf("Received Postgres WAL error: %+v", errMsg)
continue
}
copyData, ok := rawMsg.(*pgproto3.CopyData)
if !ok || len(copyData.Data) == 0 {
continue
}
switch copyData.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
// Keepalives need no immediate reply here; the acknowledger goroutine
// reports progress on its own schedule.
continue
case pglogrepl.XLogDataByteID:
xld, err := pglogrepl.ParseXLogData(copyData.Data[1:])
if err != nil {
log.Printf("ParseXLogData failed: %v", err)
continue
}
// Put the event on the channel. This will block if the channel is full,
// applying backpressure to the TCP connection itself.
eventQueue <- &WALEvent{LSN: xld.WALStart + pglogrepl.LSN(len(xld.WALData)), Data: xld.WALData}
}
}
}()
// Goroutine: Processor (reads from channel, processes data)
go func() {
for event := range eventQueue {
// SIMULATE WORK: In a real system, this would be writing to Kafka,
// updating ElasticSearch, calling an API, etc.
// This is the part that can be slow.
log.Printf("Processing event at LSN %s", event.LSN.String())
// Simulate processing time that varies
time.Sleep(time.Millisecond * time.Duration(10+((int(event.LSN)%10)-5)))
// Update the latest LSN we have successfully processed
lastProcessedLSN.Store(event.LSN)
}
}()
// Goroutine: Acknowledger (periodically sends standby status)
go func() {
ticker := time.NewTicker(standbyTimeout / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
processedLSN := lastProcessedLSN.Load().(pglogrepl.LSN)
flushedLSN := lastFlushedLSN.Load().(pglogrepl.LSN)
if processedLSN > flushedLSN {
err := pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: processedLSN})
if err != nil {
log.Printf("SendStandbyStatusUpdate failed: %v", err)
} else {
lastFlushedLSN.Store(processedLSN)
log.Printf("Acknowledged LSN %s", processedLSN.String())
}
}
case <-ctx.Done():
return
}
}
}()
<-ctx.Done() // Wait for shutdown signal
// Final flush on shutdown
processedLSN := lastProcessedLSN.Load().(pglogrepl.LSN)
log.Printf("Performing final LSN flush for %s", processedLSN.String())
pglogrepl.SendStandbyStatusUpdate(context.Background(), conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: processedLSN})
conn.Close(context.Background())
log.Println("Consumer shut down gracefully.")
}
How this design handles backpressure:
* The eventQueue channel has a fixed capacity (ackQueueCapacity).
* The receiver goroutine reads from the TCP socket and tries to push to eventQueue.
* If the processor is slow, the eventQueue will fill up.
* Once full, the line eventQueue <- &WALEvent{...} in the receiver will block.
* This stops the receiver from reading from the TCP socket.
* The OS's TCP receive buffer will then fill up.
* TCP flow control kicks in, signaling the sender (the PostgreSQL walsender) to stop sending data.
* The walsender process on the primary now waits, and replication lag begins to accumulate in a controlled manner, visible via monitoring. The database itself is not harmed, as the slot is still active. This is the correct way to handle backpressure.
Section 3: Monitoring: Your Eyes on the Pipeline
You cannot effectively manage a high-throughput replication system without robust, real-time monitoring. Flying blind is a recipe for disaster.
3.1. The Single Most Important Metric: Replication Lag
Replication lag tells you how far behind a consumer is from the primary's current state. There are several ways to measure it, but the most accurate is in bytes of WAL.
This query is your lifeline. Run it frequently and alert on it.
SELECT
slot_name,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS replication_lag_bytes
FROM pg_replication_slots
WHERE slot_type = 'logical';
* pg_current_wal_lsn(): The current LSN on the primary.
* confirmed_flush_lsn: The last LSN that the consumer has acknowledged.
* pg_wal_lsn_diff(): The difference in bytes.
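On PostgreSQL 13 and newer, pg_replication_slots can also tell you whether a slot is at risk of losing required WAL, which is especially useful if you cap retention with max_slot_wal_keep_size. A complementary check might look like this (safe_wal_size is NULL when no cap is configured):
SELECT slot_name, wal_status, safe_wal_size
FROM pg_replication_slots
WHERE slot_type = 'logical';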
Alerting Strategy:
* Warning Threshold (e.g., > 1 GB): This indicates a consumer is slowing down. Time to investigate. Is there a slow downstream dependency? Is the consumer CPU-bound?
* Critical Threshold (e.g., > 10 GB): This is a serious problem. The consumer may be stuck or crashed. This requires immediate intervention before you risk filling the pg_wal disk.
* Alert on active = false for more than a few minutes: An inactive slot for a provisioned consumer means it has disconnected and is not consuming WAL. This is a critical failure state. The slot will hold back WAL indefinitely.
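To correlate slot lag with actual disk pressure, it also helps to watch the on-disk size of pg_wal itself (PostgreSQL 10+):
-- Total size of the pg_wal directory
SELECT pg_size_pretty(sum(size)) AS pg_wal_size
FROM pg_ls_waldir();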
3.2. Exposing Metrics to Prometheus
For proper observability, you need to export these metrics to a monitoring system like Prometheus. With the community postgres_exporter, you can define custom queries in a file supplied via --extend.query-path.
queries.yaml for postgres_exporter:
pg_replication_slots:
  query: |
    SELECT
      slot_name,
      slot_type,
      active::int,
      pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
    FROM pg_replication_slots
  metrics:
    - slot_name:
        usage: "LABEL"
        description: "Name of the replication slot"
    - slot_type:
        usage: "LABEL"
        description: "Type of the replication slot (physical or logical)"
    - active:
        usage: "GAUGE"
        description: "1 if the slot is active, 0 otherwise"
    - lag_bytes:
        usage: "GAUGE"
        description: "Replication lag in bytes"
With this, you can write powerful PromQL alert rules:
groups:
  - name: postgres-logical-replication
    rules:
      # Alert if a logical slot has been inactive for 10 minutes
      - alert: PostgresLogicalSlotInactive
        expr: pg_replication_slots_active{slot_type="logical"} == 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "PostgreSQL logical slot {{ $labels.slot_name }} is inactive"
          description: "The consumer for slot {{ $labels.slot_name }} has been disconnected for over 10 minutes. This will prevent WAL cleanup and may lead to disk exhaustion."

      # Alert if replication lag exceeds 10 GiB
      - alert: PostgresHighReplicationLag
        expr: pg_replication_slots_lag_bytes{slot_type="logical"} > 10 * 1024^3
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High PostgreSQL replication lag on slot {{ $labels.slot_name }}"
          description: "Replication lag for slot {{ $labels.slot_name }} is over 10 GiB. The consumer cannot keep up with the write load."
Section 4: Advanced Scenarios and Edge Cases
High-throughput systems inevitably encounter complex operational challenges.
4.1. The Terror of Schema Migrations
This is the most common cause of catastrophic logical replication failure. A seemingly innocent ALTER TABLE can break your entire pipeline.
The Problem: The pgoutput plugin sends schema information along with data. If you run an ALTER TABLE ... ADD COLUMN ... on the primary, the replication stream will start including the new column. If your consumer code hasn't been updated to handle this new field, it will likely crash on deserialization.
The Coordinated Migration Pattern:
1. Ship a consumer version that tolerates the upcoming schema change (for example, one that ignores unknown columns or treats the new column as optional) and deploy it everywhere.
2. Only then run the ALTER TABLE DDL on the primary database.
3. Finally, release the application change that begins writing the new column.
This requires tight coordination between the DBA/platform team and the application development team. Automating this sequence in a CI/CD pipeline is the gold standard for mature systems.
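Once consumers that tolerate unknown columns are deployed, the DDL step becomes routine. A sketch of step 2, with a hypothetical column name:
-- Run only after tolerant consumer versions are live everywhere.
ALTER TABLE orders ADD COLUMN fulfillment_center text;  -- hypothetical column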
4.2. Bootstrapping a New Consumer: The Snapshot Problem
A brand-new consumer usually needs the existing contents of the published tables, not just the changes from this point forward. Done naively, this initial snapshot is painful on a busy database: the export runs inside a long-lived transaction on the publisher, which holds back vacuum for its entire duration; the new replication slot forces retention of every WAL segment generated while the copy runs; and the bulk read competes with production I/O. On a large, high-throughput table this can take hours and lead to application timeouts and dangerous WAL accumulation.
The Low-Impact Bootstrap Strategy:
The goal is to pin the initial data export and the starting point of the change stream to the same consistent point, without heavyweight locks:
1. Create the slot over a replication connection with CREATE_REPLICATION_SLOT ... LOGICAL pgoutput EXPORT_SNAPSHOT. The command returns the slot's consistent_point LSN and an exported snapshot name. Keep this connection open and idle; the snapshot remains valid only until the next command runs on it.
2. From a separate session, run pg_dump with --snapshot=<snapshot_name> for the published tables. pg_dump uses a REPEATABLE READ transaction and takes only ACCESS SHARE locks, so writes continue unimpeded.
3. Load the dump into the downstream system.
4. Start streaming from the slot. The first changes delivered are those committed after the consistent point, so the snapshot and the stream line up with no gaps and no duplicates.
If you created the slot with the SQL function pg_create_logical_replication_slot() instead, there is no exported snapshot to reuse. The fallback is to open a REPEATABLE READ transaction, call pg_export_snapshot(), and pass that snapshot name to pg_dump --snapshot=, accepting that its consistency point is only approximately aligned with the slot's confirmed_flush_lsn.
This multi-step process is more involved than a plain CREATE SUBSCRIPTION ... WITH (copy_data = true), but it is the right approach for bootstrapping consumers against databases that cannot tolerate a disruptive initial copy.
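A minimal sketch of the bootstrap flow; the slot name, table names, and connection details are illustrative:
-- Step 1: on a replication connection, e.g. psql "dbname=mydb replication=database"
CREATE_REPLICATION_SLOT new_consumer_slot LOGICAL pgoutput EXPORT_SNAPSHOT;
-- Returns: slot_name | consistent_point | snapshot_name | output_plugin
-- Keep this connection open and idle until the dump below has finished.

-- Step 2: from a regular shell, dump the published tables against that snapshot:
--   pg_dump --dbname=mydb --table=orders --table=order_items \
--           --snapshot=<snapshot_name_from_step_1> --data-only > bootstrap.sql

-- Steps 3-4: load bootstrap.sql downstream, then start streaming from
-- new_consumer_slot at the returned consistent_point.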
4.3. High Availability and Slot Failover
In a failover scenario (e.g., using Patroni), logical replication slots are not automatically failed over to the new primary. The slot exists only on the old primary's disk. When you promote a replica, the slot is gone.
This means that after a failover, your consumer will be unable to connect. If you create a new slot with the same name on the new primary, it will start from the current LSN, causing you to lose all data generated between the last acknowledged LSN on the old primary and the time the new slot was created.
Solution: You must use tooling that manages slot failover. For example, Patroni has some support for synchronizing slots to replicas, but it can be tricky. A common custom solution involves:
* The consumer periodically storing its last acknowledged LSN in a highly available store (like etcd or a separate database).
* After a failover, a script or operator checks if the slot exists on the new primary.
* If not, it creates a fresh slot on the new primary. A logical slot cannot be created at a past LSN, and pg_replication_slot_advance() only moves a slot forward, so the interval between the stored LSN and the new slot's starting point is a gap that must be repaired out of band (for example, with a targeted re-snapshot of the affected tables).
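A minimal sketch of the post-failover check, assuming the my_cdc_slot name from the consumer example:
-- Does the expected slot exist on the newly promoted primary?
SELECT slot_name, confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = 'my_cdc_slot';

-- If it is missing, re-establish streaming from the current position.
-- Everything between the consumer's last acknowledged LSN (from the HA store)
-- and this slot's starting point must be backfilled out of band.
SELECT * FROM pg_create_logical_replication_slot('my_cdc_slot', 'pgoutput');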
This is a complex recovery process that must be automated and tested rigorously.
Conclusion: Mastery Requires Vigilance
Scaling PostgreSQL's logical replication is a testament to the principle that powerful tools require deep understanding. The default settings are designed for safety, not for high-throughput performance. By moving the tuning focus from the consumer back to the publisher, filtering aggressively, building resilient backpressure-aware consumers, and implementing comprehensive, actionable monitoring, you can transform logical replication from a potential liability into a robust and scalable foundation for your data architecture. The patterns discussed here are not one-time fixes; they represent an operational discipline required to run mission-critical CDC pipelines in production.