Kafka CQRS/ES: Idempotent Consumers & Exactly-Once Semantics

Goh Ling Yong

The Inherent Consistency Challenge in Distributed CQRS/ES

In any non-trivial CQRS/ES system, the fundamental challenge is synchronizing the write model (command side) with one or more read models (query side). The decoupling achieved via an event bus like Apache Kafka is powerful, but it introduces the classic distributed systems problem of maintaining consistency across service boundaries. The default delivery guarantee in Kafka, at-least-once, means that under certain failure scenarios—consumer crashes after processing but before committing an offset, network partitions, consumer group rebalances—a message can be redelivered.

For a simple logging system, a duplicate log entry might be an annoyance. For a financial ledger or an e-commerce order processor, processing an OrderCreated or PaymentProcessed event twice is catastrophic. It leads to data corruption, incorrect aggregates, and a loss of system trust.

While at-most-once delivery avoids duplicates, it does so at the cost of potential data loss, which is unacceptable for most business-critical systems. The holy grail is exactly-once semantics (EOS)—the guarantee that each message is processed, and its side effects are applied, precisely one time.

Achieving EOS isn't a single Kafka setting you enable; it's an end-to-end architectural property. It requires a three-pronged approach:

  • Atomic Write-Side Operations: Ensuring that the state change in the command model's database and the event publication are atomic.
  • Duplicate-Free Publishing: Guaranteeing that producer retries don't introduce duplicate messages into Kafka.
  • Idempotent Consumption: Designing consumers that can safely re-process the same message without causing duplicate side effects in the read model.

This article will dissect production-ready patterns for all three, focusing on the most complex and critical piece: the idempotent consumer.


    1. The Transactional Outbox Pattern: Atomicity at the Source

    Before an event even reaches Kafka, we must solve the dual-write problem. A naive implementation might look like this:

    java
    // ANTI-PATTERN: DO NOT USE
    @Transactional
    public void processOrder(CreateOrderCommand command) {
        // 1. Save to write-model database
        Order order = new Order(command.getDetails());
        orderRepository.save(order);
    
        // 2. Publish to Kafka
        OrderCreatedEvent event = new OrderCreatedEvent(order.getId());
        kafkaTemplate.send("orders", event);
    }

    Kafka does not participate in the database transaction, so the two writes can diverge. If the transaction commits but the application crashes before the send() actually reaches the broker, the order is saved but the event is lost forever; the read models will never know this order exists. Conversely, if the send() reaches the broker but the database transaction rolls back, you have published an event for an order that doesn't exist.

    The solution is the Transactional Outbox pattern. We create an outbox table within the same database as our domain entities. The event is persisted to this table as part of the same local database transaction as the state change.

    Schema Definition (PostgreSQL)

    sql
    CREATE TABLE orders (
        id UUID PRIMARY KEY,
        customer_id UUID NOT NULL,
        status VARCHAR(50) NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    
    CREATE TABLE outbox_events (
        id UUID PRIMARY KEY,
        aggregate_type VARCHAR(255) NOT NULL, 
        aggregate_id VARCHAR(255) NOT NULL,
        event_type VARCHAR(255) NOT NULL,
        payload JSONB NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );

    Atomic Command Handler Implementation (Java with Spring Data JPA)

    The command handler now writes to both tables atomically.

    java
    @Service
    public class OrderCommandHandler {
    
        private final OrderRepository orderRepository;
        private final OutboxEventRepository outboxEventRepository;
        private final ObjectMapper objectMapper; // Jackson ObjectMapper
    
        // ... constructor ...
    
        @Transactional
        public void handleCreateOrderCommand(CreateOrderCommand command) {
            Order order = new Order(command.getCustomerId());
            orderRepository.save(order);
    
            OrderCreatedEvent event = new OrderCreatedEvent(order.getId(), order.getCustomerId());
            saveToOutbox(order.getId(), "Order", "OrderCreated", event);
        }
    
        private void saveToOutbox(UUID aggregateId, String aggregateType, String eventType, Object payload) {
            try {
                JsonNode payloadJson = objectMapper.valueToTree(payload);
                OutboxEvent outboxEvent = new OutboxEvent(
                    aggregateType,
                    aggregateId.toString(),
                    eventType,
                    payloadJson
                );
                outboxEventRepository.save(outboxEvent);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize event payload for outbox", e);
            }
        }
    }
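
    For reference, the OutboxEvent entity used above might look like the following minimal sketch. The field names mirror the outbox_events schema; the JSONB mapping shown relies on Hibernate 6's @JdbcTypeCode, and older setups may need a library such as hibernate-types instead, so treat the details as assumptions.

    java
    @Entity
    @Table(name = "outbox_events")
    public class OutboxEvent {

        @Id
        private UUID id = UUID.randomUUID();

        @Column(name = "aggregate_type", nullable = false)
        private String aggregateType;

        @Column(name = "aggregate_id", nullable = false)
        private String aggregateId;

        @Column(name = "event_type", nullable = false)
        private String eventType;

        // Requires a JSON-capable mapping (Hibernate 6 @JdbcTypeCode(SqlTypes.JSON) shown here)
        @JdbcTypeCode(SqlTypes.JSON)
        @Column(name = "payload", nullable = false, columnDefinition = "jsonb")
        private JsonNode payload;

        @Column(name = "created_at", nullable = false)
        private Instant createdAt = Instant.now();

        protected OutboxEvent() { } // required by JPA

        public OutboxEvent(String aggregateType, String aggregateId, String eventType, JsonNode payload) {
            this.aggregateType = aggregateType;
            this.aggregateId = aggregateId;
            this.eventType = eventType;
            this.payload = payload;
        }

        // getters (getId, getAggregateType, getAggregateId, getEventType, getPayload) omitted
    }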

    Now, the order and the outbox_event are committed or rolled back together. We have guaranteed that for every state change, an event is reliably persisted. The next step is to move this event from the outbox table to Kafka.

    A separate, asynchronous process, often called a relay or poller, is responsible for this. This could be implemented using CDC (Change Data Capture) tools like Debezium or a custom application poller.


    2. Kafka's EOS Features: Idempotent & Transactional Producers

    The relay process reads from the outbox_events table and publishes to Kafka. This relay itself can fail. If it publishes a message and then crashes before marking the outbox event as processed, it will re-publish the same event on restart. Kafka provides two key features to handle this.

    2.1. The Idempotent Producer

    By setting enable.idempotence=true in the producer configuration, you prevent duplicates caused by producer-side retries. Kafka achieves this by assigning each producer a unique Producer ID (PID) and a sequence number to each message sent to a specific topic-partition. If the broker receives a message with a sequence number it has already seen for that PID, it discards it.

    Spring Boot application.yml Configuration:

    yaml
    spring:
      kafka:
        producer:
          acks: all
          retries: 3 # Or a higher value
          properties:
            enable.idempotence: true  # default since Kafka 3.0, shown explicitly here
            # Idempotence requires acks=all, retries > 0, and
            # max.in.flight.requests.per.connection <= 5 (the default value)

    This configuration ensures that network errors causing retries won't create duplicate messages in Kafka. This is a crucial first step.

    2.2. The Transactional Producer

    While the idempotent producer handles retries, the transactional producer handles atomicity for writes to multiple partitions. When our outbox poller reads a batch of events, they might be destined for different topics or partitions. We want the entire batch to either succeed or fail together.

    Spring Boot application.yml Configuration:

    yaml
    spring:
      kafka:
        producer:
          # ... other properties ...
          transaction-id-prefix: tx-outbox-poller-

    The transaction-id-prefix allows Spring to create a unique but stable transactional.id for each producer instance.

    Outbox Poller with Transactional Kafka Producer:

    java
    @Component
    public class OutboxPoller {
    
        private final OutboxEventRepository outboxRepository;
        private final KafkaTemplate<String, JsonNode> kafkaTemplate;
    
        // ... constructor ...
    
        @Scheduled(fixedDelay = 1000)
        @Transactional // DB (JPA) transaction: reading and deleting the outbox rows commit together
        public void pollAndPublish() {
            List<OutboxEvent> events = outboxRepository.findTop100ByOrderByCreatedAtAsc();
            if (events.isEmpty()) {
                return;
            }

            // executeInTransaction wraps all sends in a single Kafka transaction:
            // either every event in the batch is published, or none are.
            kafkaTemplate.executeInTransaction(kt -> {
                events.forEach(event ->
                    // Key by aggregate ID for partitioning consistency
                    kt.send("events." + event.getAggregateType().toLowerCase(),
                            event.getAggregateId(), event.getPayload()));
                return null;
            });

            // The Kafka transaction above and this database delete are NOT atomic.
            // If the app crashes after the Kafka commit but before this delete commits,
            // the same events will be re-published on the next poll (see the note below).
            outboxRepository.deleteAllInBatch(events);
        }
    }

    Critical Point: The code above still has a gap. The Kafka transaction commit and the outbox database delete are not atomic. If the app crashes after the Kafka commit but before the delete commits, the poller will re-publish the same events on restart. The idempotent producer does not help here: sequence-number de-duplication only covers retries within a single producer session, and a restarted relay is a new producer issuing brand-new send() calls. What we get is reliable at-least-once delivery from the database to Kafka; the duplicates it occasionally produces must be absorbed by an idempotent consumer, which is exactly what the next section builds.
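
    A practical detail for the consumer patterns that follow: the outbox row's id must travel with the message so consumers can use it as an idempotency key. You can embed it in the event payload, or attach it as a Kafka header. Below is a minimal sketch of the header approach; the header name event_id and the OutboxEvent accessors are assumptions.

    java
    // Helper for the relay: wraps the payload in a ProducerRecord and attaches the
    // outbox row's primary key as a header so consumers can de-duplicate on it.
    // Usage inside the Kafka transaction: kt.send(toRecord(event));
    private ProducerRecord<String, JsonNode> toRecord(OutboxEvent event) {
        ProducerRecord<String, JsonNode> record = new ProducerRecord<>(
                "events." + event.getAggregateType().toLowerCase(),
                event.getAggregateId(),
                event.getPayload());
        record.headers().add("event_id",
                event.getId().toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return record;
    }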


    3. Building Idempotent Consumers: The Final and Hardest Link

    Now that events are in Kafka with high reliability, we must ensure the consumer can process them exactly once. The consumer's task is to update a read model (e.g., a PostgreSQL table, an Elasticsearch document) and then commit its Kafka offset. The failure window is between these two actions.

    Pattern 1: Idempotency Key Tracking in the Read-Model Database

    This is the most robust and universally applicable pattern. We use the unique ID of our OutboxEvent as an idempotency key; the relay must deliver this ID with each message, either embedded in the event payload or as a Kafka header (see the sketch at the end of section 2). The consumer keeps a record of the event IDs it has already successfully processed.

    Schema Augmentation (PostgreSQL):

    sql
    -- Table to track processed event IDs
    CREATE TABLE processed_event_ids (
        event_id UUID PRIMARY KEY,
        processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    
    -- The read model table
    CREATE TABLE order_summaries (
        order_id UUID PRIMARY KEY,
        customer_id UUID NOT NULL,
        status VARCHAR(50) NOT NULL,
        item_count INT NOT NULL DEFAULT 0,
        total_amount NUMERIC(10, 2) NOT NULL DEFAULT 0.00
    );

    Idempotent Consumer Implementation (Spring Kafka):

    The consumer logic must wrap the read-model update and the idempotency key insertion in a single database transaction.

    java
    @Service
    public class OrderSummaryProjection {

        private static final Logger log = LoggerFactory.getLogger(OrderSummaryProjection.class);

        private final OrderSummaryRepository summaryRepository;
        private final ProcessedEventIdRepository processedEventRepository;

        // ... constructor ...

        @KafkaListener(topics = "events.order", groupId = "order-summary-projector")
        @Transactional // one DB transaction: duplicate check + read-model update + idempotency record
        public void handleOrderEvent(ConsumerRecord<String, OrderEvent> record) {
            OrderEvent event = record.value();

            // 1. Check for a duplicate event within the transaction
            if (processedEventRepository.existsById(event.getEventId())) {
                log.warn("Duplicate event received and skipped: {}", event.getEventId());
                return; // nothing to undo; the offset will still be committed
            }

            // 2. Process the event (update the read model)
            if (event instanceof OrderCreatedEvent) {
                createOrderSummary((OrderCreatedEvent) event);
            } else if (event instanceof ItemAddedToOrderEvent) {
                updateOrderSummary((ItemAddedToOrderEvent) event);
            }
            // ... other event types ...

            // 3. Record the event ID as processed, inside the same transaction as the update
            processedEventRepository.save(new ProcessedEventId(event.getEventId()));

            // 4. If anything above throws, the whole transaction rolls back, the offset is
            //    not committed, and Spring Kafka's error handler takes over (retry, DLQ).
        }

        // ... implementation of createOrderSummary and updateOrderSummary ...
    }

    With this pattern, if the consumer crashes after the DB commit but before the Kafka offset commit, it will re-receive the message. This time, the existsById check will return true, the logic will be skipped, and the empty transaction will be committed. The read model is safe from duplicates.
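
    For completeness, the ProcessedEventId entity and its repository are trivial; a sketch, with names assumed to mirror the processed_event_ids table:

    java
    @Entity
    @Table(name = "processed_event_ids")
    public class ProcessedEventId {

        @Id
        @Column(name = "event_id")
        private UUID eventId;

        @Column(name = "processed_at", nullable = false)
        private Instant processedAt = Instant.now();

        protected ProcessedEventId() { } // required by JPA

        public ProcessedEventId(UUID eventId) {
            this.eventId = eventId;
        }
    }

    public interface ProcessedEventIdRepository extends JpaRepository<ProcessedEventId, UUID> {
    }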

    Performance Consideration: The processed_event_ids table will grow indefinitely. For high-throughput systems, this is a concern. You must implement a cleanup strategy, such as a periodic job that deletes IDs older than the message retention period of your Kafka topic. Partitioning this table by date can also significantly improve performance.
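
    A minimal sketch of such a cleanup job, assuming a seven-day topic retention (the schedule, interval, and JdbcTemplate usage are illustrative):

    java
    @Component
    public class ProcessedEventIdCleanup {

        private final JdbcTemplate jdbcTemplate;

        public ProcessedEventIdCleanup(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }

        // Records older than the topic retention can never be redelivered by Kafka,
        // so keeping them only bloats the table and slows the existsById check.
        @Scheduled(cron = "0 0 3 * * *") // once a day, off-peak
        public void purgeOldEntries() {
            jdbcTemplate.update(
                "DELETE FROM processed_event_ids WHERE processed_at < NOW() - INTERVAL '7 days'");
        }
    }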

    Pattern 2: Leveraging Read-Model Constraints (Conditional Idempotency)

    In some simpler cases, you can achieve idempotency without a separate tracking table by relying on the database's own constraints.

  • On Creation: If your OrderCreatedEvent handler simply does an INSERT into the order_summaries table where order_id is the primary key, a duplicate event will cause a primary key violation. You can catch this specific exception and safely ignore it.
  • On Update: An UPDATE order_summaries SET status = 'SHIPPED' WHERE order_id = ? is naturally idempotent. Running it ten times has the same result as running it once.

    Example using INSERT ... ON CONFLICT (PostgreSQL):

    java
    // Inside a handler for OrderCreatedEvent
    @Transactional
    public void createOrderSummary(OrderCreatedEvent event) {
        // This query is idempotent in PostgreSQL
        jdbcTemplate.update(
            "INSERT INTO order_summaries (order_id, customer_id, status) VALUES (?, ?, 'CREATED') " +
            "ON CONFLICT (order_id) DO NOTHING",
            event.getOrderId(), event.getCustomerId()
        );
    }

    Limitations: This pattern is brittle. It breaks down as soon as you have non-idempotent operations. For example, an ItemAddedToOrderEvent handler that executes UPDATE order_summaries SET item_count = item_count + 1 is not idempotent. Processing it twice would incorrectly increment the count. This pattern should only be used when all operations on a given read model are provably idempotent.

    Pattern 3: Kafka Read-Process-Write Transactions (The Gold Standard)

    This is the most advanced pattern: the read-model database write and the Kafka offset commit are synchronized so that they commit or roll back together. It requires wiring the Kafka and database transaction managers into one coordinated manager. Note that this is transaction synchronization with an ordered commit, not a true two-phase commit, so a narrow failure window between the two commits remains; we come back to what that means for the consumer below.

    Spring Boot application.yml for Chained Transactions:

    yaml
    spring:
      kafka:
        consumer:
          group-id: order-summary-projector
          auto-offset-reset: earliest
          enable-auto-commit: false # CRITICAL: manual offset management
          isolation-level: read_committed # Only read non-transactional or committed messages
        listener:
          ack-mode: RECORD # Process one at a time
        producer:
          # The KafkaTransactionManager configured below requires a transactional
          # producer factory, so this service needs a transaction-id-prefix as well.
          transaction-id-prefix: tx-projector-

    Transaction Manager Configuration (Java):

    java
    @Configuration
    public class TransactionConfig {
    
        @Bean
        @Primary
        public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
                KafkaTransactionManager<Object, Object> kafkaTransactionManager,
                JpaTransactionManager jpaTransactionManager) {
            // Transactions are started in list order and committed in reverse order:
            // the JPA (database) transaction commits first and the Kafka transaction last.
            // If the Kafka commit then fails, the offset is not committed and the record
            // is redelivered -- a duplicate we can tolerate, rather than a lost update.
            return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jpaTransactionManager);
        }
    }
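
    Two caveats. First, ChainedKafkaTransactionManager is deprecated in recent Spring Kafka versions (2.5 and later); the recommended replacement is to give the listener container a KafkaTransactionManager and annotate the listener with @Transactional for the JPA transaction manager, which yields the same commit ordering. Second, for the consumed offsets to be sent to the Kafka transaction, the transaction manager must be registered on the listener container factory, not merely referenced from @Transactional. A sketch of that wiring, with bean names and generics assumed from the configuration above:

    java
    // The container starts the (chained) transaction around each delivery and sends
    // the consumed offsets to the Kafka transaction before it commits.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, OrderEvent> consumerFactory,
            ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
        return factory;
    }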

    Transactional Consumer Implementation:

    The listener code becomes much simpler: the transaction plumbing configured above does the heavy lifting, and the listener only declares which transaction manager it participates in.

    java
    @Service
    public class TransactionalOrderSummaryProjection {
    
        private final OrderSummaryRepository summaryRepository;
    
        // ... constructor ...
    
        @KafkaListener(topics = "events.order", groupId = "order-summary-projector")
        // Use the name of our chained transaction manager bean
        @Transactional("chainedKafkaTransactionManager")
        public void handleOrderEvent(ConsumerRecord<String, OrderEvent> record) {
            OrderEvent event = record.value();
    
            // No explicit duplicate check here: the read-model update and the Kafka
            // offset commit are coordinated by the chained transaction manager.
            // If this method throws, the DB change is rolled back, the offset is not
            // committed, and the message is redelivered for another attempt.
            // Caveat: the commits are synchronized, not two-phase. A crash between the
            // DB commit and the Kafka commit still leads to a redelivery, so handlers
            // with non-idempotent effects (such as counters) should keep a duplicate guard.
    
            if (event instanceof OrderCreatedEvent) {
                // ... create logic ...
            } else if (event instanceof ItemAddedToOrderEvent) {
                // ... update logic, e.g., incrementing a counter ...
            }
        }
    }

    This pattern is the cleanest approximation of exactly-once processing available without true distributed transactions. However, it introduces transactional overhead and requires careful configuration. It's the best choice for systems where correctness is paramount and the modest performance cost of coordinated transactions is acceptable.


    4. Advanced Edge Cases & Production Considerations

    Implementing these patterns in production requires thinking about failure modes.

    * Poison Pill Messages: What if a message is malformed or triggers a bug that causes the consumer to crash every time? Without intervention, it will be retried indefinitely, blocking the partition. You must configure a Dead Letter Queue (DLQ). Spring Kafka makes this straightforward with a DeadLetterPublishingRecoverer plugged into the container's error handler (SeekToCurrentErrorHandler in older versions, DefaultErrorHandler since 2.8), as sketched below.
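
    A minimal sketch of that wiring, assuming Spring Kafka 2.8+ (the retry back-off values are illustrative); register the handler on your listener container factory via setCommonErrorHandler:

    java
    // Failed records are retried three more times with a 1s back-off, then published
    // to a dead-letter topic named "<original-topic>.DLT" (the recoverer's default).
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
    }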

    * Outbox Poller Scaling & Contention: If you run multiple instances of your outbox poller service for high availability, they will try to grab the same rows from the outbox_events table. This can lead to wasted work and transactional conflicts. The solution is to use pessimistic locking at the database level.

    PostgreSQL SELECT ... FOR UPDATE SKIP LOCKED:

    sql
        -- This query allows multiple pollers to safely grab unique batches of events.
        SELECT * FROM outbox_events ORDER BY created_at ASC LIMIT 100 FOR UPDATE SKIP LOCKED;

    This must be implemented using a native query in your repository layer. Each poller instance will lock the rows it selects, and other instances will simply skip those locked rows and select the next available ones.
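
    A sketch of such a repository method with Spring Data JPA (the method name is an assumption; it must be called inside a transaction so the row locks are held until the batch has been published and deleted):

    java
    public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {

        // Locks up to 100 unprocessed rows; concurrent pollers skip rows that are
        // already locked instead of blocking, so each instance gets a disjoint batch.
        @Query(value = "SELECT * FROM outbox_events ORDER BY created_at ASC "
                     + "LIMIT 100 FOR UPDATE SKIP LOCKED",
               nativeQuery = true)
        List<OutboxEvent> lockNextBatch();

        // ... existing derived queries ...
    }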

    * Consumer Group Rebalancing: During a rebalance, partitions are reassigned between consumers. If an instance is in the middle of a long-running transaction when its partitions are revoked, a stalled "zombie" could otherwise commit stale work. Kafka's transactional API prevents this with fencing, which is keyed on the producer's transactional.id: when a producer initializes with a transactional.id that is already in use, the broker bumps the producer epoch and aborts any pending transaction from the older instance. Since KIP-447, offset commits are additionally fenced using consumer group metadata, so an instance that no longer owns a partition cannot commit offsets for it.

    * Performance Overhead: Transactions are not free. The idempotency key check (Pattern 1) adds at least one SELECT to every message processing flow. The chained transaction manager (Pattern 3) adds coordination overhead. You must benchmark your specific use case to understand the latency impact and determine if it meets your SLOs. For many systems, an extra 5-10ms of latency is a small price to pay for guaranteed data correctness.

    Conclusion

    Exactly-once semantics in a distributed system is not a feature to be enabled but an architecture to be designed. By combining the Transactional Outbox pattern for atomic writes, Kafka's idempotent and transactional producer features for reliable publishing, and a robust idempotent consumer strategy, you can build event-driven CQRS/ES systems that are resilient to failures and maintain strong data consistency.

    For most use cases, the Idempotency Key Tracking pattern provides the best balance of robustness, performance, and implementation simplicity. For systems requiring the absolute strongest guarantees and where non-idempotent operations are common, the Chained Transaction Manager pattern, despite its complexity, is the superior choice, ideally paired with a lightweight duplicate guard for the rare redelivery. Never assume your system is correct by default; prove it with architecture.
