Implementing Idempotency for Exactly-Once in Kafka Event Sourcing

Goh Ling Yong

The Illusion of 'Exactly-Once' in Distributed Systems

In the world of distributed systems, particularly those built on messaging platforms like Apache Kafka, the term "Exactly-Once Semantics" (EoS) is both a coveted goal and a source of profound confusion. Senior engineers know that true exactly-once delivery is impossible to guarantee in the face of network partitions and crashes. What Kafka offers through its transactional APIs (KIP-98) and the Kafka Streams processing.guarantee=exactly_once setting is a powerful primitive: a message read from an input partition is processed and its output written to output partitions exactly once, with the consumed offsets and produced records committed atomically in a single transaction. This is a monumental achievement for stream processing within the Kafka ecosystem.
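
For reference, enabling Kafka's built-in guarantee in a Kafka Streams application is a single configuration switch; a minimal sketch (the application id, broker address, and topology are placeholders):

    java
    // Sketch: Kafka Streams with the built-in exactly-once guarantee enabled
    // (EXACTLY_ONCE_V2 is the current form of processing.guarantee=exactly_once).
    // This protects only the read-process-write cycle inside Kafka, not external side effects.
    public KafkaStreams buildStreams(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-projection");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return new KafkaStreams(topology, props);
    }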

However, the guarantee ends where your business logic's side effects begin. Kafka ensures the read-process-write cycle is atomic, but it cannot magically make your database writes, API calls, or cache updates idempotent. If your consumer process crashes after successfully calling a payment gateway but before committing its Kafka offset, a simple restart will cause it to re-process the same message and potentially double-charge a customer.

This is the critical gap where application-level idempotency becomes paramount. True end-to-end EoS is not a configuration flag; it's an architectural pattern that you, the engineer, must design and implement. This article provides a deep, implementation-focused exploration of production-grade patterns for building idempotent consumers in a Kafka-based event sourcing architecture.

We will dissect:

  • The Idempotent Receiver Pattern: The foundational concept for handling duplicate messages.
  • High-Performance Deduplication Strategies: Using Redis for speed and PostgreSQL/Cassandra for transactional consistency.
  • Advanced Edge Case Handling: Managing out-of-order events and stateful processing.
  • The Outbox Pattern: Ensuring idempotency for interactions with external, non-transactional systems.

This is not a theoretical overview. We will build and analyze complete, production-ready code examples that you can adapt for your own high-throughput, mission-critical services.


    Pattern 1: The Idempotent Receiver with a High-Speed Cache (Redis)

    The most direct approach to idempotency is the "Idempotent Receiver" pattern. The core idea is simple: every event carries a unique identifier (an idempotency key). The consumer maintains a record of the keys it has already processed. Before executing its business logic, it checks if the incoming event's key has been seen. If so, it skips the logic and acknowledges the message. If not, it processes the event and then records the key.

    Choosing an Idempotency Key:

    An effective idempotency key must be unique and deterministic. Good candidates include:

    • A UUID generated by the producer and placed in the event header.
    • A composite key of business identifiers (e.g., tenant_id:order_id:event_type).
    • For event-sourced aggregates, a combination of aggregate_id and sequence_number.

    For our first implementation, we'll use Redis as our deduplication store due to its exceptional performance for key-value lookups.
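
    On the producer side, the key can be attached as a record header so it stays out of the payload and is easy for any consumer to read. A minimal sketch (the topic name, event accessors, and producer instance are assumptions):

    java
    // Sketch: attach a deterministic idempotency key as a Kafka record header.
    public void publishOrderCreated(KafkaProducer<String, OrderEvent> producer, OrderEvent event) {
        String idempotencyKey = event.getTenantId() + ":" + event.getOrderId() + ":OrderCreated";
        ProducerRecord<String, OrderEvent> producerRecord =
                new ProducerRecord<>("orders", event.getOrderId(), event); // keyed by orderId
        producerRecord.headers().add("idempotency-key", idempotencyKey.getBytes(StandardCharsets.UTF_8));
        producer.send(producerRecord);
    }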

    Naive Redis Implementation (and its critical flaw)

    A first attempt might look like this:

    java
    // WARNING: THIS IMPLEMENTATION IS FLAWED AND FOR ILLUSTRATIVE PURPOSES ONLY
    public class NaiveRedisIdempotencyConsumer {
    
        private final Jedis jedis; // Or any Redis client
        private final KafkaConsumer<String, OrderEvent> consumer;
    
        // ... constructor ...
    
        public void processMessages() {
            while (true) {
                ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, OrderEvent> record : records) {
                    Header idempotencyHeader = record.headers().lastHeader("idempotency-key");
                    String idempotencyKey = new String(idempotencyHeader.value(), StandardCharsets.UTF_8); // calling toString() on the Header would not return its value
    
                    // 1. Check for duplicate
                    if (jedis.exists(idempotencyKey)) {
                        System.out.println("Duplicate message detected: " + idempotencyKey);
                        continue; // Skip processing
                    }
    
                    // 2. Process the message (e.g., write to a database)
                    processOrder(record.value());
    
                    // 3. Mark as processed
                    jedis.setex(idempotencyKey, 3600, "processed"); // Set with 1-hour TTL
                }
                consumer.commitSync(); // Commit offsets after batch
            }
        }
        // ...
    }

    The Race Condition: This code is vulnerable to a critical failure mode. Consider this sequence:

    1. The processOrder() method successfully writes to the database.
    2. The application crashes before jedis.setex() is called.
    3. Upon restart, the consumer re-fetches the same message because its offset was never committed.
    4. The jedis.exists() check returns false because the key was never written.
    5. The business logic runs a second time, creating a duplicate record.

    This demonstrates that the business logic operation and the idempotency key recording must be atomic. Redis, being an external system, cannot participate in a transaction with your primary database. This leads us to a more robust pattern.


    Pattern 2: Transactional Idempotency with a Relational Database

    To achieve atomicity, the idempotency check must occur within the same transaction as the primary business logic. This is the most reliable way to ensure exactly-once processing when your state is stored in a transactional database like PostgreSQL or MySQL.

    The pattern involves creating a dedicated table to store processed message IDs.

    Schema for Processed Events Table (PostgreSQL):

    sql
    CREATE TABLE processed_events (
        idempotency_key VARCHAR(255) PRIMARY KEY,
        processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        consumer_group_id VARCHAR(255) NOT NULL
    );

    The consumer_group_id column records which group processed the event. If several consumer groups (e.g., one for order processing, another for analytics) must deduplicate the same event stream independently, make consumer_group_id part of a composite primary key or embed the group in the key itself; the examples below assume a single consumer group, so idempotency_key alone serves as the primary key.

    Production-Grade Implementation with Spring Boot & JPA

    Here is a complete, robust implementation using Spring Kafka and Spring Data JPA. The pattern pairs Kafka's at-least-once redelivery with a single atomic database transaction: duplicate deliveries may still occur, but they can never be applied twice.

    java
    // Service Layer: Orchestrates the logic
    @Service
    @Transactional // This is crucial!
    public class OrderProcessingService {
    
        private final ProcessedEventRepository processedEventRepository;
        private final OrderRepository orderRepository;
    
        public OrderProcessingService(ProcessedEventRepository processedEventRepository, OrderRepository orderRepository) {
            this.processedEventRepository = processedEventRepository;
            this.orderRepository = orderRepository;
        }
    
        public void handleOrderCreatedEvent(OrderCreatedEvent event, String idempotencyKey, String consumerGroupId) {
            // 1. Check for duplicate within the current transaction
            if (processedEventRepository.existsById(idempotencyKey)) {
                log.warn("Duplicate event detected, skipping. Key: {}", idempotencyKey);
                return; // Already processed, transaction will commit, no harm done.
            }
    
            // 2. Execute business logic
            Order newOrder = new Order();
            newOrder.setId(event.getOrderId());
            newOrder.setAmount(event.getAmount());
            newOrder.setStatus("CREATED");
            orderRepository.save(newOrder);
    
            // 3. Record the event as processed within the same transaction
            ProcessedEvent processedEvent = new ProcessedEvent(idempotencyKey, consumerGroupId);
            processedEventRepository.save(processedEvent);
            
            // The @Transactional annotation will ensure both saves and the check are atomic.
            // If anything fails, the entire transaction rolls back.
        }
    }
    
    // Kafka Listener: Manages consumption and offset commits
    @Component
    public class OrderEventListener {
    
        private final OrderProcessingService orderProcessingService;
    
        public OrderEventListener(OrderProcessingService orderProcessingService) {
            this.orderProcessingService = orderProcessingService;
        }
    
        @KafkaListener(topics = "orders", groupId = "order-processing-service", containerFactory = "kafkaListenerContainerFactory")
        public void listen(ConsumerRecord<String, OrderCreatedEvent> record, Acknowledgment acknowledgment) {
            try {
                Header idempotencyHeader = record.headers().lastHeader("idempotency-key");
                if (idempotencyHeader == null) {
                    throw new IllegalArgumentException("Idempotency key header is missing");
                }
                String idempotencyKey = new String(idempotencyHeader.value());
    
                // This method call is wrapped in a transaction by Spring
                orderProcessingService.handleOrderCreatedEvent(record.value(), idempotencyKey, "order-processing-service");
    
                // 4. Only acknowledge (commit offset) after the transaction succeeds
                acknowledgment.acknowledge();
    
            } catch (Exception e) {
                log.error("Error processing order event. Will not acknowledge.", e);
                // Do not acknowledge. The message will be redelivered upon next poll.
            }
        }
    }
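
    The service above assumes a ProcessedEvent JPA entity and a Spring Data repository. A minimal sketch (field names follow the schema; anything beyond it is an assumption):

    java
    @Entity
    @Table(name = "processed_events")
    public class ProcessedEvent {

        @Id
        @Column(name = "idempotency_key", length = 255)
        private String idempotencyKey;

        @Column(name = "consumer_group_id", nullable = false)
        private String consumerGroupId;

        @Column(name = "processed_at", nullable = false)
        private Instant processedAt = Instant.now();

        protected ProcessedEvent() { } // required by JPA

        public ProcessedEvent(String idempotencyKey, String consumerGroupId) {
            this.idempotencyKey = idempotencyKey;
            this.consumerGroupId = consumerGroupId;
        }
    }

    // Spring Data repository; existsById() is inherited from CrudRepository.
    public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, String> {}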

    Configuration Details:

    To make this work, you must disable Kafka's auto-commit and use manual acknowledgment.

    yaml
    # application.yml
    spring:
      kafka:
        consumer:
          group-id: order-processing-service
          auto-offset-reset: earliest
          enable-auto-commit: false # CRITICAL: Disable auto-commit
        listener:
          ack-mode: MANUAL_IMMEDIATE # CRITICAL: Enable manual acknowledgment
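
    If you prefer explicit Java configuration over YAML for the container factory referenced by the listener, a minimal sketch (assuming Spring Boot supplies the ConsumerFactory bean) might look like this:

    java
    @Configuration
    public class KafkaListenerConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, OrderCreatedEvent> kafkaListenerContainerFactory(
                ConsumerFactory<String, OrderCreatedEvent> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, OrderCreatedEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // Offsets are committed only when the listener calls acknowledgment.acknowledge().
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    }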

    Why This Pattern is Robust:

  • Atomicity: The @Transactional annotation ensures that the check for the key, the creation of the order, and the insertion of the key into processed_events all succeed or fail together. There is no window for a crash to cause inconsistency.
  • Failure Recovery: If the process crashes after the database transaction commits but before acknowledgment.acknowledge() is called, the message will be redelivered. However, on the next attempt, processedEventRepository.existsById(idempotencyKey) will return true, the logic will be skipped, and the offset will finally be committed. Exactly-once is achieved.

    Performance Consideration:

    This pattern introduces a write operation to your primary database for every message. For extremely high-throughput systems (tens of thousands of messages per second), this can become a bottleneck. The primary key lookup on processed_events is fast, but the write contention can be an issue. For such scenarios, we turn to distributed databases designed for high write loads.


    Pattern 3: Scaling Idempotency with a Distributed Datastore (Cassandra)

    When write throughput is the primary concern, a NoSQL database like Apache Cassandra or ScyllaDB is an excellent choice for a deduplication store. Their architecture is optimized for high-velocity writes and linear scalability.

    Schema for Processed Events Table (Cassandra/CQL):

    cql
    CREATE TABLE processed_events (
        idempotency_key text PRIMARY KEY,
        processed_at timestamp,
        consumer_id text
    ) WITH default_time_to_live = 2592000; -- Optional: 30-day TTL

    Here, the idempotency_key is the partition key, ensuring that checks for a given key are routed to a specific node, making lookups extremely fast.

    Implementation with Lightweight Transactions (LWT)

    Cassandra offers a feature called Lightweight Transactions (or Compare-and-Set) that allows for atomic read-modify-write operations, perfect for our use case.

    java
    // Service using DataStax Java Driver for Cassandra
    @Service
    public class CassandraDeduplicationService {
    
        private final CqlSession session;
        private final PreparedStatement insertStatement;
    
        public CassandraDeduplicationService(CqlSession session) {
            this.session = session;
            this.insertStatement = session.prepare(
                "INSERT INTO processed_events (idempotency_key, processed_at, consumer_id) VALUES (?, ?, ?) IF NOT EXISTS"
            );
        }
    
        /**
         * Attempts to mark an event as processed.
         * @return true if the event was processed for the first time, false if it's a duplicate.
         */
        public boolean markAsProcessed(String idempotencyKey, String consumerId) {
            BoundStatement bound = insertStatement.bind(idempotencyKey, Instant.now(), consumerId);
            ResultSet rs = session.execute(bound);
            Row row = rs.one();
            return row.getBoolean("[applied]");
        }
    }
    
    // Updated Kafka Listener
    @Component
    public class OrderEventListenerWithCassandra {
        // ... dependencies ...
    
        @KafkaListener(topics = "orders", groupId = "order-processing-service-v2")
        public void listen(ConsumerRecord<String, OrderCreatedEvent> record, Acknowledgment acknowledgment) {
            String idempotencyKey = ...; // Extract key
    
            // 1. Attempt to claim the message idempotency key
            boolean isFirstTime = cassandraDeduplicationService.markAsProcessed(idempotencyKey, "order-processing-service-v2");
    
            if (isFirstTime) {
                try {
                    // 2. Execute business logic (e.g., write to primary datastore)
                    orderProcessingService.createOrder(record.value());
    
                    // 3. Commit Kafka offset
                    acknowledgment.acknowledge();
                } catch (Exception e) {
                    log.error("Business logic failed for a new event. Idempotency key is now 'poisoned'. Manual intervention required. Key: {}", idempotencyKey, e);
                    // DO NOT acknowledge. The key is now in Cassandra, but the work failed.
                    // This is a critical failure scenario to monitor.
                }
            } else {
                log.warn("Duplicate event detected by Cassandra. Skipping. Key: {}", idempotencyKey);
                // It's a duplicate, so we can safely acknowledge.
                acknowledgment.acknowledge();
            }
        }
    }

    The "Poison Pill" Edge Case:

    This Cassandra-based pattern introduces a new, subtle, and critical edge case. Look at the catch block.

    1. markAsProcessed succeeds. The key is now in Cassandra.
    2. orderProcessingService.createOrder() fails (e.g., the primary database is down).
    3. The Kafka offset is not acknowledged.
    4. On redelivery, markAsProcessed returns false, and the message is permanently skipped, even though the business logic never completed.

    This is a "poison pill" scenario: the idempotency key has been consumed, but the work was never done.

    Solution: The Claim-Check Pattern with an Outbox

    To solve this, you separate the claiming of the message from the processing. The business logic should not be executed directly in the listener. Instead, the listener's only job is to atomically write the event to an "inbox" or "outbox" table in your primary transactional datastore, using the same robust pattern as in Section 2. A separate process then works through the outbox, ensuring the actual side effects (like calling external APIs) can be retried independently of Kafka consumption. We will explore this in detail in the final section.

    Performance of LWT:

    Cassandra's LWTs are more expensive than standard writes because they involve a Paxos-based consensus round. While still highly performant, they can introduce latency. For systems that can tolerate a minuscule risk of duplicates under rare failure conditions, a non-LWT read-before-write approach can be used, but INSERT ... IF NOT EXISTS is the safest bet for strong consistency.
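
    If the deduplication table is replicated across datacenters, the Paxos round can be confined to the local datacenter. A tuning sketch of the bind call from the service above (the consistency levels shown are deployment choices, not recommendations):

    java
    // Sketch: constrain the LWT's Paxos round to the local datacenter to reduce latency.
    // Note: LOCAL_SERIAL only linearizes within one DC; use SERIAL if consumers in different
    // datacenters may race on the same idempotency key.
    BoundStatement bound = insertStatement.bind(idempotencyKey, Instant.now(), consumerId)
            .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_QUORUM)
            .setSerialConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL);
    ResultSet rs = session.execute(bound);
    boolean applied = rs.wasApplied(); // equivalent to reading the [applied] column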


    Advanced Topic: Handling Out-of-Order Events in Stateful Consumers

    Idempotency solves the duplicate problem, but in event sourcing, the order of events for a given aggregate is often critical. For example, an OrderUpdated event must not be processed before the OrderCreated event.

    Kafka only guarantees ordering within a single partition. If you key your orders topic by orderId, all events for a given order will go to the same partition and be consumed in order. However, producer-side retries (without the idempotent producer) can reorder writes within a partition, and redelivery after a consumer rebalance can replay events you have already applied; the producer settings sketched below mitigate the former.
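
    A minimal producer-configuration sketch (broker address and the value serializer are placeholders):

    java
    // Sketch: an idempotent producer whose retries cannot reorder writes within a partition.
    public KafkaProducer<String, OrderEvent> buildOrderedProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");                // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderEventSerializer.class); // assumed serializer
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // broker deduplicates retried batches
        props.put(ProducerConfig.ACKS_CONFIG, "all");               // required by idempotence
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // safe with idempotence enabled
        return new KafkaProducer<>(props);
    }

    Keying each ProducerRecord with the orderId, as in the earlier producer sketch, then routes every event for an order to the same partition.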

    A robust stateful consumer must track the sequence number of the last processed event for each aggregate.

    Pattern: Aggregate Sequence Tracking

  • Events: Each event must contain the aggregate ID (e.g., orderId) and a monotonically increasing sequence number or version.

    json
    { "orderId": "123", "sequenceNumber": 1, "type": "OrderCreated", ... }
    { "orderId": "123", "sequenceNumber": 2, "type": "OrderUpdated", ... }

  • State Store: The consumer maintains a state store (e.g., RocksDB, Redis, or a database table) mapping aggregateId -> lastProcessedSequence.
  • Implementation Logic:

    java
    // Pseudocode for a stateful consumer
    public void processEvent(Event event) {
        String aggregateId = event.getAggregateId();
        long currentSequence = event.getSequenceNumber();
    
        // 1. Fetch the last processed sequence for this aggregate (0 if we have never seen it)
        long lastProcessedSequence = stateStore.getOrDefault(aggregateId, 0L);
    
        // 2. Idempotency Check: If we've seen this or a later event, skip.
        if (currentSequence <= lastProcessedSequence) {
            log.warn("Stale or duplicate event for aggregate {}: received sequence {}, last processed was {}. Skipping.", 
                     aggregateId, currentSequence, lastProcessedSequence);
            return;
        }
    
        // 3. Order Check: If there's a gap, we have an out-of-order event.
        if (currentSequence > lastProcessedSequence + 1) {
            log.error("Out-of-order event for aggregate {}: received sequence {}, expected {}. Buffering or failing.", 
                      aggregateId, currentSequence, lastProcessedSequence + 1);
            // Strategy A: Buffer the event in a temporary store and retry later.
            // Strategy B: Send to a Dead Letter Queue (DLQ) for manual inspection.
            // Strategy C: Fail loudly, stop consumption on this partition until resolved.
            throw new IllegalStateException("Out-of-order event detected.");
        }
    
        // 4. Process the event and update the state atomically
        // This should be in a transaction!
        beginTransaction();
        try {
            applyBusinessLogic(event);
            stateStore.put(aggregateId, currentSequence);
            commitTransaction();
        } catch (Exception e) {
            rollbackTransaction();
            throw e;
        }
    }

    This pattern combines idempotency (the <= check) and ordering validation (the > check) to build highly resilient, stateful projections from an event stream.


    The Final Mile: Idempotency with External Systems via the Outbox Pattern

    Our most complex challenge is ensuring exactly-once side effects with external systems that are not transactional, like a third-party payment gateway or an email service.

    Directly calling an API from your Kafka consumer is brittle. If the API call succeeds but your process crashes before the offset commit, the call will be repeated on restart.

    The Transactional Outbox Pattern elegantly solves this by decoupling event consumption from external interactions.

    Architecture:

  • Consumer's Role: The Kafka consumer's only job is to process an incoming event and write a corresponding command or intent into an outbox table within its primary database. This write is part of the same transaction that updates the consumer's business state and idempotency key table.
  • The Outbox Table:

        sql
        CREATE TABLE outbox (
            id UUID PRIMARY KEY,
            destination VARCHAR(255) NOT NULL, -- e.g., 'payment-gateway', 'email-service'
            payload JSONB NOT NULL, -- The data for the external system
            status VARCHAR(20) NOT NULL DEFAULT 'PENDING', -- PENDING, SENT, FAILED
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );
  • The Relay Process: A separate, independent process (the "relay") polls the outbox table. It reads PENDING records, attempts to deliver them to the external system, and updates their status to SENT or FAILED upon completion.

    Implementation with Debezium and Kafka Connect

    A powerful way to implement the relay is with Change Data Capture (CDC) using Debezium.

  • Debezium tails the database's transaction log and produces an event to a Kafka topic for every new row in the outbox table.
  • A simple Kafka consumer (or a dedicated Kafka Connect Sink) listens to this outbox topic and performs the API call.

    This approach provides:

  • Durability: The intent to call the API is as durable as your primary database.
  • Decoupling: The main service consumer is no longer blocked on slow network calls. The relay can have its own retry logic, circuit breakers, and error handling.
  • Atomicity: The creation of the business state (e.g., an Order) and the outbox record are atomic, thanks to the database transaction.

    java
    // Updated OrderProcessingService with Outbox Pattern
    @Service
    @Transactional
    public class OrderProcessingServiceWithOutbox {
    
        // ... repositories for processed_events, orders, and outbox
    
        public void handleOrderCreatedEvent(OrderCreatedEvent event, String idempotencyKey, String consumerGroupId) {
            if (processedEventRepository.existsById(idempotencyKey)) {
                return; // Standard idempotency check
            }
    
            // Business logic: create the order
            Order newOrder = createOrderFromEvent(event);
            orderRepository.save(newOrder);
    
            // Create the Outbox record for the side effect
            OutboxRecord outboxRecord = new OutboxRecord(
                "payment-gateway",
                buildPaymentPayload(newOrder)
            );
            outboxRepository.save(outboxRecord);
    
            // Record the event as processed
            processedEventRepository.save(new ProcessedEvent(idempotencyKey, consumerGroupId));
            
            // The Kafka offset is committed by the listener after this transaction succeeds.
            // The Debezium relay will now pick up the outboxRecord and call the payment gateway.
        }
    }

    This architecture ensures that the intent to perform a side effect is captured exactly once. The relay process must still handle idempotency with the target API (e.g., by passing a unique key in an Idempotency-Key header), but the critical link between Kafka consumption and the side effect is now robust and transactional.
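
    For completeness, here is what a simple polling relay might look like. This is a sketch under assumptions: OutboxRepository.findByStatus is a hypothetical Spring Data derived query, PaymentGatewayClient is a hypothetical HTTP client, OutboxRecord accessors are assumed, and scheduling is assumed to be enabled. The Debezium-based CDC relay described above is the higher-throughput alternative.

    java
    @Component
    public class OutboxRelay {

        private final OutboxRepository outboxRepository;
        private final PaymentGatewayClient paymentGatewayClient; // hypothetical client

        public OutboxRelay(OutboxRepository outboxRepository, PaymentGatewayClient paymentGatewayClient) {
            this.outboxRepository = outboxRepository;
            this.paymentGatewayClient = paymentGatewayClient;
        }

        @Scheduled(fixedDelay = 1000) // poll the outbox every second
        @Transactional
        public void relayPendingRecords() {
            for (OutboxRecord outboxRecord : outboxRepository.findByStatus("PENDING")) {
                try {
                    // Pass the outbox row's id as the Idempotency-Key so gateway-side retries deduplicate.
                    paymentGatewayClient.charge(outboxRecord.getPayload(), outboxRecord.getId().toString());
                    outboxRecord.setStatus("SENT");
                } catch (Exception e) {
                    outboxRecord.setStatus("FAILED"); // or leave PENDING and retry with backoff
                }
                outboxRepository.save(outboxRecord);
            }
        }
    }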

    Conclusion

    Achieving true end-to-end exactly-once semantics in a Kafka-based system is a significant engineering challenge that extends far beyond a simple configuration setting. It requires a deliberate architectural approach centered on application-level idempotency.

    We have explored a progression of patterns, from simple but flawed caching to robust, transactionally consistent database techniques.

  • For services with moderate throughput and a transactional datastore, the Transactional Idempotency Pattern (Pattern 2) is the gold standard for reliability.
  • For systems demanding extreme write scalability, a Distributed Datastore like Cassandra (Pattern 3) is a powerful alternative, provided you carefully manage its unique failure modes.
  • For stateful services where event order is paramount, Aggregate Sequence Tracking is non-negotiable.
  • Finally, for managing interactions with external systems, the Transactional Outbox Pattern provides the ultimate level of decoupling and resilience.

    By understanding and correctly implementing these patterns, you can build event-driven systems that are not just fast and scalable, but also correct and reliable, even in the face of the inevitable failures that define distributed computing.
