CQRS/ES on Kafka Streams: Idempotent Projections for Out-of-Order Events

Goh Ling Yong

The Unspoken Assumption: Order in Event-Sourced Systems

In the canonical representation of Command Query Responsibility Segregation (CQRS) with Event Sourcing (ES), we envision a pristine, ordered log of domain events. Commands are validated, producing events that are immutably appended to an aggregate's stream. Projections, our read-side models, are built by consuming this stream in the exact order of occurrence. ItemAddedToCart is processed before CartCheckedOut; OrderCreated before OrderShipped.

This works beautifully on a whiteboard. In a distributed production environment using a broker like Apache Kafka, this assumption is not just fragile—it's demonstrably false. Sources of disorder are numerous:

* Producer Retries: A producer attempts to send Event_V2. The request times out, but the broker actually received and committed it. The producer's retry mechanism sends Event_V2 again. The consumer now sees a duplicate.

* Network Latency: Two producers on different nodes commit Event_V3 and Event_V4 for the same aggregate. Due to network conditions, the broker receives and logs Event_V4 before Event_V3.

* Broker Partition Leadership Changes: A partition leader fails, and a follower with a slight replication lag is elected as the new leader. Events that were acknowledged by the old leader but not yet replicated may be lost or appear later.

* Idempotent Producer Settings (enable.idempotence=true): While this prevents duplicates from a single producer session, it doesn't solve out-of-order issues across producer restarts or from multiple application instances producing events for the same aggregate.
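For reference, a minimal sketch of those producer settings (standard Kafka producer configs; the broker address is a placeholder):

java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerSettings {
    public static Properties idempotentProducerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // placeholder address
        // Broker-side deduplication, scoped to a single producer session:
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Implied by idempotence: all in-sync replicas must acknowledge the write.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }
}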

When a naive projection processor encounters an out-of-order event, the result is state corruption. Imagine processing an OrderShipped event for an order that, according to the projection's current state, hasn't even been created yet. This leads to failed updates, inconsistent data, and critical business logic failures.

This article details a robust, production-ready pattern for building idempotent projections using Kafka Streams that are resilient to both duplicate and out-of-order events. We will leverage stateful stream processing and the Processor API to ensure correctness.

The Failure of Naive Projections

A common starting point is a simple Kafka consumer that reads from an event topic and applies changes directly to a database (e.g., PostgreSQL, Elasticsearch).

Let's model this with a simple Order aggregate. Events might be:

* OrderCreated { orderId, customerId, timestamp, version: 1 }

* ItemAdded { orderId, itemId, quantity, timestamp, version: 2 }

* OrderShipped { orderId, trackingNumber, timestamp, version: 3 }

Notice the explicit version field. This is critical. It's a monotonically increasing integer managed by the write-side (the aggregate root) for each specific orderId.
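To make this concrete, here is a minimal sketch of write-side version assignment. The OrderAggregate class is hypothetical (the article's write side isn't shown); it uses the OrderEvent base class defined in the implementation section below:

java
import java.time.Instant;

// Hypothetical write-side aggregate root: every event it emits gets the
// next contiguous version number for this orderId.
public class OrderAggregate {
    private final String orderId;
    private long version; // version of the last event appended to this aggregate's stream

    public OrderAggregate(String orderId) {
        this.orderId = orderId;
    }

    // Stamp a freshly created event before appending it to the event store.
    public <E extends OrderEvent> E stamp(E event) {
        event.orderId = orderId;
        event.version = ++version; // monotonically increasing, gap-free
        event.timestamp = Instant.now();
        return event;
    }
}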

Our naive projection logic in a simple consumer might look like this (pseudo-code):

java
// Anti-pattern: Naive, stateless consumer
void processEvent(DomainEvent event) {
    // One external DB round-trip per event, and no ordering guard.
    OrderProjection projection = projectionRepository.findById(event.getOrderId());

    switch (event.getType()) {
        case "OrderCreated":
            // Create new projection
            projection = new OrderProjection(event.getOrderId(), ...);
            break;
        case "ItemAdded":
            // Assumes projection already exists! NPE if OrderCreated hasn't arrived yet.
            projection.addItem(event.getItemId(), ...);
            break;
        case "OrderShipped":
            // Assumes projection exists and is in a valid state
            projection.markAsShipped(event.getTrackingNumber());
            break;
    }
    projectionRepository.save(projection);
}

Now, consider this sequence of events arriving at the consumer for orderId: 123:

1. ItemAdded { version: 2 }
2. OrderShipped { version: 3 }
3. OrderCreated { version: 1 }

Processing ItemAdded {v:2}: The consumer calls projectionRepository.findById("123"), which returns null. The application either throws a NullPointerException or, worse, fails silently. The event lands in a dead-letter queue or sends the consumer into a crash loop.

Processing OrderShipped {v:3}: Same failure mode. The order doesn't exist in the projection database.

Processing OrderCreated {v:1}: This finally succeeds, creating the initial projection. But the earlier events are already lost or stuck in a DLQ, leaving the projection incomplete.

This stateless, database-lookup-on-every-event pattern is not only fragile but also inefficient: every event pays the I/O latency of a round-trip to the external database.

The Solution: Stateful Projections with Kafka Streams Processor API

To solve this, we must bring state into our stream processor. Kafka Streams provides first-class support for stateful operations via State Stores, which are typically backed by RocksDB locally on the processor instance and backed up to a compacted Kafka changelog topic for fault tolerance.

Instead of relying on an external database for current state during processing, we will maintain the projection's state within the stream processor itself. The core principle is simple: for each incoming event, compare its version with the version of the locally stored projection. This allows us to enforce order and guarantee idempotency.

Our strategy will be:

1. Stream events from the source topic, keyed by the aggregate ID (orderId). This ensures all events for a given order land in the same partition and are processed by the same task instance, preserving per-aggregate order.

2. Use the transform operation with a stateful Transformer implementation.

3. Within the Transformer, for each event:

   a. Look up the current projection for the given orderId in a dedicated State Store.

   b. If the incoming event's version is exactly one greater than the stored projection's version, it's the expected next event. Apply it, update the projection in the state store, and forward the new projection state downstream.

   c. If the event's version is less than or equal to the stored version, it's a duplicate or an old, out-of-order event that has already been accounted for. Discard it silently. This is the key to idempotency.

   d. If the event's version is more than one greater than the stored version, it's an out-of-order event that has arrived too early. We can either buffer it (complex) or, more simply, discard it and wait for it to be re-delivered after the missing events arrive. For this guide, we'll focus on the robust discard-and-rely-on-ordering model.

This moves the consistency check from the external database into the stream processing layer, making it faster and more resilient.

Production-Grade Implementation

Let's build this out. We'll use Java and the Kafka Streams library.

1. Project Setup (Maven Dependencies)

xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.4.0</version>
    </dependency>
    <!-- For serialization/deserialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.14.2</version>
    </dependency>
    <!-- Add logging, etc. -->
</dependencies>

2. Domain Models and Serdes

We need our event and projection classes. They must contain the orderId and version.

java
import java.time.Instant;
import java.util.Set;

// Base class for events
public abstract class OrderEvent {
    public String orderId;
    public long version;
    public Instant timestamp;
}

// Specific event types
public class OrderCreatedEvent extends OrderEvent { /* ... fields ... */ }
public class ItemAddedEvent extends OrderEvent { /* ... fields ... */ }
public class OrderShippedEvent extends OrderEvent { /* ... fields ... */ }

// The projection model we are building
public class OrderProjection {
    public String orderId;
    public long version; // Tracks the version of the last applied event
    public String status;
    public Set<String> items;
    public Instant createdAt;
    public Instant shippedAt;
    // ... other fields
}

You'll also need to configure JSON Serdes (serializer/deserializer pairs) for these classes so they can be used with Kafka Streams.
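The article doesn't show a JsonSerde implementation, so here is one minimal Jackson-based sketch (the class name matches the references below; it assumes jackson-datatype-jsr310 is on the classpath for the Instant fields, and polymorphic deserialization of the OrderEvent hierarchy would additionally need Jackson type metadata such as @JsonTypeInfo):

java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerde<T> implements Serde<T> {

    private static final ObjectMapper MAPPER =
        new ObjectMapper().registerModule(new JavaTimeModule()); // handles Instant fields

    private final Class<T> targetType;

    public JsonSerde(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public Serializer<T> serializer() {
        return (topic, data) -> {
            try {
                return data == null ? null : MAPPER.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new RuntimeException("JSON serialization failed for topic " + topic, e);
            }
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return (topic, bytes) -> {
            try {
                return bytes == null ? null : MAPPER.readValue(bytes, targetType);
            } catch (Exception e) {
                throw new RuntimeException("JSON deserialization failed for topic " + topic, e);
            }
        };
    }
}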

3. The Stateful OrderProjectionTransformer

This is the core of our solution. We implement the Transformer interface, which gives us access to a ProcessorContext and a key-value StateStore. (Newer Kafka Streams releases deprecate transform/Transformer in favor of the unified Processor API, but the version-checking logic shown here carries over unchanged.)

java
import java.util.HashSet;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
public class OrderProjectionTransformer implements Transformer<String, OrderEvent, KeyValue<String, OrderProjection>> {

    private final String stateStoreName;
    private KeyValueStore<String, OrderProjection> stateStore;
    private ProcessorContext context;

    public OrderProjectionTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.stateStore = (KeyValueStore<String, OrderProjection>) context.getStateStore(stateStoreName);
    }

    @Override
    public KeyValue<String, OrderProjection> transform(String orderId, OrderEvent event) {
        OrderProjection projection = stateStore.get(orderId);

        if (projection == null) {
            // This must be the first event for this order, and it MUST be an OrderCreatedEvent with version 1.
            if (event instanceof OrderCreatedEvent && event.version == 1) {
                projection = apply((OrderCreatedEvent) event);
            } else {
                // Out-of-order event for a non-existent aggregate. Discard and log.
                // In a real system, you might send this to a DLQ for investigation.
                System.err.printf("Discarding out-of-order event for non-existent order %s: %s (v%d)%n",
                                  orderId, event.getClass().getSimpleName(), event.version);
                return null; // Do not forward anything downstream
            }
        } else {
            // We have an existing projection. Check the version.
            if (event.version == projection.version + 1) {
                // This is the expected next event. Apply it.
                projection = apply(projection, event);
            } else {
                // Duplicate or out-of-order. Discard and log.
                System.err.printf("Discarding duplicate/out-of-order event for order %s. Current version: %d, Event: %s (v%d)%n",
                                  orderId, projection.version, event.getClass().getSimpleName(), event.version);
                return null;
            }
        }

        // Save the updated projection back to the state store.
        stateStore.put(orderId, projection);

        // Forward the new projection state downstream.
        return KeyValue.pair(orderId, projection);
    }

    // --- Event Application Logic ---
    private OrderProjection apply(OrderCreatedEvent event) {
        OrderProjection p = new OrderProjection();
        p.orderId = event.orderId;
        p.version = event.version;
        p.status = "CREATED";
        p.createdAt = event.timestamp;
        p.items = new HashSet<>();
        return p;
    }

    private OrderProjection apply(OrderProjection current, OrderEvent event) {
        if (event instanceof ItemAddedEvent) {
            current.items.add(((ItemAddedEvent) event).itemId);
        } else if (event instanceof OrderShippedEvent) {
            current.status = "SHIPPED";
            current.shippedAt = event.timestamp;
        }
        // CRITICAL: Always update the version
        current.version = event.version;
        return current;
    }

    @Override
    public void close() {
        // No-op
    }
}

4. Building the Kafka Streams Topology

Now, we wire everything together in our main application.

java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ProjectionApplication {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-projections-v1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        // Configure Serdes for keys and values. Note: a class registered as the
        // default serde must be instantiable via a no-arg constructor; explicit
        // serdes are also passed at each step below.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);

        final String stateStoreName = "order-projection-store";
        StreamsBuilder builder = new StreamsBuilder();

        // 1. Create the State Store builder
        StoreBuilder<KeyValueStore<String, OrderProjection>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(stateStoreName),
            Serdes.String(),
            new JsonSerde<>(OrderProjection.class)
        );
        builder.addStateStore(storeBuilder);

        // 2. Define the stream processing topology
        KStream<String, OrderProjection> projectionStream = builder
            .stream("order-events", Consumed.with(Serdes.String(), new JsonSerde<>(OrderEvent.class)))
            // The key of the event stream MUST be the aggregate ID
            .transform(() -> new OrderProjectionTransformer(stateStoreName), stateStoreName);

        // 3. Sink the results to a final projection topic (compacted)
        projectionStream.to("order-projections", Produced.with(Serdes.String(), new JsonSerde<>(OrderProjection.class)));

        // Optional: Also sink to an external DB from here if needed
        projectionStream.foreach((orderId, projection) -> {
            // externalDbClient.save(projection);
        });

        Topology topology = builder.build();
        System.out.println(topology.describe());

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
With this topology:

* The order-events topic is consumed.

* The transform step processes each event using our stateful OrderProjectionTransformer.

* The transformer uses the order-projection-store to maintain the current state of each order's projection, enforcing versioning and order.

* Only valid, updated projections are emitted downstream.

* The resulting stream is sent to the order-projections topic. This topic should be configured with cleanup.policy=compact so it retains only the latest version of each projection, serving as a complete, queryable state of the system.
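Before deploying, it's worth pinning down the discard behavior with a deterministic test. A minimal sketch using TopologyTestDriver, assuming the kafka-streams-test-utils artifact is on the test classpath along with the JsonSerde sketch from earlier (subject to the polymorphic-deserialization caveat noted there):

java
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class OrderProjectionTopologyTest {

    // Helper: stamp a hypothetical event instance with key fields.
    static OrderEvent stamped(OrderEvent e, String orderId, long version) {
        e.orderId = orderId;
        e.version = version;
        e.timestamp = Instant.now();
        return e;
    }

    public static void verifyOutOfOrderHandling(Topology topology, Properties props) {
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestInputTopic<String, OrderEvent> input = driver.createInputTopic(
                "order-events", new StringSerializer(), new JsonSerde<>(OrderEvent.class).serializer());
            TestOutputTopic<String, OrderProjection> output = driver.createOutputTopic(
                "order-projections", new StringDeserializer(), new JsonSerde<>(OrderProjection.class).deserializer());

            // v2 and v3 arrive before v1: both are discarded, nothing is emitted.
            input.pipeInput("123", stamped(new ItemAddedEvent(), "123", 2));
            input.pipeInput("123", stamped(new OrderShippedEvent(), "123", 3));
            if (!output.isEmpty()) throw new AssertionError("early events should be discarded");

            // v1 arrives: the projection is created and forwarded downstream.
            input.pipeInput("123", stamped(new OrderCreatedEvent(), "123", 1));
            OrderProjection p = output.readKeyValue().value;
            if (p.version != 1) throw new AssertionError("expected version 1");
        }
    }
}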

Edge Cases and Advanced Considerations

This pattern is robust, but production systems have more complexity.

Handling Aggregate Deletion (Tombstones)

What if an order is cancelled? We need a way to represent its deletion.

* Introduce an OrderCancelledEvent. When this event is processed, the transformer should update the state store with a special marker or, better, call stateStore.delete(orderId).

* When forwarding downstream, the transformer should emit a tombstone record: KeyValue.pair(orderId, null).

* Consumers of the final order-projections topic (and Kafka's log compaction process) will treat this null value as a deletion marker, removing the key from the state.
java
// Inside OrderProjectionTransformer.transform()
// ... after the version checks
if (event instanceof OrderCancelledEvent) {
    stateStore.delete(orderId);
    return KeyValue.pair(orderId, null); // Emit tombstone
}
// ... rest of the logic

State Store Performance and Sizing

Your RocksDB state store lives on the local disk of your stream processing instances. Its size is a function of the number of unique aggregates and the size of each projection.

* Memory: Allocate sufficient JVM heap and configure Kafka Streams' RocksDB memory settings via rocksdb.config.setter to keep block caches in memory for fast lookups (see the sketch after this list).

* Disk: Use fast local storage (SSDs). Monitor disk usage closely.

* Changelog Topics: The state store is backed by a compacted Kafka changelog topic. During rebalancing or on instance restart, the processor restores its state from this topic. Because it is compacted, it retains the latest value per key indefinitely; restoration time grows with store size, so monitor restore duration after rebalances.
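A minimal sketch of such a rocksdb.config.setter implementation, modeled on the pattern in the Kafka Streams documentation (the 64 MB cache size is illustrative, not a recommendation). Register it with props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, ProjectionRocksDBConfig.class):

java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class ProjectionRocksDBConfig implements RocksDBConfigSetter {

    // Shared across all stores on this instance so total block-cache memory stays bounded.
    private static final LRUCache BLOCK_CACHE = new LRUCache(64 * 1024 * 1024L); // 64 MB, illustrative

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(BLOCK_CACHE);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(String storeName, Options options) {
        // The cache is static and shared, so it is deliberately not closed per store.
    }
}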

Reprocessing and Schema Evolution

If you find a bug in your projection logic, you'll need to reprocess the entire event history. The standard way to do this is:

1. Stop the Kafka Streams application.

2. Use the kafka-streams-application-reset tool shipped with Kafka to reset the application's offsets back to the beginning of the source topic (it also deletes the application's internal topics).

3. Crucially, wipe the application's local state directories on all instances. Otherwise, the processor will restart with its old, corrupted state. A sketch follows this list.

4. Restart the application. It will re-consume all events from the beginning and rebuild the projections correctly.
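For step 3, KafkaStreams#cleanUp() performs the local wipe programmatically; it may only be called while the instance is not running. A minimal sketch:

java
// Wipe this instance's local state directory, then start fresh.
KafkaStreams streams = new KafkaStreams(topology, props);
streams.cleanUp(); // only legal before start() or after close()
streams.start();   // state is rebuilt from the changelog, or from the source topic after a full reset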

Schema evolution of events can be tricky. Use a schema registry (such as Confluent Schema Registry) and a format like Avro or Protobuf to manage backward and forward compatibility of your event schemas.

Handling Very Late Events with Windowing

The version-checking pattern handles most out-of-order cases but can be rigid. What if Event_V2 is delayed by minutes while V3 and V4 have already arrived? Under the strict check, V3 and V4 are discarded as premature, and unless they are re-delivered after V2 is finally applied, they are permanently lost.

For scenarios where you need to accommodate a certain amount of tardiness, you can combine this pattern with windowing, although it adds significant complexity. A session window can group events by key that arrive within a certain gap of inactivity; you could aggregate the events in the window, order them by version, and then apply the batch to the state store.

However, for strict ordering based on aggregate versions, the stateful transformer pattern is often simpler and more correct. The business logic on the write-side should ensure versions are contiguous. If an event is truly lost, it represents a data-loss problem that should be addressed at the source, not patched over in the read model.

Conclusion

Building resilient CQRS/ES projections in a distributed system requires acknowledging and planning for the inevitability of out-of-order and duplicate events. By moving away from naive stateless consumers and embracing stateful stream processing with Kafka Streams, we can build robust, idempotent, and high-performance read models.

The key pattern, a stateful Transformer using a local KeyValueStore to validate event versions before applying them, shifts consistency enforcement into the stream processing layer. This approach not only prevents state corruption but also drastically reduces latency by avoiding round-trips to an external database for every event. Paired with proper handling of tombstones and a clear strategy for reprocessing, this architecture provides the foundation for a scalable and fault-tolerant read side that remains correct in the face of real-world distributed-system chaos.
