Advanced CQRS: Idempotency & Out-of-Order Events in Kafka Streams

Goh Ling Yong

The Unspoken Contract of Event-Sourced Systems

In a perfect world, event-sourced CQRS architectures are elegant. Commands generate events, which are immutably stored and sequentially processed to build read-side projections. The reality, however, is governed by the fallacies of distributed computing. Network partitions, client-side retries, and variable producer latencies mean our processors are constantly under assault from two insidious problems: duplicate events and out-of-order events.

Ignoring these issues leads to catastrophic data corruption. A duplicate OrderPlaced event creates a second, phantom order. An OrderUpdated event arriving before its corresponding OrderCreated event is either dropped—losing critical data—or it corrupts the state, leading to an inconsistent and untrustworthy read model.

This article is not an introduction to Kafka Streams or CQRS. It assumes you are already building these systems and have encountered the limitations of stateless stream processing. We will architect and implement a stateful, resilient projection processor using the low-level Kafka Streams Processor API to solve these challenges head-on, providing patterns you can deploy in production.

Our focus will be on building a read-model projection for an Order aggregate. We'll handle OrderCreated, OrderUpdated, and OrderShipped events, ensuring our final projection is correct regardless of delivery guarantees or timing anomalies.

The Core Problem Scenarios: A Closer Look

Let's define our event structure and the specific failure modes we will solve.

Events:

java
// Simplified event structures for clarity
record EventMetadata(String eventId, String traceId, long timestamp) {}
record OrderCreated(String orderId, String customerId, long createdAt) {}
record OrderUpdated(String orderId, List<LineItem> items, long updatedAt) {}
record OrderShipped(String orderId, String trackingNumber, long shippedAt) {}

// The message on the Kafka topic
record OrderEvent(EventMetadata metadata, Object payload) {}

1. The Idempotency Challenge:

A mobile client sends a CreateOrder command. The backend processes it, publishes an OrderCreated event, but the HTTP response to the client times out. The client, following best practices, retries the command with the same idempotency key. The backend, correctly, does not create a new order but re-publishes the original OrderCreated event to ensure downstream consumers are reconciled. Our projection processor now sees the exact same event twice. A naive processor would create two order projections.

2. The Out-of-Order Challenge:

An OrderCreated event is produced from service A. Moments later, an OrderUpdated event for the same orderId is produced from service B. Due to network latency or a Kafka producer batching delay, the OrderUpdated event lands in the topic partition before the OrderCreated event. When our processor consumes OrderUpdated, there is no existing order state to update. The simplest approach is to drop it, but this constitutes data loss. The correct approach is to buffer the update until the prerequisite OrderCreated event arrives.

Architectural Solution: Stateful Processing with the Processor API

The standard Kafka Streams DSL (KStream.filter(), .map(), etc.) is excellent for stateless transformations but lacks the fine-grained control over state and time needed to solve our problem. We must descend to the Processor API.

Our solution will be a single Processor implementation that utilizes three distinct state stores:

  • deduplication-store: A KeyValueStore<String, Long> that tracks processed eventIds. The key is the unique event ID and the value is the event timestamp, which lets us discard duplicates and manage a TTL on stored IDs.
  • out-of-order-buffer-store: A KeyValueStore<String, List<OrderEvent>> that temporarily holds events that arrive out of order. The key is the aggregate ID (orderId) and the value is the list of buffered events for that aggregate.
  • order-projection-store: The primary KeyValueStore<String, OrderProjection> that holds our materialized view.

The processor will also be scheduled with a Punctuator, a callback mechanism that Kafka Streams invokes based on wall-clock time (PunctuationType.WALL_CLOCK_TIME) or stream-time (PunctuationType.STREAM_TIME). We will use it to periodically scan the buffer and deduplication stores and evict old entries, preventing unbounded state growth.

    Let's start building.

    Step 1: Implementing the Idempotent Processor

    First, we'll tackle the deduplication logic. The core pattern is simple: check for existence, and if not present, process and record.

    java
    import org.apache.kafka.streams.processor.api.*;
    import org.apache.kafka.streams.state.KeyValueStore;
    
    public class OrderProcessor implements Processor<String, OrderEvent, String, OrderEvent> {

        // Output types are String/OrderEvent so that expired events can later be
        // forwarded to the named DLQ sink via context.forward().
        private ProcessorContext<String, OrderEvent> context;
        private KeyValueStore<String, OrderProjection> projectionStore;
        private KeyValueStore<String, Long> deduplicationStore;
    
        private static final String PROJECTION_STORE_NAME = "order-projection-store";
        private static final String DEDUPLICATION_STORE_NAME = "deduplication-store";
    
        @Override
        public void init(ProcessorContext<String, OrderEvent> context) {
            this.context = context;
            this.projectionStore = context.getStateStore(PROJECTION_STORE_NAME);
            this.deduplicationStore = context.getStateStore(DEDUPLICATION_STORE_NAME);
        }
    
        @Override
        public void process(Record<String, OrderEvent> record) {
            OrderEvent event = record.value();
            String eventId = event.metadata().eventId();
    
            // 1. Idempotency Check
            if (deduplicationStore.get(eventId) != null) {
                // Log duplicate event, maybe emit a metric
                System.out.println("Duplicate event received and skipped: " + eventId);
                return;
            }
    
            // 2. Process the event (logic will be expanded later)
            applyEvent(event);
    
            // 3. Mark event as processed
            // We store the timestamp for potential TTL logic later
            deduplicationStore.put(eventId, event.metadata().timestamp());
        }
    
        private void applyEvent(OrderEvent event) {
            // Simplified apply logic for now
            if (event.payload() instanceof OrderCreated created) {
                OrderProjection projection = new OrderProjection(created.orderId(), created.customerId(), "CREATED");
                projectionStore.put(created.orderId(), projection);
                System.out.println("Created projection for order: " + created.orderId());
            } else if (event.payload() instanceof OrderUpdated updated) {
                OrderProjection projection = projectionStore.get(updated.orderId());
                if (projection != null) {
                    projection.setStatus("UPDATED");
                    projectionStore.put(updated.orderId(), projection);
                    System.out.println("Updated projection for order: " + updated.orderId());
                }
            }
            // ... other event types
        }
    
        @Override
        public void close() {}
    }

    This code forms the basis of our idempotency control. The atomicity of this operation is critical. When running with Exactly-Once Semantics (EOS) enabled (processing.guarantee=exactly_once_v2), the writes to the state stores (projectionStore and deduplicationStore) and the commit of the consumer offset are performed as a single transaction. If the application crashes after applyEvent but before deduplicationStore.put, the entire transaction will be rolled back upon restart, and the message will be reprocessed correctly.
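
    One thing the snippets above take for granted is the OrderProjection read model itself, which is never defined here. Below is a minimal sketch of the assumed shape: a small mutable class, plus a hypothetical LineItem record, with just enough members to satisfy the constructor, getOrderId(), getStatus(), and setStatus() calls used by the processor. A real projection would carry the full order payload and whatever fields your query side needs, and a JSON serde may additionally require a no-arg constructor.

    java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical shape of the line item referenced by OrderUpdated.
    record LineItem(String sku, int quantity) {}

    // Assumed read-model class; only the members the processor touches are shown.
    public class OrderProjection {

        private final String orderId;
        private final String customerId;
        private String status;
        private List<LineItem> items = new ArrayList<>();

        public OrderProjection(String orderId, String customerId, String status) {
            this.orderId = orderId;
            this.customerId = customerId;
            this.status = status;
        }

        public String getOrderId() { return orderId; }
        public String getCustomerId() { return customerId; }
        public String getStatus() { return status; }
        public void setStatus(String status) { this.status = status; }
        public List<LineItem> getItems() { return items; }
        public void setItems(List<LineItem> items) { this.items = items; }
    }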

    Step 2: Handling Out-of-Order Events with a Buffer

    Now we introduce the more complex logic. When an event arrives for an aggregate that doesn't exist yet, we must buffer it.

    We'll add the out-of-order-buffer-store and modify our applyEvent logic significantly.

    java
    // Add these members to the OrderProcessor class
    // (requires java.util.ArrayList, java.util.Comparator, and java.util.List imports)
    private KeyValueStore<String, List<OrderEvent>> bufferStore;
    private static final String BUFFER_STORE_NAME = "out-of-order-buffer-store";
    
    // In init() method:
    this.bufferStore = context.getStateStore(BUFFER_STORE_NAME);
    
    // The new, more robust applyEvent method
    private void applyEvent(OrderEvent event) {
        String orderId = getOrderIdFromEvent(event.payload());
    
        if (event.payload() instanceof OrderCreated created) {
            // This is the anchor event, it can never be "out of order"
            OrderProjection projection = new OrderProjection(created.orderId(), created.customerId(), "CREATED");
            projectionStore.put(orderId, projection);
            System.out.println("Created projection for order: " + orderId);
    
            // CRITICAL: After creating the projection, check the buffer for subsequent events
            processBufferedEvents(orderId);
    
        } else {
            // This is a subsequent event (Update, Shipped, etc.)
            OrderProjection projection = projectionStore.get(orderId);
    
            if (projection != null) {
                // Happy path: The aggregate exists, we can apply the event directly
                applyUpdate(projection, event);
                System.out.println("Applied in-order event " + event.metadata().eventId() + " to order: " + orderId);
            } else {
                // Out-of-order path: The aggregate does not exist yet. Buffer the event.
                bufferEvent(orderId, event);
                System.out.println("Buffered out-of-order event " + event.metadata().eventId() + " for order: " + orderId);
            }
        }
    }
    
    private void bufferEvent(String orderId, OrderEvent event) {
        List<OrderEvent> buffered = bufferStore.get(orderId);
        if (buffered == null) {
            buffered = new ArrayList<>();
        }
        buffered.add(event);
        // Sort by event timestamp to maintain some order within the buffer
        buffered.sort(Comparator.comparing(e -> e.metadata().timestamp()));
        bufferStore.put(orderId, buffered);
    }
    
    private void processBufferedEvents(String orderId) {
        List<OrderEvent> buffered = bufferStore.get(orderId);
        if (buffered != null && !buffered.isEmpty()) {
            System.out.println("Found " + buffered.size() + " buffered events for order: " + orderId);
            OrderProjection projection = projectionStore.get(orderId);
            for (OrderEvent bufferedEvent : buffered) {
                applyUpdate(projection, bufferedEvent);
            }
            // Clear the buffer for this key
            bufferStore.delete(orderId);
        }
    }
    
    // Helper to apply non-creation events
    private void applyUpdate(OrderProjection projection, OrderEvent event) {
        if (event.payload() instanceof OrderUpdated) {
            projection.setStatus("UPDATED");
        } else if (event.payload() instanceof OrderShipped) {
            projection.setStatus("SHIPPED");
        }
        projectionStore.put(projection.getOrderId(), projection);
    }
    
    // Helper to extract orderId
    private String getOrderIdFromEvent(Object payload) {
        if (payload instanceof OrderCreated e) return e.orderId();
        if (payload instanceof OrderUpdated e) return e.orderId();
        if (payload instanceof OrderShipped e) return e.orderId();
        throw new IllegalArgumentException("Unknown event type");
    }
    

    With this logic, our processor is now robust to out-of-order events. An OrderUpdated arriving first will be safely stashed in the bufferStore. Once the OrderCreated event finally arrives, it creates the base projection and then immediately calls processBufferedEvents, which finds the stashed OrderUpdated event, applies it, and brings the projection to the correct, final state.
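
    Before adding TTL handling, it is worth sanity-checking both behaviours end to end. The sketch below drives the topology we assemble in Step 4 through the kafka-streams-test-utils TopologyTestDriver. It is illustrative rather than a full JUnit test, and it assumes the JsonSerde used in Step 4 can round-trip the polymorphic OrderEvent payload (e.g. via Jackson type information):

    java
    import java.time.Instant;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class OrderProcessorSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "projection-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

            try (TopologyTestDriver driver =
                     new TopologyTestDriver(OrderTopologyBuilder.build(), props)) {

                TestInputTopic<String, OrderEvent> input = driver.createInputTopic(
                    "order-events-topic",
                    Serdes.String().serializer(),
                    new JsonSerde<>(OrderEvent.class).serializer());

                long now = Instant.now().toEpochMilli();
                OrderEvent created = new OrderEvent(
                    new EventMetadata("evt-1", "trace-1", now),
                    new OrderCreated("order-1", "cust-1", now));
                OrderEvent updated = new OrderEvent(
                    new EventMetadata("evt-2", "trace-1", now + 10),
                    new OrderUpdated("order-1", List.of(), now + 10));

                input.pipeInput("order-1", updated);  // out of order: gets buffered
                input.pipeInput("order-1", created);  // creates projection, drains buffer
                input.pipeInput("order-1", created);  // duplicate: skipped by dedup store

                KeyValueStore<String, OrderProjection> store =
                    driver.getKeyValueStore("order-projection-store");
                System.out.println(store.get("order-1").getStatus()); // expect UPDATED
            }
        }
    }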

    Step 3: Taming State Growth with Punctuators and Time

    Our current implementation has a fatal flaw: the deduplicationStore and bufferStore will grow indefinitely. We must implement a TTL (Time-To-Live) mechanism to evict old entries.

    This is where Punctuator and event-time processing become critical.

    First, we need to configure our topology to use event-time, not processing-time. This means Kafka Streams will base its notion of time on a timestamp embedded in the records themselves, which is essential for correctly handling late-arriving data.

    Custom Timestamp Extractor:

    java
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;
    
    public class EventTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            OrderEvent event = (OrderEvent) record.value();
            if (event != null && event.metadata() != null) {
                return event.metadata().timestamp();
            }
            // Fallback to wall-clock time if event is malformed
            return System.currentTimeMillis();
        }
    }

    Now, let's implement the Punctuator to clean up our state stores.

    java
    // In OrderProcessor.init()
    
    // Schedule a punctuator to run every 1 minute of stream-time.
    // Stream-time advances based on the timestamps extracted from processed records.
    this.context.schedule(
        Duration.ofMinutes(1),
        PunctuationType.STREAM_TIME,
        (timestamp) -> {
            // TTL thresholds
            long deduplicationTtl = Duration.ofDays(7).toMillis();
            long bufferTtl = Duration.ofHours(24).toMillis();
    
            // Clean up deduplication store
            try (var iterator = deduplicationStore.all()) {
                while (iterator.hasNext()) {
                    KeyValue<String, Long> entry = iterator.next();
                    if ((timestamp - entry.value) > deduplicationTtl) {
                        deduplicationStore.delete(entry.key);
                    }
                }
            }
    
            // Clean up buffer store and move expired events to a DLQ
            try (var iterator = bufferStore.all()) {
                while (iterator.hasNext()) {
                    KeyValue<String, List<OrderEvent>> entry = iterator.next();
                    // We check the timestamp of the *latest* event in the buffer for a given key
                    long latestEventTime = entry.value.stream()
                        .mapToLong(e -> e.metadata().timestamp())
                        .max().orElse(0L);
    
                    if (latestEventTime > 0 && (timestamp - latestEventTime) > bufferTtl) {
                        // These events are considered permanently lost.
                        // Forward them to a Dead-Letter Queue (DLQ) for manual inspection.
                        for (OrderEvent expiredEvent : entry.value) {
                            context.forward(
                            new Record<>(entry.key, expiredEvent, timestamp),
                                "dlq-sink-topic"
                            );
                        }
                        bufferStore.delete(entry.key);
                    }
                }
            }
        }
    );

    This Punctuator is our garbage collector. It's scheduled based on STREAM_TIME, which is crucial. STREAM_TIME is the maximum event timestamp seen by the stream task so far. This means our TTL logic is based on the data's timeline, not the server's wall clock, making it resilient to processing lags.

  • Deduplication Cleanup: We iterate through the deduplicationStore and remove any eventIds that are older than our configured TTL (e.g., 7 days). This window should be larger than any reasonable client retry window.
  • Buffer Cleanup: This is more critical. If an event has been in the buffer for too long (e.g., 24 hours), it's highly likely its prerequisite OrderCreated event was never produced or was lost. We can't keep it forever. The correct pattern is to forward these expired events to a Dead-Letter Topic (DLT, sometimes called a DLQ). An operational team can then inspect the DLT to diagnose the root cause of the data loss.
  • To forward to a DLT, we need to declare a named sink in our topology, which we'll see in the final assembly.
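
    One caveat on the scheduling choice: stream-time only advances when new records arrive, so on a quiet topic the cleanup punctuation can stall and expired entries will linger. If that matters for your workload, a second punctuator can be scheduled on wall-clock time alongside the stream-time one. The sketch below assumes the eviction logic shown above has been factored into a hypothetical evictExpiredEntries(long now) helper:

    java
    // In OrderProcessor.init(), next to the stream-time schedule shown above.
    // WALL_CLOCK_TIME punctuation fires on a timer even when no records arrive,
    // so state cleanup cannot stall on an idle input topic.
    this.context.schedule(
        Duration.ofMinutes(10),
        PunctuationType.WALL_CLOCK_TIME,
        (now) -> {
            // evictExpiredEntries is a hypothetical helper wrapping the same
            // store-iteration logic as the stream-time punctuator, using the
            // supplied wall-clock "now" as the reference point for TTL checks.
            evictExpiredEntries(now);
        }
    );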

    Step 4: Assembling the Full Topology

    Now we tie everything together into a complete Kafka Streams Topology.

    java
    import java.util.List;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.state.Stores;
    
    public class OrderTopologyBuilder {

        public static Topology build() {
            Topology topology = new Topology();

            // Source node reading the event stream
            topology.addSource("order-events-source",
                Serdes.String().deserializer(),
                new JsonSerde<>(OrderEvent.class).deserializer(),
                "order-events-topic");

            // Our stateful processor, downstream of the source
            topology.addProcessor("order-processor", OrderProcessor::new, "order-events-source");

            // State store definitions with changelogging enabled for fault tolerance,
            // each connected to the processor that uses it
            topology.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("order-projection-store"),
                    Serdes.String(),
                    new JsonSerde<>(OrderProjection.class)),
                "order-processor");

            topology.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("deduplication-store"),
                    Serdes.String(),
                    Serdes.Long()),
                "order-processor");

            topology.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("out-of-order-buffer-store"),
                    Serdes.String(),
                    new JsonSerde<>(List.class)),
                "order-processor");

            // Named sink targeted by context.forward(record, "dlq-sink-topic")
            topology.addSink("dlq-sink-topic",
                "order-events-dlq-topic",
                Serdes.String().serializer(),
                new JsonSerde<OrderEvent>().serializer(),
                "order-processor");

            return topology;
        }
    }
    
    // Main application entry point. The wrapper class name is illustrative;
    // assumes java.util.Properties, org.apache.kafka.streams.KafkaStreams and
    // org.apache.kafka.streams.StreamsConfig imports.
    public class OrderProjectionApp {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-projection-processor");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimestampExtractor.class.getName());
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");
            // RocksDB tuning for production would go here

            Topology topology = OrderTopologyBuilder.build();
            KafkaStreams streams = new KafkaStreams(topology, props);

            // ... standard shutdown hook and streams.start() logic
            streams.start();
        }
    }

    Key points in the topology construction:

  • State Stores: We explicitly define our three state stores. persistentKeyValueStore ensures they are backed by RocksDB and a changelog topic, making them fault-tolerant. If a pod crashes, Kafka Streams will restore the state from the changelog topic on another instance.
  • addSource() / addProcessor(): We connect our input topic (order-events-topic) to our custom OrderProcessor, and addStateStore() attaches each of the three stores to that processor by name.
  • addSink(): We define a named sink dlq-sink-topic that our processor can use via context.forward(). This cleanly separates our main processing logic from our error-handling path.
  • Configuration: We set the DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG to use our custom extractor and, crucially, enable exactly_once_v2 for transactional guarantees.

    Performance and Production Considerations

    This pattern is robust, but it is not without its costs. Senior engineers must consider the following trade-offs:

    * State Size & Rebalancing: The state stores live on the local disk of your processor instances. Large state can significantly increase the time it takes for a consumer to rebalance, as the new consumer must restore the state from the changelog topic. Monitor your state store sizes closely. Consider capacity planning for disk space and network I/O during rebalances.

    * RocksDB Tuning: The default RocksDB configuration is not optimized for all workloads. For write-heavy applications like this, tuning write_buffer_size, max_write_buffer_number, and memory for block caches can yield significant performance improvements. This is a deep topic in itself, but essential for production stability.

    * Latency Impact: The buffering mechanism inherently adds latency. An out-of-order event is not processed until its prerequisite arrives. Your buffer TTL (e.g., 24 hours) defines the maximum latency you are willing to tolerate for an event to be processed. This must align with business SLAs.

    * Key Distribution: The effectiveness of Kafka's parallelism depends on good key distribution. If all events for a short period have the same orderId, one task will be a hot spot while others are idle. This pattern does not solve key skew; it only ensures correctness within a given partition.

    * Observability: Instrument your processor heavily. Key metrics to monitor include:

    * duplicates.skipped.total: Rate of duplicate events being discarded.

    * events.buffered.total: Rate of events being sent to the out-of-order buffer.

    * buffer.size.gauge: The current number of events in the buffer store (per instance).

    * events.expired.total: Rate of events being sent to the DLT.

    * end-to-end.latency.histogram: The time from event production to final projection state update.
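
    Two configuration levers speak directly to the first two points above. Neither appears elsewhere in this article, and the values shown are placeholders to adapt rather than recommendations: standby replicas keep a warm copy of each task's state on another instance so a failover restores from local disk instead of replaying the changelog, and a RocksDBConfigSetter is the hook Kafka Streams exposes for RocksDB tuning.

    java
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.Options;

    // Hypothetical config setter; the values are placeholders, not tuned recommendations.
    public class CustomRocksDBConfig implements RocksDBConfigSetter {

        @Override
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            options.setWriteBufferSize(32 * 1024 * 1024L); // 32 MiB memtables
            options.setMaxWriteBufferNumber(3);
        }

        @Override
        public void close(String storeName, Options options) {
            // Nothing custom allocated here, so nothing to release.
        }
    }

    // Added to the application properties from Step 4 (fragment):
    Properties props = new Properties();
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);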

    Conclusion: Embracing Stateful Complexity for Correctness

    Building resilient distributed systems requires moving beyond the idealized 'happy path'. By embracing stateful stream processing and confronting the challenges of idempotency and event ordering directly, we can build CQRS projections that are not just fast, but fundamentally correct and trustworthy.

    The pattern presented here—combining the Processor API, multiple state stores, and time-based punctuators—is a powerful template. It trades the simplicity of stateless processing for the robustness required in production environments. While complex, this approach ensures that your read models accurately reflect the reality of your event log, even when the network and distributed producers conspire against you. It is this level of defensive design that separates short-lived prototypes from durable, enterprise-grade systems.
