Kafka Streams EOS: Idempotency Patterns with the Processor API

Goh Ling Yong

The Gap Between Broker Guarantees and Application Reality

As senior engineers building distributed systems, we're well-acquainted with the promise of Kafka's Exactly-Once Semantics (EOS). By setting processing.guarantee=exactly_once_v2 in our Kafka Streams application, we enable atomic transactions that bundle read-process-write operations. This ensures that for any given input record, the resulting output records and state store updates are committed atomically with the consumer offsets. In the event of a failure and restart, the transaction is either fully completed or fully aborted, preventing the common failure modes of at-least-once (duplicate outputs) and at-most-once (lost outputs) processing within the Kafka ecosystem.

However, this powerful guarantee has a well-defined boundary: the Kafka cluster itself. The atomicity covers consuming from source topics, updating internal state stores, and producing to sink topics. The moment your stream processor needs to interact with the outside world—calling a REST API, writing to a relational database, sending a push notification—the EOS guarantee ends. A classic failure scenario illustrates this:

  • Your processor consumes message A.
  • It successfully calls an external payment gateway API.
  • Before the Kafka transaction can commit, the application instance crashes.

    Upon restart, the transaction is aborted. The consumer offset for message A is not committed. The new instance (or a standby) will re-process message A, see that it hasn't been processed according to its state, and call the payment gateway API a second time. This results in a double charge, a critical business failure that Kafka's EOS, by itself, cannot prevent.

    This is the crux of the problem: broker-level transactionality does not equal end-to-end application idempotency. To solve this, we must build idempotency into our application logic. While the Kafka Streams high-level DSL is excellent for stateless transformations and simple stateful aggregations, implementing robust, fine-grained idempotency checks often requires dropping down to the lower-level, but more powerful, Processor API.

    This article provides a production-focused guide on implementing such an idempotent processor, exploring the architectural patterns, edge cases, and performance considerations you'll face in a real-world system.

    Limitations of the DSL for Idempotent Side-Effects

    The high-level DSL provides a clean, functional API for stream processing. Consider a simple (and flawed) attempt to handle idempotency within the DSL:

    java
    // A naive and incorrect approach using the DSL
    KStream<String, Order> stream = builder.stream("orders-created");
    
    // State store for idempotency checks
    StoreBuilder<KeyValueStore<String, String>> idempotencyStoreBuilder = 
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("processed-orders-store"),
            Serdes.String(),
            Serdes.String()
        );
    builder.addStateStore(idempotencyStoreBuilder);
    
    stream.transform(
        () -> new Transformer<String, Order, KeyValue<String, ProcessedOrder>>() {
            private KeyValueStore<String, String> stateStore;
    
            @Override
            public void init(ProcessorContext context) {
                this.stateStore = (KeyValueStore<String, String>) context.getStateStore("processed-orders-store");
            }
    
            @Override
            public KeyValue<String, ProcessedOrder> transform(String key, Order value) {
                if (stateStore.get(value.getOrderId()) != null) {
                    // Already processed, skip
                    return null; 
                }
                
                // 1. Perform side-effect
                paymentGateway.charge(value);
                
                // 2. Update state
                stateStore.put(value.getOrderId(), "PROCESSED");
                
                // 3. Forward result
                return KeyValue.pair(key, new ProcessedOrder(value));
            }
    
            @Override
            public void close() {}
        },
        "processed-orders-store"
    ).to("orders-processed");

    While using a Transformer gets us closer by providing access to a state store, the fundamental problem remains. The operations inside transform are not atomic with respect to the external world. A crash between paymentGateway.charge() and stateStore.put() still leads to a double charge on retry. The Kafka transaction only guarantees the atomicity of the stateStore.put() and the send to the orders-processed topic. The DSL doesn't give us the fine-grained control needed to manage the interaction between transactional state and non-transactional side-effects gracefully.

    This is where the Processor API shines. It allows us to define the exact, low-level topology and control the flow of data and state interactions with precision.

    Core Pattern: The Idempotent Processor with a State Store

    Let's architect a robust solution using the Processor API. Our goal is to process payment orders, ensuring each order ID is processed exactly once, even in the face of crashes and retries.

    Architecture:

  • Input Topic: payment-orders
  • Output Topic: payments-processed
  • State Store: idempotency-key-store (a persistent KeyValueStore) to track the IDs of successfully processed orders.
  • Custom Processor: IdempotentPaymentProcessor that contains the core logic.

    Step 1: Defining the Topology

    With the Processor API, we build the topology manually, connecting sources, processors, and sinks.

    java
    import java.util.HashMap;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    
    // Assume Serdes for Order and ProcessedOrder are defined elsewhere
    
    public class IdempotentTopology {
    
        public static final String INPUT_TOPIC = "payment-orders";
        public static final String OUTPUT_TOPIC = "payments-processed";
        public static final String STATE_STORE_NAME = "idempotency-key-store";
    
        public Topology build() {
            Topology topology = new Topology();
    
            // 1. Define the state store
            StoreBuilder<KeyValueStore<String, String>> storeBuilder = 
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(STATE_STORE_NAME),
                    Serdes.String(),
                    Serdes.String()
            ).withLoggingEnabled(new HashMap<>()); // Enable fault-tolerant changelog; an empty map keeps the default topic configs
    
            topology.addStateStore(storeBuilder);
    
            // 2. Define the source node
            topology.addSource("Source", Serdes.String().deserializer(), new OrderSerde().deserializer(), INPUT_TOPIC);
    
            // 3. Define the processor node and connect it to the state store
            topology.addProcessor(
                "IdempotentProcessor", 
                () -> new IdempotentPaymentProcessor(new PaymentGatewayClient()), 
                "Source"
            );
            topology.connectProcessorAndStateStores("IdempotentProcessor", STATE_STORE_NAME);
    
            // 4. Define the sink node
            topology.addSink(
                "Sink", 
                OUTPUT_TOPIC, 
                Serdes.String().serializer(), 
                new ProcessedOrderSerde().serializer(), 
                "IdempotentProcessor"
            );
    
            return topology;
        }
    }
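
    To run this topology, wire it into a KafkaStreams instance with EOS enabled. A minimal sketch, assuming placeholder application-id and bootstrap values:

    java
    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;

    public class IdempotentApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "idempotent-payments"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            // The EOS guarantee discussed above: one transaction covers consumer offsets,
            // state store changelog writes, and records produced to the sink
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

            KafkaStreams streams = new KafkaStreams(new IdempotentTopology().build(), props);
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.start();
        }
    }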

    Step 2: Implementing the `IdempotentPaymentProcessor`

    This is where the core logic resides. The processor interacts with its context to access the state store and forward records.

    java
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    
    public class IdempotentPaymentProcessor implements Processor<String, Order, String, ProcessedOrder> {
    
        private ProcessorContext<String, ProcessedOrder> context;
        private KeyValueStore<String, String> idempotencyStore;
        private final PaymentGatewayClient paymentGateway;
        private static final String PROCESSED_STATUS = "PROCESSED";
    
        public IdempotentPaymentProcessor(PaymentGatewayClient paymentGateway) {
            this.paymentGateway = paymentGateway;
        }
    
        @Override
        public void init(ProcessorContext<String, ProcessedOrder> context) {
            this.context = context;
            this.idempotencyStore = context.getStateStore(IdempotentTopology.STATE_STORE_NAME);
        }
    
        @Override
        public void process(Record<String, Order> record) {
            String orderId = record.value().getOrderId();
    
            // 1. Idempotency Check
            if (idempotencyStore.get(orderId) != null) {
                // Log as duplicate and drop the message. Do not throw an exception.
                // Acknowledging the message prevents endless retries for a known duplicate.
                System.out.println("Duplicate order detected, skipping: " + orderId);
                return;
            }
    
            // 2. Perform External Side-Effect
            // CRITICAL: The external system MUST be idempotent itself.
            // Most payment gateways support an idempotency key in their API.
            try {
                PaymentResult result = paymentGateway.charge(orderId, record.value().getAmount());
                if (!result.isSuccess()) {
                    // Handle payment failure - maybe forward to a dead-letter queue.
                    // For now, we'll just log and drop.
                    System.err.println("Payment failed for order: " + orderId);
                    return;
                }
                
                // 3. Update state and forward result within the same transaction
                // This is the atomic part guaranteed by Kafka Streams EOS.
                idempotencyStore.put(orderId, PROCESSED_STATUS);
                
                ProcessedOrder processedOrder = new ProcessedOrder(record.value(), result.getTransactionId());
                context.forward(new Record<>(record.key(), processedOrder, record.timestamp()));
    
            } catch (Exception e) {
                // If the gateway call fails with a transient error (e.g., network timeout),
                // throwing an exception will cause Kafka Streams to retry the whole process.
                // Because the idempotency key was not written to the state store,
                // the next attempt will correctly re-trigger the API call.
                System.err.println("Transient error processing order: " + orderId + ", will retry.");
                throw new RuntimeException("Failed to call payment gateway", e);
            }
        }
    
        @Override
        public void close() {
            // Cleanup resources if necessary
        }
    }
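
    This logic is also easy to verify off-cluster with TopologyTestDriver from kafka-streams-test-utils. A minimal sketch, assuming the serdes above, a hypothetical Order constructor, and that the topology is refactored to accept a stubbed PaymentGatewayClient so no real network call is made:

    java
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "idempotency-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver

    try (TopologyTestDriver driver = new TopologyTestDriver(new IdempotentTopology().build(), props)) {
        TestInputTopic<String, Order> input = driver.createInputTopic(
            IdempotentTopology.INPUT_TOPIC, Serdes.String().serializer(), new OrderSerde().serializer());
        TestOutputTopic<String, ProcessedOrder> output = driver.createOutputTopic(
            IdempotentTopology.OUTPUT_TOPIC, Serdes.String().deserializer(), new ProcessedOrderSerde().deserializer());

        Order order = new Order("123", 999L); // hypothetical constructor
        input.pipeInput("key", order);
        input.pipeInput("key", order); // simulate a duplicate delivery

        // Only the first record reaches the output topic; the duplicate is dropped
        assert output.readRecordsToList().size() == 1;
    }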

    How This Solves the Problem

    Let's revisit our crash scenario with this new implementation:

  • Processor consumes message A (order ID 123).
  • The idempotency check idempotencyStore.get("123") returns null.
  • The call to paymentGateway.charge("123", ...) succeeds.
  • The application crashes before idempotencyStore.put("123", ...) and context.forward(...) are committed.

    On restart:

  • The Kafka transaction is aborted. The state store is rolled back (it does not contain key 123), the output message is discarded, and the consumer offset is not advanced.
  • The processor consumes message A again.
  • The idempotency check idempotencyStore.get("123") again returns null.
  • The call to paymentGateway.charge("123", ...) is made a second time.

    This seems like the same problem! The critical difference is our assumption: the external system must provide its own idempotency mechanism. A well-designed payment gateway API accepts an idempotency key (such as the orderId). When it receives a second call with the same key, it does not process a new charge; it recognizes the duplicate request and returns the result of the original, successful transaction. Our processor can then safely proceed, update its state store, and move on. A hypothetical client sketch follows.
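
    Here, the order ID is forwarded as the idempotency key. The endpoint, header name, and PaymentResult.fromJson parsing are illustrative assumptions rather than a real gateway API, though the Idempotency-Key header is a common convention (Stripe, for example, uses it):

    java
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class PaymentGatewayClient {

        private final HttpClient http = HttpClient.newHttpClient();

        public PaymentResult charge(String orderId, long amountCents) throws Exception {
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://payments.example.com/charges")) // placeholder endpoint
                .header("Idempotency-Key", orderId) // lets the gateway deduplicate retried calls
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                    "{\"orderId\":\"" + orderId + "\",\"amountCents\":" + amountCents + "}"))
                .build();

            HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
            // Assumption: the gateway echoes the original result when it sees a duplicate key
            return PaymentResult.fromJson(response.body());
        }
    }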

    Advanced Pattern: Handling Non-Idempotent External Systems

    What if you're forced to integrate with a legacy system or an API that doesn't support idempotency keys? This is a much harder problem. A direct call within the processor is no longer safe. The solution is to decouple the state transition from the non-idempotent side-effect using an outbox pattern.

    New Architecture:

  • Processor 1 (OrderValidatorProcessor): Consumes from payment-orders. It performs the idempotency check. Instead of calling the external API, it writes a PAYMENT_PENDING record to a new state store and forwards the order to an internal, intermediate topic called pending-api-calls. This entire operation is wrapped in a single Kafka transaction.
  • Processor 2 (ApiCallProcessor): Consumes from pending-api-calls. This processor's only job is to call the external API, and it can run with processing.guarantee=at_least_once. Since its input is generated by a transactional, idempotent processor, it will never receive a duplicate PAYMENT_PENDING message for the same order, so it can retry failed API calls until one succeeds. Note the residual risk: a crash between a successful call and the offset commit can still repeat the call, so this pattern narrows and isolates the duplicate-call window rather than eliminating it. A sketch of this processor appears after the topology below.

    Implementation Sketch

    OrderValidatorProcessor:

    java
    // Inside OrderValidatorProcessor.process()
    @Override
    public void process(Record<String, Order> record) {
        String orderId = record.value().getOrderId();
    
        if (idempotencyStore.get(orderId) != null) {
            // Duplicate, skip
            return;
        }
    
        // Atomically update state and forward to the outbox topic
        idempotencyStore.put(orderId, "PENDING");
        context.forward(record);
    }

    Topology for this pattern:

    java
    // ... state store setup ...
    
    // Processor 1: Transactional validation and outbox
    topology.addSource("Source", ..., "payment-orders");
    topology.addProcessor("ValidatorProcessor", () -> new OrderValidatorProcessor(), "Source");
    topology.connectProcessorAndStateStores("ValidatorProcessor", STATE_STORE_NAME);
    topology.addSink("OutboxSink", "pending-api-calls", ..., "ValidatorProcessor");
    
    // Processor 2: At-least-once API caller (could be in a separate Streams app)
    topology.addSource("ApiCallSource", ..., "pending-api-calls");
    topology.addProcessor("ApiCallProcessor", () -> new ApiCallProcessor(), "ApiCallSource");
    // No sink needed if the final state is external
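
    A minimal sketch of ApiCallProcessor, assuming redelivery via a thrown exception under at_least_once is the retry mechanism:

    java
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.Record;

    // Terminal processor: performs the external call, produces no downstream records
    public class ApiCallProcessor implements Processor<String, Order, Void, Void> {

        private final PaymentGatewayClient paymentGateway = new PaymentGatewayClient();

        @Override
        public void process(Record<String, Order> record) {
            try {
                paymentGateway.charge(record.value().getOrderId(), record.value().getAmount());
            } catch (Exception e) {
                // Under at_least_once, rethrowing causes this record to be redelivered;
                // the upstream validator guarantees it is never a logical duplicate.
                throw new RuntimeException("Gateway call failed; record will be retried", e);
            }
        }
    }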

    This pattern ensures the critical state transition (order received -> payment pending) happens exactly once. The subsequent, non-idempotent action is handled by a separate, simpler component that can safely retry. This decouples transactional guarantees from external system limitations.

    Performance and Production Considerations

    Implementing stateful processors introduces performance overhead. Optimizing this is key for production readiness.

    State Store Tuning (RocksDB)

    Kafka Streams uses RocksDB as its default persistent state store. Its performance is highly tunable.

  • Memory and Caching: RocksDB performance is heavily dependent on its block cache. You can provide a custom RocksDBConfigSetter to tune it.
    java
    // In your Streams properties
    props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class.getName());

    // Custom class (Options, BlockBasedTableConfig, Cache, LRUCache are org.rocksdb types)
    public class CustomRocksDBConfig implements RocksDBConfigSetter {
        private static final long BLOCK_CACHE_SIZE_BYTES = 256 * 1024 * 1024L; // 256 MB
        private static final long BLOCK_SIZE_BYTES = 16 * 1024L; // 16 KB

        // A block cache is shared only if every store receives the SAME instance,
        // hence the static field; constructing one per store would not share memory
        private static final org.rocksdb.Cache SHARED_BLOCK_CACHE =
            new org.rocksdb.LRUCache(BLOCK_CACHE_SIZE_BYTES);

        @Override
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

            // Share one block cache across all RocksDB instances on this host
            tableConfig.setBlockCache(SHARED_BLOCK_CACHE);
            tableConfig.setBlockSize(BLOCK_SIZE_BYTES);
            tableConfig.setCacheIndexAndFilterBlocks(true);
            options.setTableFormatConfig(tableConfig);

            // Optimize for write-heavy workloads
            options.setMaxWriteBufferNumber(4);
        }

        @Override
        public void close(String storeName, Options options) {
            // The shared cache is deliberately not closed per store
        }
    }
  • Changelog Topic Compaction: The state store is backed by a changelog topic in Kafka. To prevent this topic from growing indefinitely, it must be configured for log compaction, which lets Kafka discard older values for the same key and keep only the latest state. Kafka Streams applies cleanup.policy=compact by default when you enable logging (.withLoggingEnabled()), but it's crucial to verify the topic configuration; you can also pin the settings explicitly, as shown below.
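
    For example, assuming TopicConfig from org.apache.kafka.common.config (the dirty-ratio value is an illustrative assumption):

    java
    Map<String, String> changelogConfig = new HashMap<>();
    changelogConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
    changelogConfig.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.3"); // assumption: tune per workload

    StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STATE_STORE_NAME),
            Serdes.String(),
            Serdes.String()
        ).withLoggingEnabled(changelogConfig);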

    Fault Tolerance and Rebalancing

  • Standby Replicas: For stateful applications, rebalancing can be slow because the new instance must restore the state from the changelog topic. You can mitigate this by configuring standby replicas (num.standby.replicas=1 or more). A standby replica maintains a passive, up-to-date copy of the state. If the active instance fails, the standby can take over almost instantly, dramatically reducing downtime.
    java
    // In your Streams properties
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

    The trade-off is increased resource usage on the cluster, as each standby replica consumes disk space and network bandwidth to replicate the changelog.

  • Static Group Membership: To prevent unnecessary rebalances during short restarts or deployments, use static group membership. This gives each Streams instance a persistent group.instance.id so the coordinator can recognize it upon reconnection, avoiding a full group rebalance.
    java
    // In your Streams properties; instanceIndex is a stable, per-instance identifier (an assumption)
    props.put(StreamsConfig.consumerPrefix("group.instance.id"), "my-app-" + instanceIndex);

    Benchmarking Idempotency Overhead

    The idempotency check (idempotencyStore.get(orderId)) is not free. It's a key-value lookup that, in the worst case, hits disk. You must benchmark this overhead.

  • Measure Latency: Use ProcessorContext.currentSystemTimeMs() (wall-clock time) or System.currentTimeMillis() at the beginning and end of the process method to measure per-message processing latency. Note that currentStreamTimeMs() tracks stream time, the highest record timestamp observed, and is not suitable for measuring wall-clock latency.
  • Compare Baselines: Run your application with and without the idempotency check logic to quantify the P95/P99 latency impact.
  • Analyze Throughput: Monitor consumer lag and records-per-second metrics to ensure your application can keep up with the ingress rate.

    For many applications, the latency of a local RocksDB lookup (microseconds to a few milliseconds) is negligible compared to the network call to the external service. However, for high-throughput, low-latency streams, this overhead can be significant and must be accounted for.
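
    A minimal measurement sketch inside process(), using the wall-clock time exposed by the processor context; printing to stdout is a stand-in for a real metrics library:

    java
    // Inside IdempotentPaymentProcessor.process(): measure one record end to end
    long startMs = context.currentSystemTimeMs(); // wall-clock, not stream time
    try {
        // ... idempotency check, gateway call, state update, forward ...
    } finally {
        long elapsedMs = context.currentSystemTimeMs() - startMs;
        // Assumption: report to a metrics library (Micrometer, Kafka's own metrics) in production
        System.out.println("process() latency: " + elapsedMs + " ms");
    }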

    Conclusion

    Kafka's Exactly-Once Semantics provide a robust foundation for building reliable streaming applications, but they are not a silver bullet. True end-to-end processing guarantees require a deliberate, application-level approach to idempotency, especially when interacting with external systems.

    The high-level DSL, while convenient, often lacks the fine-grained control needed for complex, stateful operations involving side-effects. By leveraging the Processor API, we gain direct access to state stores and the processing lifecycle, allowing us to implement powerful patterns like the idempotent processor.

    As a senior engineer, your key takeaways should be:

  • Identify the Transactional Boundary: Understand precisely what Kafka's EOS does and does not cover. Your responsibility begins where Kafka's guarantees end.
  • Design for External Idempotency: The most robust solutions rely on external systems that provide their own idempotency mechanisms. When this isn't possible, use patterns like the transactional outbox to decouple state changes from non-idempotent actions.
  • Master the Processor API: For any non-trivial stateful processing, the Processor API is an indispensable tool. It provides the power and flexibility required for production-grade stream processing.
  • Tune for Performance: Stateful processing is resource-intensive. Proactively tune your RocksDB state stores, configure fault tolerance with standby replicas, and benchmark the performance impact of your logic to ensure your application is scalable and resilient.