Advanced CDC with Debezium for Bulletproof CQRS Architectures

Goh Ling Yong

The Inherent Consistency Problem in CQRS

For senior engineers building distributed systems, Command Query Responsibility Segregation (CQRS) is a powerful pattern. It allows us to optimize our write-side (command) data model for transactional integrity and our read-side (query) model for high-performance reads. However, this separation introduces our primary challenge: consistency. How do we guarantee that a state change in the command model is reliably and atomically reflected in the query model?

The naive approach is the 'dual write'. In the same service call that handles the command, you write to the command database and then publish an event or directly write to the read database.

java
// ANTI-PATTERN: Dual Write
@Transactional
public void updateProductPrice(UUID productId, BigDecimal newPrice) {
    // 1. Write to the command model (e.g., PostgreSQL)
    Product product = productRepository.findById(productId).orElseThrow();
    product.setPrice(newPrice);
    productRepository.save(product);

    // 2. Publish an event (e.g., to Kafka/RabbitMQ)
    // WHAT HAPPENS IF THIS FAILS? The DB commit has already happened.
    eventPublisher.publish(new ProductPriceUpdatedEvent(productId, newPrice));
}

This pattern is fundamentally flawed. The two operations—the database commit and the message broker publish—are not part of the same atomic transaction. If the database write succeeds but the event publish fails (due to a network partition, broker downtime, etc.), the system is left in an inconsistent state. The command model is updated, but the query models are never notified.

This is where Change Data Capture (CDC) using the database's transaction log becomes the definitive solution. Instead of the application being responsible for publishing events, we treat the database's transaction log as the single source of truth. A tool like Debezium can tail this log, capture committed changes, and reliably publish them as events. This guarantees that an event is published if and only if the transaction was successfully committed.

However, simply pointing Debezium at your primary business tables (products, orders) creates a tight coupling between your internal data model and your public event contracts. Any schema change, even adding a private internal column, becomes a breaking change for event consumers. This is where we introduce a more robust, production-grade pattern: The Transactional Outbox Pattern.

Core Implementation: The Transactional Outbox Pattern

The Transactional Outbox pattern decouples your application from the message publishing process while maintaining atomicity. The logic is simple but powerful:

  • Within the same local database transaction as your primary business logic, you insert an event record into a dedicated outbox table.
  • Debezium is configured to only monitor this outbox table.
  • Debezium captures the new record from the outbox table's transaction log entry and publishes it as a message to a broker like Kafka.
  • Since the business table update and the outbox table insert happen in the same transaction, the operation is atomic. If the transaction fails, nothing is written to the outbox. If it succeeds, the event is guaranteed to be captured.

    1. Database Schema

    First, let's define our outbox table in PostgreSQL. This table will store the events we want to publish.

    sql
    CREATE TABLE outbox (
        id UUID PRIMARY KEY,
        aggregate_type VARCHAR(255) NOT NULL,
        aggregate_id VARCHAR(255) NOT NULL,
        event_type VARCHAR(255) NOT NULL,
        payload JSONB NOT NULL,
        created_at TIMESTAMPTZ DEFAULT NOW()
    );
    
    -- Optional: Index for querying or cleanup tasks
    CREATE INDEX idx_outbox_created_at ON outbox(created_at);
  • id: A unique identifier for the event itself (e.g., a UUID).
  • aggregate_type: The type of the domain aggregate (e.g., 'Product', 'Order').
  • aggregate_id: The unique ID of the aggregate instance that the event pertains to.
  • event_type: A specific string identifying the event (e.g., 'ProductPriceUpdated', 'OrderCreated').
  • payload: The actual event data, stored as JSONB for flexibility and queryability.
    2. Atomic Write in the Application

    Now, let's refactor our command handler to use this pattern. Here is an example using Java, Spring Boot, and JPA (domain classes, constructors, and accessors are elided for brevity).

    java
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    import javax.persistence.*;
    import java.math.BigDecimal;
    import java.time.Instant;
    import java.util.UUID;
    
    // Outbox Entity
    @Entity
    @Table(name = "outbox")
    public class OutboxEvent {
        @Id
        private UUID id;
    
        @Column(name = "aggregate_type", nullable = false)
        private String aggregateType;
    
        @Column(name = "aggregate_id", nullable = false)
        private String aggregateId;
    
        @Column(name = "event_type", nullable = false)
        private String eventType;
    
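        // NOTE: persisting a Java String into a jsonb column typically needs an explicit JSON
        // mapping (e.g. @JdbcTypeCode(SqlTypes.JSON) on Hibernate 6, or a custom type on
        // Hibernate 5); columnDefinition alone may not be enough.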
        @Column(name = "payload", columnDefinition = "jsonb", nullable = false)
        private String payload;
    
        @Column(name = "created_at", insertable = false, updatable = false)
        private Instant createdAt;
        
        // Constructors, Getters, Setters...
    }
    
    // ProductPriceUpdatedEvent DTO
    public record ProductPriceUpdatedEvent(UUID productId, BigDecimal newPrice, Instant occurredOn) {}
    
    // ProductService implementation
    @Service
    public class ProductService {
    
        private final ProductRepository productRepository;
        private final EntityManager entityManager;
        private final ObjectMapper objectMapper;
    
        public ProductService(ProductRepository productRepository, EntityManager entityManager, ObjectMapper objectMapper) {
            this.productRepository = productRepository;
            this.entityManager = entityManager;
            this.objectMapper = objectMapper;
        }
    
        @Transactional
        public void updateProductPrice(UUID productId, BigDecimal newPrice) {
            // 1. Perform business logic and update the primary aggregate
            Product product = productRepository.findById(productId)
                .orElseThrow(() -> new EntityNotFoundException("Product not found"));
            product.setPrice(newPrice);
            productRepository.save(product);
    
            // 2. Create the event payload
            var eventPayload = new ProductPriceUpdatedEvent(productId, newPrice, Instant.now());
    
            // 3. Create and persist the OutboxEvent within the SAME transaction
            OutboxEvent outboxEvent = new OutboxEvent();
            outboxEvent.setId(UUID.randomUUID());
            outboxEvent.setAggregateType("Product");
            outboxEvent.setAggregateId(productId.toString());
            outboxEvent.setEventType("ProductPriceUpdated");
            try {
                outboxEvent.setPayload(objectMapper.writeValueAsString(eventPayload));
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Error serializing event payload", e);
            }
            
            entityManager.persist(outboxEvent);
        }
    }

    The @Transactional annotation ensures that the productRepository.save(product) and entityManager.persist(outboxEvent) calls are executed within a single, atomic database transaction. The dual-write problem is solved.
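
    As a sanity check, a small integration test can verify that a successful command leaves both the updated product and its outbox record behind. This is a sketch only: OutboxRepository, the Product test constructor, and the running test database are assumed helpers, not part of the production code above.

    java
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import java.math.BigDecimal;
    import java.util.UUID;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    @SpringBootTest
    class OutboxAtomicityTest {

        @Autowired ProductService productService;
        @Autowired ProductRepository productRepository;
        @Autowired OutboxRepository outboxRepository; // hypothetical Spring Data repository over the outbox table

        @Test
        void priceUpdateAndOutboxRowCommitTogether() {
            // Assumed test-fixture constructor; adapt to your Product entity.
            Product product = productRepository.save(
                new Product(UUID.randomUUID(), "Keyboard", new BigDecimal("149.99")));

            long outboxRowsBefore = outboxRepository.count();
            productService.updateProductPrice(product.getId(), new BigDecimal("129.99"));

            // Both writes were part of one transaction: the price changed and exactly one event was added.
            assertEquals(0, new BigDecimal("129.99")
                .compareTo(productRepository.findById(product.getId()).orElseThrow().getPrice()));
            assertEquals(outboxRowsBefore + 1, outboxRepository.count());
        }
    }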

    3. Debezium Connector Configuration

    Now, we configure a Debezium PostgreSQL connector to watch only the outbox table. This is a critical step for loose coupling. The connector is deployed to a Kafka Connect cluster.

    json
    {
      "name": "cqrs-outbox-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "mydatabase",
        "database.server.name": "myserver",
        "table.include.list": "public.outbox",
        "topic.prefix": "db-events",
        "heartbeat.interval.ms": "5000",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }

    With this configuration, any new row inserted into the public.outbox table will be captured by Debezium and sent to a Kafka topic named db-events.public.outbox.
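
    Connectors are registered by POSTing their JSON to the Kafka Connect REST API. Below is a minimal sketch; the worker address (localhost:8083) and the file name outbox-connector.json are assumptions for illustration.

    java
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class RegisterConnector {
        public static void main(String[] args) throws Exception {
            // Assumes the connector JSON above is saved locally as outbox-connector.json.
            String connectorJson = Files.readString(Path.of("outbox-connector.json"));

            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

            // 201 Created on success, 409 Conflict if a connector with this name already exists.
            System.out.println(response.statusCode() + " " + response.body());
        }
    }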

    Advanced Debezium Transformations for Clean Events

    The default Debezium event format is verbose. It's designed for replication and contains before, after, source, op (operation), and ts_ms fields. Downstream consumers in a CQRS system rarely need this much detail; they just want the event payload itself.

    We can use Kafka Connect's Single Message Transforms (SMTs) to reshape these events before they are written to Kafka. This keeps our consumers clean and unaware of Debezium's internal format.

    Our goal is to transform this raw Debezium message:

    json
    // Raw Debezium Event
    {
      "schema": { ... },
      "payload": {
        "before": null,
        "after": {
          "id": "a1b2c3d4-...",
          "aggregate_type": "Product",
          "aggregate_id": "e5f6g7h8-...",
          "event_type": "ProductPriceUpdated",
          "payload": "{\"productId\":\"e5f6g7h8-...\",\"newPrice\":199.99}"
        },
        "source": { ... },
        "op": "c",
        "ts_ms": 1678886400000
      }
    }

    Into this clean, business-focused event:

    json
    // Desired Kafka Message Value
    {
      "productId": "e5f6g7h8-...",
      "newPrice": 199.99
    }

    We also want the aggregate_id as the Kafka message key (to preserve per-aggregate ordering) and the event metadata, such as event_type, exposed as Kafka headers for routing and filtering.

    Debezium ships a purpose-built SMT for exactly this job: the outbox event router (io.debezium.transforms.outbox.EventRouter). Here's the connector configuration that applies it:

    json
    {
      "name": "cqrs-outbox-connector-transformed",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        // ... same database config as before ...
        "table.include.list": "public.outbox",
        // Still required, but the data topics are determined by the router below
        "topic.prefix": "db-events",

        // --- Outbox Event Router ---
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",

        // 1. Map the outbox columns onto the event parts
        "transforms.outbox.table.field.event.id": "id",
        "transforms.outbox.table.field.event.key": "aggregate_id",
        "transforms.outbox.table.field.event.payload": "payload",

        // 2. Expand the JSON string in the payload column into a structured message value
        "transforms.outbox.table.expand.json.payload": "true",

        // 3. Promote event metadata to Kafka headers
        "transforms.outbox.table.fields.additional.placement": "id:header:eventId,event_type:header:eventType,aggregate_type:header:aggregateType",

        // 4. Route each event to a topic derived from its aggregate type
        "transforms.outbox.route.by.field": "aggregate_type",
        "transforms.outbox.route.topic.replacement": "cqrs.${routedByValue}"

        // ... key/value converters as in the first connector ...
      }
    }

    Let's break down what the outbox event router does. Unlike a hand-rolled SMT chain, it consumes the raw Debezium envelope directly, so no separate ExtractNewRecordState step is needed.

  • Payload extraction: The value of the payload column becomes the Kafka message value, and table.expand.json.payload parses the JSON string so consumers receive the structured object shown above rather than an escaped string.
  • Message key: The aggregate_id column becomes the Kafka message key. This is critical for ordering: all events for the same aggregate instance land on the same Kafka partition and are processed in order by consumers.
  • Headers: The event id, event_type, and aggregate_type are copied into the Kafka headers eventId, eventType, and aggregateType. Consumers can use these for routing, filtering, or idempotency checks without parsing the message body.
  • Topic routing: route.by.field and route.topic.replacement send each event to a dedicated topic per aggregate type (e.g., cqrs.Product, cqrs.Order) instead of a single catch-all topic, which is a common and scalable pattern.

    This refined pipeline produces clean, well-structured, and easily consumable events, pushing the complexity into the integration layer (Kafka Connect) and away from your business services.

    Handling Idempotency and Schema Evolution

    With events flowing, we must address two critical production realities: duplicate messages and changing event structures.

    Idempotent Consumer Design

    Kafka provides at-least-once delivery semantics. This means a consumer might process the same event more than once (e.g., after a consumer group rebalance or a crash/restart). Your read-side projection logic must be idempotent.

    The eventId we placed in the Kafka header is our key to achieving this. The consumer service should track the IDs of events it has already processed.

    Here's a conceptual implementation for a read-side service that projects data into its own database (e.g., Elasticsearch or another PostgreSQL instance).

    sql
    -- In the read-side database
    CREATE TABLE processed_event_ids (
        event_id UUID PRIMARY KEY,
        processed_at TIMESTAMPTZ DEFAULT NOW()
    );
    java
    // Spring Kafka consumer for the read-side projection
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.support.TransactionTemplate;
    import java.util.UUID;

    @Service
    public class ProductViewProjection {

        private static final Logger log = LoggerFactory.getLogger(ProductViewProjection.class);
    
        private final ProductViewRepository productViewRepository;
        private final ProcessedEventRepository processedEventRepository;
        private final PlatformTransactionManager transactionManager;
    
        // ... constructor ...
    
        @KafkaListener(topics = "cqrs.Product", groupId = "product-view-builder")
        public void handleProductEvent(@Payload String payload, @Header("eventType") String eventType, @Header("eventId") String eventIdStr) {
            UUID eventId = UUID.fromString(eventIdStr);
    
            // Use programmatic transaction management for fine-grained control
            TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
            transactionTemplate.execute(status -> {
                // 1. Check for duplicates
                if (processedEventRepository.existsById(eventId)) {
                    log.warn("Duplicate event received, skipping: {}", eventId);
                    return null; // Already processed
                }
    
                // 2. Process the event based on its type
                switch (eventType) {
                    case "ProductPriceUpdated":
                        // ... deserialize payload and update read model ...
                        break;
                    // ... other event types ...
                }
    
                // 3. Mark the event as processed
                processedEventRepository.save(new ProcessedEvent(eventId));
                
                return null;
            });
        }
    }

    By wrapping the check, the business logic, and the save of the processed ID in a single transaction, we guarantee that the event is processed exactly once from a business perspective. If any part fails, the whole transaction rolls back, and the message will be redelivered for a retry.
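
    For completeness, here is a minimal sketch of the ProcessedEvent entity and ProcessedEventRepository referenced above, assuming Spring Data JPA over the processed_event_ids table defined earlier.

    java
    import org.springframework.data.jpa.repository.JpaRepository;
    import javax.persistence.Column;
    import javax.persistence.Entity;
    import javax.persistence.Id;
    import javax.persistence.Table;
    import java.time.Instant;
    import java.util.UUID;

    @Entity
    @Table(name = "processed_event_ids")
    public class ProcessedEvent {

        @Id
        @Column(name = "event_id")
        private UUID eventId;

        @Column(name = "processed_at", insertable = false, updatable = false)
        private Instant processedAt; // populated by the DEFAULT NOW() in the DDL above

        protected ProcessedEvent() {} // required by JPA

        public ProcessedEvent(UUID eventId) {
            this.eventId = eventId;
        }
    }

    interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, UUID> {}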

    Schema Evolution

    What happens when ProductPriceUpdatedEvent needs a new field, like currency? Since our payload is JSONB, the database write is flexible. The challenge is on the consumer side.

  • Additive Changes: Adding new, optional fields is generally safe, provided consumers are configured to tolerate unknown fields (with Jackson, disable FAIL_ON_UNKNOWN_PROPERTIES, as sketched after this list). Older consumers then simply ignore the new field during deserialization. This is the most common and recommended approach.
  • Breaking Changes: Renaming or removing a field is a breaking change. To handle this, you must version your events. The event_type in the outbox becomes ProductPriceUpdated_v1, and a new event ProductPriceUpdated_v2 is introduced. The application code would publish the V2 event, and consumers can be updated gradually to handle both versions until the V1 event is no longer produced.
  • Schema Registry: For more formal governance, especially in a polyglot environment, using a Schema Registry (like Confluent Schema Registry) with Avro or Protobuf is the gold standard. Debezium has built-in support for these formats. The schema is registered and versioned, and consumers can fetch the correct schema at runtime to deserialize the message, providing strong forward and backward compatibility guarantees.
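
    Jackson's default behaviour is to fail on unknown properties, so tolerance for additive changes has to be configured explicitly. Below is a minimal consumer-side sketch; the currency field, the versioned type names, and the ProductEventDeserializer class are illustrative assumptions.

    java
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.math.BigDecimal;
    import java.util.UUID;

    public class ProductEventDeserializer {

        private static final ObjectMapper MAPPER = new ObjectMapper()
            // Additive changes: fields added by newer producers are ignored instead of failing.
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // Old shape, still used by consumers that have not been upgraded.
        public record ProductPriceUpdatedV1(UUID productId, BigDecimal newPrice) {}

        // New shape with the added currency field (breaking changes get a new event_type instead).
        public record ProductPriceUpdatedV2(UUID productId, BigDecimal newPrice, String currency) {}

        public static Object deserialize(String eventType, String payload) throws Exception {
            return switch (eventType) {
                case "ProductPriceUpdated", "ProductPriceUpdated_v1" -> MAPPER.readValue(payload, ProductPriceUpdatedV1.class);
                case "ProductPriceUpdated_v2" -> MAPPER.readValue(payload, ProductPriceUpdatedV2.class);
                default -> throw new IllegalArgumentException("Unknown event type: " + eventType);
            };
        }
    }
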
    Performance and Operational Considerations

    Running this pattern in production requires attention to the long-term health of the system.

    Outbox Table Bloat and Cleanup

    The outbox table will grow indefinitely if left unchecked. This can degrade write performance and consume significant disk space. We need a reliable cleanup strategy.

    Because Debezium reads the transaction log, deleting a row does not erase the insert event: the change is already in the WAL, and as long as the replication slot exists it will still be captured (it is even possible to delete the row right after inserting it). Still, keeping rows for a retention window gives you an audit trail and a recovery path if the connector or replication slot ever has to be rebuilt, so the usual approach is a separate, asynchronous cleanup process.

    Strategy 1: Periodic Cleanup Job

    A background job runs periodically (e.g., every hour) and deletes records from the outbox table that are older than a certain threshold (e.g., 7 days).

    sql
    DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '7 days';

    This is simple but requires a safety margin. You must ensure your Debezium connector and Kafka cluster are healthy enough to process events well within this 7-day window. Monitoring Debezium's lag is essential.
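
    A sketch of Strategy 1 as a Spring scheduled job is shown below. It assumes @EnableScheduling is active and that a JdbcTemplate is wired against the command-side database.

    java
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    @Component
    public class OutboxCleanupJob {

        private static final Logger log = LoggerFactory.getLogger(OutboxCleanupJob.class);

        private final JdbcTemplate jdbcTemplate;

        public OutboxCleanupJob(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }

        // Runs at the top of every hour; only rows older than the retention window are removed,
        // giving Debezium a generous safety margin to have captured them.
        @Scheduled(cron = "0 0 * * * *")
        public void purgeOldOutboxEvents() {
            int deleted = jdbcTemplate.update(
                "DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '7 days'");
            log.info("Outbox cleanup removed {} rows", deleted);
        }
    }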

    Strategy 2: Partitioning (Advanced)

    For very high-throughput systems, DELETE operations can be costly. A more performant approach is to use PostgreSQL's native table partitioning.

    sql
    -- Partition the outbox table by day
    CREATE TABLE outbox (
        -- ... columns ...
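        -- NOTE: with declarative partitioning, any PRIMARY KEY or UNIQUE constraint must include created_at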
    ) PARTITION BY RANGE (created_at);
    
    -- Create partitions for the next few days
    CREATE TABLE outbox_2023_03_15 PARTITION OF outbox FOR VALUES FROM ('2023-03-15') TO ('2023-03-16');
    CREATE TABLE outbox_2023_03_16 PARTITION OF outbox FOR VALUES FROM ('2023-03-16') TO ('2023-03-17');

    A maintenance script can then create new partitions for future days and simply DROP old partitions, which is a near-instantaneous metadata-only operation.

    sql
    DROP TABLE outbox_2023_03_08;

    Debezium Connector Tuning

  • max.batch.size: Controls the maximum number of records in a batch sent to Kafka. Increasing this (e.g., to 8192) can improve throughput at the cost of higher memory usage on the Connect worker.
  • poll.interval.ms: How often the connector polls for changes. Decreasing this (e.g., to 100) reduces latency but increases CPU load on the database and the connector.
  • Monitoring: Use JMX to monitor key Debezium metrics. MilliSecondsSinceLastEvent is crucial for detecting a stuck connector. QueueRemainingCapacity and MaxQueueSizeInBytes help monitor backpressure. A minimal polling sketch follows this list.
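
    The sketch below polls one streaming metric over JMX. The MBean name follows the pattern documented for the PostgreSQL connector's streaming metrics, but the Connect worker's JMX host/port and the server name (the topic.prefix value) are deployment-specific assumptions you should verify.

    java
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class DebeziumLagCheck {
        public static void main(String[] args) throws Exception {
            // Assumed JMX endpoint of the Kafka Connect worker running the connector.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://connect-worker:9010/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection mbeans = connector.getMBeanServerConnection();
                // "server" matches the connector's topic.prefix ("db-events" in the configs above).
                ObjectName streamingMetrics = new ObjectName(
                    "debezium.postgres:type=connector-metrics,context=streaming,server=db-events");
                Object msSinceLastEvent = mbeans.getAttribute(streamingMetrics, "MilliSecondsSinceLastEvent");
                System.out.println("MilliSecondsSinceLastEvent = " + msSinceLastEvent);
            }
        }
    }
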
    Conclusion: Beyond Dual Writes

    By combining CQRS with the Transactional Outbox pattern and Debezium, we build systems that are not only scalable and performant but also resilient and consistent by design. This architecture elevates the database transaction log to its rightful place as the immutable source of truth for state changes.

    We have moved from the fragile dual-write anti-pattern to a robust, observable, and extensible event-driven architecture. The implementation details—from atomic writes and advanced SMTs to idempotent consumers and operational cleanup—are what separate a proof-of-concept from a bulletproof, production-ready system. This pattern provides a solid foundation for building complex microservices that can evolve safely and maintain data integrity across distributed boundaries.
