PostgreSQL CDC for Real-Time CQRS Read Model Projections

Goh Ling Yong

The Fragility of Traditional CQRS Projections

In mature Command Query Responsibility Segregation (CQRS) architectures, the synchronization of state between the write model (the source of truth) and various read models (optimized for queries) is the system's most critical and failure-prone seam. Senior engineers have long grappled with the inherent trade-offs of common projection strategies.

The Dual-Write Anti-Pattern

The most straightforward approach—and the most flawed—is the dual-write. Within a single application service transaction, you write to the primary database and then, in the same block, write to the read model (e.g., Elasticsearch, a denormalized cache).

go
func (s *OrderService) CreateOrder(ctx context.Context, order *Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // Rollback on error

    // 1. Write to primary DB (Write Model)
    if err := s.orderRepo.Save(tx, order); err != nil {
        return err
    }

    // 2. Write to read model (e.g., Elasticsearch)
    if err := s.searchClient.IndexOrder(ctx, order); err != nil {
        // CRITICAL FLAW: there is no atomicity across the two stores. If this
        // call succeeds but the commit below fails, the read model holds an
        // order the write model never persisted. Reordering the calls only
        // flips the failure mode: a committed order that never reaches the
        // read model.
        return err
    }

    return tx.Commit()
}

The fundamental problem is the lack of a distributed transaction coordinator that spans your relational database and your NoSQL read store. If the database commit succeeds but the Elasticsearch call fails (due to network issues, schema validation errors, or temporary unavailability), your systems are now in an inconsistent state. This pattern is brittle and unacceptable for systems requiring high data integrity.

Application-Level Events and the Outbox Pattern

A more robust solution is to emit events from the application layer, often using the Transactional Outbox pattern to guarantee at-least-once delivery. An outbox table within the primary database stores events to be published. The event is inserted into the outbox table within the same local transaction as the business data change.

  • BEGIN TRANSACTION
  • INSERT into orders table.
  • INSERT into outbox table (with event payload).
  • COMMIT TRANSACTION
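
Sketched in Go, reusing the OrderService and Order types from the dual-write example (and assuming Order exposes an ID field, a hypothetical outbox table with aggregate_id, event_type, and payload columns, and encoding/json for the payload):

go
// SaveOrderWithOutbox persists the order and its corresponding event in ONE local
// transaction, so either both rows are committed or neither is. The outbox table
// and the "OrderCreated" event type are assumptions for illustration.
func (s *OrderService) SaveOrderWithOutbox(ctx context.Context, order *Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 1. Business data change (the write model).
    if err := s.orderRepo.Save(tx, order); err != nil {
        return err
    }

    // 2. Event row in the outbox table, inside the same transaction.
    payload, err := json.Marshal(order)
    if err != nil {
        return err
    }
    if _, err := tx.ExecContext(ctx,
        `INSERT INTO outbox (aggregate_id, event_type, payload) VALUES ($1, $2, $3)`,
        order.ID, "OrderCreated", payload); err != nil {
        return err
    }

    return tx.Commit()
}
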
A separate poller or transaction log tailing process then reads from the outbox table and publishes the events to a message broker like Kafka or RabbitMQ. While this solves the atomicity issue of dual-writes, it introduces its own set of production complexities:

    * Increased Application Complexity: The application is now responsible for event serialization, outbox table management, and ensuring the poller is robust and performant.

    * Potential for Latency: The polling mechanism introduces a delay between the database commit and the event publication.

    * Tight Coupling to Data Schema: The event generation logic is embedded within the application code. Any change to the domain model requires a corresponding change in the event publishing logic.

    * Operational Overhead: You now have a message broker and a polling service to maintain, monitor, and scale.

    This is a valid pattern, but what if we could achieve the same reliability with less application-level code and lower latency by tapping directly into the database's own change mechanism? This is where Change Data Capture (CDC) becomes a game-changer.

    The CDC Advantage: PostgreSQL Logical Replication Deep Dive

    Change Data Capture is a design pattern for observing and capturing data changes in a database. Instead of relying on the application to tell us what happened, we let the database—the ultimate source of truth—tell us. In PostgreSQL, this is powerfully implemented through Logical Replication.

    Unlike physical replication, which deals with exact block addresses and byte-by-byte replication for creating hot standbys, logical replication decodes the Write-Ahead Log (WAL) records into a logical, understandable format. It describes changes as INSERT, UPDATE, and DELETE statements on specific rows, which is exactly what we need to build read models.

    Configuring PostgreSQL for Logical Replication

    To enable this, your PostgreSQL instance (version 10+) requires specific configuration. This is a non-negotiable first step.

    In your postgresql.conf file, ensure the following parameters are set:

    ini
    # postgresql.conf
    
    # REQUIRED: Set to 'logical' to enable decoding of WAL for logical replication.
    wal_level = logical
    
    # Maximum number of replication slots the server can support.
    # Each Debezium connector consumes one slot, so allow at least one per connector plus headroom.
    max_replication_slots = 10
    
    # Maximum number of concurrent connections from replication clients.
    max_wal_senders = 10

    A server restart is required after changing wal_level.

    Next, we need to create a publication. A publication is a group of tables whose changes you want to stream. This gives you fine-grained control over what data is exposed via CDC.

    sql
    -- Connect to your database as a superuser or a user with replication permissions.
    
    -- Create a publication for the 'orders' and 'order_items' tables.
    -- This will capture all INSERTs, UPDATEs, and DELETEs for these tables.
    CREATE PUBLICATION cqrs_publication FOR TABLE orders, order_items;
    
    -- To publish changes for ALL tables in the schema (use with caution in production):
    -- CREATE PUBLICATION all_tables_publication FOR ALL TABLES;

    The Critical Role of the Replication Slot

    A client (like Debezium) connects to PostgreSQL through a replication slot. This slot is a crucial component that ensures no WAL data is discarded until the connected client has confirmed receipt. It acts as a durable cursor on the WAL stream.

    This is a double-edged sword:

    * Pro: It guarantees that even if your CDC consumer is down for hours or days, you won't lose any changes. When it reconnects, it will resume streaming from where it left off.

    * Con (CRITICAL OPERATIONAL RISK): If a consumer connects, creates a slot, and then is decommissioned or fails permanently without the slot being dropped, PostgreSQL will retain all WAL files since the slot was last active. This will eventually fill the server's disk, causing a database outage. Monitoring active, inactive, and lagging replication slots is a day-1 operational requirement for any production CDC setup.

    You can inspect replication slots with this query:

    sql
    SELECT
        slot_name,
        plugin,
        slot_type,
        database,
        active,
        restart_lsn,
        pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS replication_lag_bytes
    FROM pg_replication_slots;

    Set up monitoring to alert if active is false for an extended period or if replication_lag_bytes grows uncontrollably.
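
    A lightweight watchdog can run that query on an interval and alert when a slot goes inactive or its lag crosses a threshold. Here is a minimal sketch, assuming database/sql with the lib/pq driver and the local credentials used later in this article; the threshold and the log-based alerting hook are placeholders to adapt:

    go
    package main

    import (
        "database/sql"
        "log"
        "time"

        _ "github.com/lib/pq" // assumed driver; pgx would work equally well
    )

    // Alert when a slot retains more than ~1 GiB of WAL (tune for your disk headroom).
    const lagThresholdBytes = 1 << 30

    func main() {
        db, err := sql.Open("postgres", "postgres://user:password@localhost:5432/cqrs_db?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        for range time.Tick(30 * time.Second) {
            rows, err := db.Query(`
                SELECT slot_name, active,
                       COALESCE(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn), 0)::bigint
                FROM pg_replication_slots`)
            if err != nil {
                log.Printf("slot query failed: %v", err)
                continue
            }
            for rows.Next() {
                var (
                    name   string
                    active bool
                    lag    int64
                )
                if err := rows.Scan(&name, &active, &lag); err != nil {
                    log.Printf("scan failed: %v", err)
                    continue
                }
                // Swap log.Printf for your real alerting integration (PagerDuty, Slack, ...).
                if !active {
                    log.Printf("ALERT: replication slot %q is inactive", name)
                }
                if lag > lagThresholdBytes {
                    log.Printf("ALERT: replication slot %q retains %d bytes of WAL", name, lag)
                }
            }
            rows.Close()
        }
    }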

    Production Implementation with Debezium and Kafka Connect

    We'll use the Debezium connector for PostgreSQL, a battle-tested open-source project that runs on the Kafka Connect framework. This provides a scalable, fault-tolerant platform for running CDC connectors.

    System Architecture

    Our target architecture looks like this:

    PostgreSQL (Write Model) -> Debezium Connector (in Kafka Connect) -> Apache Kafka -> Go Projector Service -> Elasticsearch (Read Model)

    Setting Up the Stack with Docker Compose

    This docker-compose.yml provides a complete, runnable local environment. It includes PostgreSQL, Kafka, Zookeeper, and Kafka Connect with the Debezium connector pre-installed.

    yaml
    version: '3.8'
    
    services:
      postgres:
        image: debezium/postgres:14
        container_name: cqrs_postgres
        ports:
          - "5432:5432"
        environment:
          - POSTGRES_USER=user
          - POSTGRES_PASSWORD=password
          - POSTGRES_DB=cqrs_db
        volumes:
          - pg_data:/var/lib/postgresql/data
    
      zookeeper:
        image: confluentinc/cp-zookeeper:7.3.0
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      kafka:
        image: confluentinc/cp-kafka:7.3.0
        container_name: kafka
        ports:
          - "9092:9092"
        depends_on:
          - zookeeper
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    
      connect:
        image: debezium/connect:2.1
        container_name: kafka_connect
        ports:
          - "8083:8083"
        depends_on:
          - kafka
          - postgres
        environment:
          BOOTSTRAP_SERVERS: 'kafka:29092'
          GROUP_ID: 1
          CONFIG_STORAGE_TOPIC: my_connect_configs
          OFFSET_STORAGE_TOPIC: my_connect_offsets
          STATUS_STORAGE_TOPIC: my_connect_statuses
    
    volumes:
      pg_data:

    Run docker-compose up -d to start the environment. After it's running, create the necessary tables and the publication in the PostgreSQL database.
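
    The exact table layout is up to you; the following sketch applies one assumed schema, chosen so the column types map cleanly onto the Go consumer struct later in this article (TEXT ids, a TIMESTAMPTZ order date that Debezium emits as an ISO-8601 string, and a DOUBLE PRECISION amount), along with the publication from earlier, using database/sql and the lib/pq driver:

    go
    package main

    import (
        "database/sql"
        "log"

        _ "github.com/lib/pq" // assumed driver
    )

    // Assumed schema only: adjust names and types to your domain. The publication
    // statement is the same one shown earlier in this article.
    const ddl = `
    CREATE TABLE IF NOT EXISTS orders (
        id           TEXT PRIMARY KEY,
        customer_id  TEXT NOT NULL,
        order_date   TIMESTAMPTZ NOT NULL DEFAULT now(),
        total_amount DOUBLE PRECISION NOT NULL,
        status       TEXT NOT NULL
    );
    CREATE TABLE IF NOT EXISTS order_items (
        id       TEXT PRIMARY KEY,
        order_id TEXT NOT NULL REFERENCES orders (id),
        sku      TEXT NOT NULL,
        quantity INT NOT NULL
    );
    CREATE PUBLICATION cqrs_publication FOR TABLE orders, order_items;`

    func main() {
        db, err := sql.Open("postgres", "postgres://user:password@localhost:5432/cqrs_db?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // lib/pq sends parameter-free Exec calls as a simple query, so the
        // multi-statement DDL above runs in one round trip.
        if _, err := db.Exec(ddl); err != nil {
            log.Fatal(err)
        }
        log.Println("tables and publication created")
    }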

    Configuring the Debezium Connector

    Once Kafka Connect is running, you can register the PostgreSQL connector by POSTing a JSON configuration to its API (http://localhost:8083/connectors).

    Here is a production-ready configuration with detailed explanations:

    json
    {
      "name": "orders-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "cqrs_postgres",
        "database.port": "5432",
        "database.user": "user",
        "database.password": "password",
        "database.dbname": "cqrs_db",
        "database.server.name": "production_db_server",
        "plugin.name": "pgoutput",
        "publication.name": "cqrs_publication",
        "slot.name": "debezium_cqrs_slot",
        "snapshot.mode": "initial",
        "tombstones.on.delete": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
      }
    }

    Key Configuration Parameters Explained:

    * topic.prefix (named database.server.name in Debezium 1.x): A logical name for the database server. It becomes the prefix for all Kafka topics created by this connector (e.g., production_db_server.public.orders).

    * plugin.name: pgoutput is PostgreSQL's standard logical decoding plugin, available since version 10. It's the recommended choice.

    * publication.name: Must match the publication you created with CREATE PUBLICATION.

    * slot.name: The name of the replication slot Debezium will create and manage. This is the slot you must monitor.

    * snapshot.mode: This is critical. initial tells Debezium to first perform a consistent snapshot of all tables in the publication and then transition to streaming changes from the WAL. Other options include never (only stream changes from the moment the connector starts) and always (resnapshot on every restart, for testing only).

    * tombstones.on.delete: When a row is deleted, should Debezium emit a tombstone record (a message with a null value)? This is essential for Kafka's log compaction and for downstream consumers to correctly handle deletes.

    * transforms: Debezium's default message format is verbose. The ExtractNewRecordState transform simplifies it immensely, extracting just the after state of the row and placing it in the message value. This makes consumer logic much cleaner.

    POST this JSON to http://localhost:8083/connectors. You should see a 201 Created response. Now, any change you make to the orders or order_items tables will be published to Kafka in near real-time.
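
    If you prefer to script the registration (for CI or local bootstrapping), the same call is a few lines of Go against the Kafka Connect REST API; this sketch assumes the JSON above is saved as orders-connector.json:

    go
    package main

    import (
        "bytes"
        "fmt"
        "io"
        "log"
        "net/http"
        "os"
    )

    func main() {
        cfg, err := os.ReadFile("orders-connector.json") // the JSON configuration shown above
        if err != nil {
            log.Fatal(err)
        }

        // POST /connectors registers the connector; Kafka Connect returns 201 Created
        // on success and 409 Conflict if a connector with that name already exists.
        resp, err := http.Post("http://localhost:8083/connectors", "application/json", bytes.NewReader(cfg))
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()

        body, _ := io.ReadAll(resp.Body)
        fmt.Printf("status=%d body=%s\n", resp.StatusCode, body)
    }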

    Building a Resilient Read Model Projector Service in Go

    Now we need a service to consume these Kafka messages and update our read model (Elasticsearch). We'll use Go for its performance and robust concurrency features.

    go
    package main
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    
    	"github.com/segmentio/kafka-go"
    )
    
    // Represents the simplified message format after the Debezium unwrap transform
    type OrderEvent struct {
    	ID          string `json:"id"`
    	CustomerID  string `json:"customer_id"`
    	OrderDate   string `json:"order_date"`
    	TotalAmount float64 `json:"total_amount"`
    	Status      string `json:"status"`
    }
    
    // This would be your Elasticsearch client or other read model repository
    type ReadModelUpdater interface {
    	UpsertOrder(ctx context.Context, order OrderEvent) error
    	DeleteOrder(ctx context.Context, orderID string) error
    }
    
    // A mock implementation for demonstration
    type MockElasticClient struct{}
    
    func (m *MockElasticClient) UpsertOrder(ctx context.Context, order OrderEvent) error {
    	log.Printf("[Elasticsearch] UPSERTING order ID: %s, Status: %s", order.ID, order.Status)
    	// In a real implementation, you'd have the Elasticsearch client logic here.
    	return nil
    }
    
    func (m *MockElasticClient) DeleteOrder(ctx context.Context, orderID string) error {
    	log.Printf("[Elasticsearch] DELETING order ID: %s", orderID)
    	// Real delete logic here.
    	return nil
    }
    
    func main() {
    	brokers := []string{"localhost:9092"}
    	topic := "production_db_server.public.orders"
    	groupID := "elasticsearch-projector-group"
    
    	r := kafka.NewReader(kafka.ReaderConfig{
    		Brokers:        brokers,
    		GroupID:        groupID,
    		Topic:          topic,
    		MinBytes:       10e3, // 10KB
    		MaxBytes:       10e6, // 10MB
    		CommitInterval: 0,    // Manual commit
    	})
    
    	defer r.Close()
    
    	log.Printf("Starting consumer for topic '%s' with group ID '%s'", topic, groupID)
    
    	ctx, cancel := context.WithCancel(context.Background())
    	updater := &MockElasticClient{}
    
    	// Graceful shutdown handling
    	sigchan := make(chan os.Signal, 1)
    	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    	go func() {
    		<-sigchan
    		log.Println("Shutdown signal received, closing consumer...")
    		cancel()
    	}()
    
    	for {
    		m, err := r.FetchMessage(ctx)
    		if err != nil {
    			if err == context.Canceled {
    				break
    			}
    			log.Printf("error fetching message: %v", err)
    			continue
    		}
    
    		log.Printf("Message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
    
    		if err := processMessage(ctx, m, updater); err != nil {
    			log.Printf("Error processing message, will not commit offset: %v", err)
    			// In a real system, you'd have a dead-letter queue (DLQ) strategy here.
    			continue
    		}
    
    		// Commit the offset only after successful processing
    		if err := r.CommitMessages(ctx, m); err != nil {
    			log.Fatalf("failed to commit messages: %v", err)
    		}
    	}
    
    	log.Println("Consumer stopped.")
    }
    
    func processMessage(ctx context.Context, msg kafka.Message, updater ReadModelUpdater) error {
    	// Check for a tombstone message (key present, value null or empty)
    	if len(msg.Value) == 0 {
    		var key struct {
    			ID string `json:"id"`
    		}
    		if err := json.Unmarshal(msg.Key, &key); err != nil {
    			return fmt.Errorf("failed to unmarshal tombstone key: %w", err)
    		}
    		return updater.DeleteOrder(ctx, key.ID)
    	}
    
    	// Process a regular create/update message
    	var event OrderEvent
    	if err := json.Unmarshal(msg.Value, &event); err != nil {
    		return fmt.Errorf("failed to unmarshal event value: %w", err)
    	}
    
    	return updater.UpsertOrder(ctx, event)
    }

    Key Features of this Consumer:

  • Graceful Shutdown: It listens for SIGINT and SIGTERM to shut down cleanly, ensuring the current message has a chance to be processed.
  • Manual Commits: We explicitly call r.CommitMessages after a message has been successfully processed. If processMessage fails, the offset is not committed, and Kafka will redeliver the message to another consumer instance (or the same one after a restart). This provides at-least-once processing semantics.
  • Tombstone Handling: It correctly checks for msg.Value == nil to process DELETE operations from the source database, which is critical for keeping the read model in sync.
  • Idempotency: The UpsertOrder logic in the read model should be idempotent. If the same message is delivered twice (which can happen in at-least-once systems), the outcome should be the same. An upsert operation (create if not exists, update if exists) is naturally idempotent; a minimal sketch follows below.
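
    To make the idempotency point concrete, here is a minimal sketch of a ReadModelUpdater backed by Elasticsearch's plain HTTP document API (assuming a local cluster at localhost:9200 and an orders index; add bytes and net/http to the consumer's imports). A full-document PUT keyed by the order ID creates or replaces the document, so reprocessing the same event is harmless:

    go
    // ElasticOrderUpdater is a sketch of ReadModelUpdater backed by Elasticsearch's
    // document API. PUT /orders/_doc/{id} creates the document if it is missing and
    // fully replaces it otherwise, which makes redelivery safe.
    // Wire it in:  updater := &ElasticOrderUpdater{BaseURL: "http://localhost:9200", Client: http.DefaultClient}
    type ElasticOrderUpdater struct {
        BaseURL string // assumed local cluster, e.g. "http://localhost:9200"
        Client  *http.Client
    }

    func (e *ElasticOrderUpdater) UpsertOrder(ctx context.Context, order OrderEvent) error {
        body, err := json.Marshal(order)
        if err != nil {
            return err
        }
        url := fmt.Sprintf("%s/orders/_doc/%s", e.BaseURL, order.ID)
        req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
        if err != nil {
            return err
        }
        req.Header.Set("Content-Type", "application/json")

        resp, err := e.Client.Do(req)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        if resp.StatusCode >= 300 {
            return fmt.Errorf("elasticsearch upsert failed: %s", resp.Status)
        }
        return nil
    }

    func (e *ElasticOrderUpdater) DeleteOrder(ctx context.Context, orderID string) error {
        req, err := http.NewRequestWithContext(ctx, http.MethodDelete,
            fmt.Sprintf("%s/orders/_doc/%s", e.BaseURL, orderID), nil)
        if err != nil {
            return err
        }
        resp, err := e.Client.Do(req)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        // 404 is acceptable: deleting an already-deleted document is still idempotent.
        if resp.StatusCode >= 300 && resp.StatusCode != http.StatusNotFound {
            return fmt.Errorf("elasticsearch delete failed: %s", resp.Status)
        }
        return nil
    }
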
    Advanced Patterns and Edge Case Management

    This is where senior engineering diligence separates a proof-of-concept from a production-ready system.

    Edge Case 1: Zero-Downtime Schema Evolution

    What happens when you need to run ALTER TABLE orders ADD COLUMN tracking_number VARCHAR(255);? Debezium is robust enough to handle this. It will detect the schema change, publish a schema change event to an internal topic, and then start publishing new messages that include the tracking_number field.

    However, your Go consumer will fail to unmarshal the new payload if the OrderEvent struct is not updated. This will block processing for the entire partition.

    The Production-Grade Solution:

  • Make schema changes backward-compatible. Adding a new nullable column is safe. Dropping a column or changing a data type is a breaking change.
  • Deploy consumer changes first. Before applying the ALTER TABLE script, deploy a new version of your Go projector service that can handle both the old and new message formats. For adding a field, this means adding the field to your OrderEvent struct with json:"tracking_number,omitempty" (see the struct sketch after this list).
  • Apply the database migration. Run the ALTER TABLE script.
  • Monitor. The old consumers will ignore the new field, and the newly deployed consumers will correctly parse it. Once all consumers are updated, you can make the field required in your struct if necessary.
    For more complex changes, integrating a Schema Registry (like Confluent Schema Registry) is the gold standard. Debezium can publish events in Avro format, and the consumer can use the schema ID to fetch the correct schema for deserialization, making the entire process much more resilient to change.
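
    To illustrate step 2 in code, the only consumer change an additive, nullable column needs is a tolerant struct field; a sketch of the updated struct (the pointer plus omitempty simply leaves TrackingNumber nil for pre-migration messages):

    go
    // OrderEvent, updated to tolerate both the pre- and post-migration payloads.
    // Messages produced before the ALTER TABLE simply leave TrackingNumber as nil.
    type OrderEvent struct {
        ID             string  `json:"id"`
        CustomerID     string  `json:"customer_id"`
        OrderDate      string  `json:"order_date"`
        TotalAmount    float64 `json:"total_amount"`
        Status         string  `json:"status"`
        TrackingNumber *string `json:"tracking_number,omitempty"` // new, nullable column
    }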

    Edge Case 2: Backfilling a New Read Model

    Imagine you need to create a brand new read model (e.g., a Redis cache for order statuses). The existing orders table has 500 million rows. Running the Debezium connector with snapshot.mode: initial will work, but the initial snapshot holds a single REPEATABLE READ transaction open for its entire duration; on a table this size that can run for hours, hold back vacuum, and put heavy read load on a busy system.

    The Zero-Downtime Backfilling Strategy:

  • Configure a new Debezium connector for the same tables but with snapshot.mode: never. This connector will only stream new changes from the exact moment it starts. Let's call this the "Live Connector".
  • Start the Live Connector. It will create its replication slot and immediately start streaming any new INSERT/UPDATE/DELETE operations to a Kafka topic.
  • Develop a custom backfill script. This script reads the orders table in batches using keyset pagination on the primary key (WHERE id > :last_seen_id ORDER BY id LIMIT :batch_size). For each batch, it transforms the rows into the same message format Debezium produces and publishes them to the same Kafka topic. A sketch of such a script follows this list.
  • Run the backfill script. As the script works its way through the historical data, the Live Connector is simultaneously capturing any real-time changes made to those same rows.
  • Reconciliation: Because an upsert operation is idempotent, it doesn't matter if the backfill script processes a row and then the Live Connector immediately sends an update for that same row. The final state will be correct. The last write wins.
    This approach avoids a long-running snapshot transaction on your production database and lets you build new read models without downtime or a significant performance hit.
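
    Here is a sketch of such a backfill script, assuming the orders schema used earlier in this article (TEXT primary keys), the lib/pq driver, and kafka-go's Writer; it pages through the table by primary key and publishes rows in the same key/value shape the unwrapped Debezium messages use:

    go
    package main

    import (
        "context"
        "database/sql"
        "encoding/json"
        "log"

        _ "github.com/lib/pq" // assumed driver
        "github.com/segmentio/kafka-go"
    )

    const batchSize = 1000

    func main() {
        ctx := context.Background()
        db, err := sql.Open("postgres", "postgres://user:password@localhost:5432/cqrs_db?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        w := &kafka.Writer{
            Addr:     kafka.TCP("localhost:9092"),
            Topic:    "production_db_server.public.orders", // same topic the Live Connector writes to
            Balancer: &kafka.Hash{},                        // key-based partitioning keeps per-order ordering
        }
        defer w.Close()

        lastID := "" // keyset cursor; assumes TEXT/UUID primary keys (use an int cursor for numeric keys)
        for {
            rows, err := db.QueryContext(ctx, `
                SELECT id, customer_id, order_date::text, total_amount, status
                FROM orders WHERE id > $1 ORDER BY id LIMIT $2`, lastID, batchSize)
            if err != nil {
                log.Fatal(err)
            }

            var msgs []kafka.Message
            for rows.Next() {
                var id, customerID, orderDate, status string
                var total float64
                if err := rows.Scan(&id, &customerID, &orderDate, &total, &status); err != nil {
                    log.Fatal(err)
                }
                // Mirror the unwrapped Debezium shape: key = {"id": ...}, value = the row as JSON.
                key, _ := json.Marshal(map[string]string{"id": id})
                value, _ := json.Marshal(map[string]any{
                    "id": id, "customer_id": customerID, "order_date": orderDate,
                    "total_amount": total, "status": status,
                })
                msgs = append(msgs, kafka.Message{Key: key, Value: value})
                lastID = id
            }
            rows.Close()

            if len(msgs) == 0 {
                break // reached the end of the table
            }
            if err := w.WriteMessages(ctx, msgs...); err != nil {
                log.Fatal(err)
            }
            log.Printf("backfilled %d rows up to id %s", len(msgs), lastID)
        }
    }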

    Performance Tuning and Benchmarking

    Achieving sub-second latency from database commit to read model visibility is possible but requires tuning.

    * PostgreSQL: Ensure the disk subsystem for WAL files is fast (SSDs are a must). Monitor pg_stat_replication to check for latency between the primary and the standby/consumer. A large write_lag or flush_lag indicates the consumer can't keep up.

    * Debezium/Kafka Connect: You can normally scale a connector by increasing tasks.max, but the PostgreSQL connector is limited to a single task because a replication slot exposes one sequential WAL stream. The primary scaling mechanism is providing sufficient CPU and memory to the Kafka Connect worker instance.

    * Kafka: Ensure your topics have enough partitions to allow for consumer parallelism. If you have high write volume, you may need to partition your Kafka topic (e.g., by customer_id) and run multiple instances of your Go projector service within the same consumer group. Each instance will be assigned a subset of the partitions, processing them in parallel.

    * Consumer: Tune the MinBytes and MaxBytes settings in the Kafka reader, and batch writes to your read model. Instead of calling UpsertOrder one message at a time, collect a batch (say, 100 messages) and send it to Elasticsearch as a single bulk request; this dramatically increases throughput. A batching sketch follows this list.
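
    As a sketch of that batching loop (reusing the reader, OrderEvent type, and imports from the projector above; bulkUpsert is a hypothetical helper wrapping Elasticsearch's _bulk endpoint), the offsets are committed only after the whole batch has been written:

    go
    // processBatch drains up to batchSize messages, writes them to the read model in
    // one bulk request, then commits the offsets. A production version would also
    // flush on a timer so a slow trickle of messages does not wait indefinitely.
    func processBatch(ctx context.Context, r *kafka.Reader, batchSize int) error {
        var (
            msgs   []kafka.Message
            events []OrderEvent
        )
        for len(msgs) < batchSize {
            m, err := r.FetchMessage(ctx)
            if err != nil {
                return err
            }
            msgs = append(msgs, m)

            if len(m.Value) == 0 {
                continue // tombstones would be routed to a separate delete batch in a fuller version
            }
            var e OrderEvent
            if err := json.Unmarshal(m.Value, &e); err != nil {
                return fmt.Errorf("unmarshal: %w", err)
            }
            events = append(events, e)
        }

        // bulkUpsert is a hypothetical helper wrapping Elasticsearch's POST /_bulk
        // endpoint with one index action per event. If it fails, nothing is
        // committed and the whole batch is redelivered (at-least-once semantics).
        if err := bulkUpsert(ctx, events); err != nil {
            return err
        }
        return r.CommitMessages(ctx, msgs...)
    }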

    Hypothetical Latency Benchmark:

    Stage                           P99 Latency (ms)
    DB Commit -> WAL Flush          < 5
    WAL Read -> Debezium Event      < 10
    Debezium -> Kafka Publish       < 20
    Kafka -> Consumer Fetch         < 50
    Consumer -> Read Model Write    < 100
    End-to-End Total                < 185

    With a well-tuned system, end-to-end P99 latencies of under 200ms are readily achievable, a significant improvement over batch-oriented or polling-based alternatives.

    Conclusion

    Using PostgreSQL CDC with Debezium is not just another way to implement CQRS projections; it represents a fundamental architectural shift. It decouples your services at the most reliable point possible: the database's transaction log. By moving the responsibility of change notification from the application to the database, you gain atomicity, low latency, and a highly durable event stream by default.

    However, this power comes with responsibility. The operational burden shifts from managing application-level event publishers to meticulously monitoring the database's replication health, especially the state of replication slots. For senior engineers building complex, distributed systems, the trade-off is overwhelmingly positive. You get a system that is more resilient, more scalable, and a more accurate reflection of your true system state, enabling you to build sophisticated, real-time features with confidence.
