Kafka Streams EOS: Idempotency Patterns with the Processor API
The Gap Between Broker Guarantees and Application Reality
As senior engineers building distributed systems, we're well-acquainted with the promise of Kafka's Exactly-Once Semantics (EOS). By setting processing.guarantee=exactly_once_v2 in our Kafka Streams application, we enable atomic transactions that bundle read-process-write operations. This ensures that for any given input record, the resulting output records and state store updates are committed atomically with the consumer offsets. In the event of a failure and restart, the transaction is either fully completed or fully aborted, preventing the common failure modes of at-least-once (duplicate outputs) and at-most-once (lost outputs) processing within the Kafka ecosystem.
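For reference, enabling this guarantee is a single configuration entry. A minimal sketch (the application id and bootstrap servers are placeholders):

// In your Streams properties
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "payments-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// EOS v2: consumer offsets, state store updates, and output records commit atomically
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);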
However, this powerful guarantee has a well-defined boundary: the Kafka cluster itself. The atomicity covers consuming from source topics, updating internal state stores, and producing to sink topics. The moment your stream processor needs to interact with the outside world—calling a REST API, writing to a relational database, sending a push notification—the EOS guarantee ends. A classic failure scenario illustrates this:
- The application consumes message A.
- It successfully calls an external payment gateway API.
- Before the Kafka transaction can commit, the application instance crashes.
Upon restart, the transaction is aborted. The consumer offset for message A is not committed. The new instance (or a standby) will re-process message A, see that it hasn't been processed according to its state, and call the payment gateway API a second time. This results in a double charge, a critical business failure that Kafka's EOS, by itself, cannot prevent.
This is the crux of the problem: broker-level transactionality does not equal end-to-end application idempotency. To solve this, we must build idempotency into our application logic. While the Kafka Streams high-level DSL is excellent for stateless transformations and simple stateful aggregations, implementing robust, fine-grained idempotency checks often requires dropping down to the lower-level, but more powerful, Processor API.
This article provides a production-focused guide on implementing such an idempotent processor, exploring the architectural patterns, edge cases, and performance considerations you'll face in a real-world system.
Limitations of the DSL for Idempotent Side-Effects
The high-level DSL provides a clean, functional API for stream processing. Consider a simple (and flawed) attempt to handle idempotency within the DSL:
// A naive and incorrect approach using the DSL
KStream<String, Order> stream = builder.stream("orders-created");
// State store for idempotency checks
StoreBuilder<KeyValueStore<String, String>> idempotencyStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("processed-orders-store"),
Serdes.String(),
Serdes.String()
);
builder.addStateStore(idempotencyStoreBuilder);
stream.transform(
() -> new Transformer<String, Order, KeyValue<String, ProcessedOrder>>() {
private KeyValueStore<String, String> stateStore;
private final PaymentGatewayClient paymentGateway = new PaymentGatewayClient();
@Override
public void init(ProcessorContext context) {
this.stateStore = (KeyValueStore<String, String>) context.getStateStore("processed-orders-store");
}
@Override
public KeyValue<String, ProcessedOrder> transform(String key, Order value) {
if (stateStore.get(value.getOrderId()) != null) {
// Already processed, skip
return null;
}
// 1. Perform side-effect
paymentGateway.charge(value);
// 2. Update state
stateStore.put(value.getOrderId(), "PROCESSED");
// 3. Forward result
return KeyValue.pair(key, new ProcessedOrder(value));
}
@Override
public void close() {}
},
"processed-orders-store"
).to("orders-processed");
While using a Transformer gets us closer by providing access to a state store, the fundamental problem remains. The operations inside transform are not atomic with respect to the external world. A crash between paymentGateway.charge() and stateStore.put() still leads to a double charge on retry. The Kafka transaction only guarantees the atomicity of the stateStore.put() and the send to the orders-processed topic. The DSL doesn't give us the fine-grained control needed to manage the interaction between transactional state and non-transactional side-effects gracefully.
This is where the Processor API shines. It allows us to define the exact, low-level topology and control the flow of data and state interactions with precision.
Core Pattern: The Idempotent Processor with a State Store
Let's architect a robust solution using the Processor API. Our goal is to process payment orders, ensuring each order ID is processed exactly once, even in the face of crashes and retries.
Architecture:
- Source topic: payment-orders
- Sink topic: payments-processed
- State store: idempotency-key-store (a persistent KeyValueStore) to track the IDs of successfully processed orders.
- Processor: a custom IdempotentPaymentProcessor that contains the core logic.

Step 1: Defining the Topology
With the Processor API, we build the topology manually, connecting sources, processors, and sinks.
import java.util.HashMap;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
// Assume Serdes for Order and ProcessedOrder are defined elsewhere
public class IdempotentTopology {
public static final String INPUT_TOPIC = "payment-orders";
public static final String OUTPUT_TOPIC = "payments-processed";
public static final String STATE_STORE_NAME = "idempotency-key-store";
public Topology build() {
Topology topology = new Topology();
// 1. Define the state store
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STATE_STORE_NAME),
Serdes.String(),
Serdes.String()
).withLoggingEnabled(new HashMap<>()); // Enable fault-tolerant changelog
topology.addStateStore(storeBuilder);
// 2. Define the source node
topology.addSource("Source", Serdes.String().deserializer(), new OrderSerde().deserializer(), INPUT_TOPIC);
// 3. Define the processor node and connect it to the state store
topology.addProcessor(
"IdempotentProcessor",
() -> new IdempotentPaymentProcessor(new PaymentGatewayClient()),
"Source"
);
topology.connectProcessorAndStateStores("IdempotentProcessor", STATE_STORE_NAME);
// 4. Define the sink node
topology.addSink(
"Sink",
OUTPUT_TOPIC,
Serdes.String().serializer(),
new ProcessedOrderSerde().serializer(),
"IdempotentProcessor"
);
return topology;
}
}
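With the topology defined, running it is standard Kafka Streams boilerplate. A minimal wiring sketch, assuming props carries the exactly_once_v2 configuration from the introduction:

Topology topology = new IdempotentTopology().build();
KafkaStreams streams = new KafkaStreams(topology, props);
// Shut down cleanly on JVM exit
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();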
Step 2: Implementing the IdempotentPaymentProcessor
This is where the core logic resides. The processor interacts with its context to access the state store and forward records.
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
public class IdempotentPaymentProcessor implements Processor<String, Order, String, ProcessedOrder> {
private ProcessorContext<String, ProcessedOrder> context;
private KeyValueStore<String, String> idempotencyStore;
private final PaymentGatewayClient paymentGateway;
private static final String PROCESSED_STATUS = "PROCESSED";
public IdempotentPaymentProcessor(PaymentGatewayClient paymentGateway) {
this.paymentGateway = paymentGateway;
}
@Override
public void init(ProcessorContext<String, ProcessedOrder> context) {
this.context = context;
this.idempotencyStore = context.getStateStore(IdempotentTopology.STATE_STORE_NAME);
}
@Override
public void process(Record<String, Order> record) {
String orderId = record.value().getOrderId();
// 1. Idempotency Check
if (idempotencyStore.get(orderId) != null) {
// Log as duplicate and drop the message. Do not throw an exception.
// Acknowledging the message prevents endless retries for a known duplicate.
System.out.println("Duplicate order detected, skipping: " + orderId);
return;
}
// 2. Perform External Side-Effect
// CRITICAL: The external system MUST be idempotent itself.
// Most payment gateways support an idempotency key in their API.
try {
PaymentResult result = paymentGateway.charge(orderId, record.value().getAmount());
if (!result.isSuccess()) {
// Handle payment failure - maybe forward to a dead-letter queue.
// For now, we'll just log and drop.
System.err.println("Payment failed for order: " + orderId);
return;
}
// 3. Update state and forward result within the same transaction
// This is the atomic part guaranteed by Kafka Streams EOS.
idempotencyStore.put(orderId, PROCESSED_STATUS);
ProcessedOrder processedOrder = new ProcessedOrder(record.value(), result.getTransactionId());
context.forward(new Record<>(record.key(), processedOrder, record.timestamp()));
} catch (Exception e) {
// If the gateway call fails with a transient error (e.g., network timeout),
// throwing an exception will cause Kafka Streams to retry the whole process.
// Because the idempotency key was not written to the state store,
// the next attempt will correctly re-trigger the API call.
System.err.println("Transient error processing order: " + orderId + ", will retry.");
throw new RuntimeException("Failed to call payment gateway", e);
}
}
@Override
public void close() {
// Cleanup resources if necessary
}
}
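The duplicate-drop path is straightforward to unit test with TopologyTestDriver. A sketch, assuming the Serdes from the topology and a PaymentGatewayClient stubbed so that no real network call is made (the Order constructor shown is hypothetical):

try (TopologyTestDriver driver = new TopologyTestDriver(new IdempotentTopology().build(), props)) {
    TestInputTopic<String, Order> input = driver.createInputTopic(
        IdempotentTopology.INPUT_TOPIC, Serdes.String().serializer(), new OrderSerde().serializer());
    TestOutputTopic<String, ProcessedOrder> output = driver.createOutputTopic(
        IdempotentTopology.OUTPUT_TOPIC, Serdes.String().deserializer(), new ProcessedOrderSerde().deserializer());

    input.pipeInput("key-1", new Order("123", new BigDecimal("9.99"))); // first delivery
    input.pipeInput("key-1", new Order("123", new BigDecimal("9.99"))); // simulated redelivery

    // Only the first delivery should produce an output record
    assert output.getQueueSize() == 1;
}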
How This Solves the Problem
Let's revisit our crash scenario with this new implementation:
First Attempt:
1. The processor receives message A (order ID 123).
2. idempotencyStore.get("123") returns null.
3. paymentGateway.charge("123", ...) succeeds.
4. The instance crashes before idempotencyStore.put("123", ...) and context.forward(...) are committed.

On Restart:
1. The aborted transaction discards the state store update (the put for order ID 123) and the output message, and the consumer offset is not advanced.
2. The new instance re-processes message A again.
3. idempotencyStore.get("123") again returns null.
4. The call to paymentGateway.charge("123", ...) is made a second time.

This seems like the same problem! However, the critical difference is our assumption: the external system must provide its own idempotency mechanism. A well-designed payment gateway API will accept an idempotency key (like the orderId). When it receives the second call with the same key, it won't process a new charge. Instead, it will recognize the duplicate request and return the result of the original, successful transaction. Our processor can then safely proceed, update its state store, and move on.
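How the idempotency key travels to the gateway is API-specific, but the shape is usually the same. A hypothetical sketch of PaymentGatewayClient.charge against an HTTP gateway (the endpoint, header name, amount type, and PaymentResult.fromJson parser are all assumptions; Stripe, for example, uses an Idempotency-Key header):

import java.math.BigDecimal;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PaymentGatewayClient {
    private final HttpClient http = HttpClient.newHttpClient();

    public PaymentResult charge(String orderId, BigDecimal amount) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://gateway.example.com/charges")) // hypothetical endpoint
            .header("Idempotency-Key", orderId) // retried calls return the original charge's result
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(
                "{\"orderId\":\"" + orderId + "\",\"amount\":" + amount + "}"))
            .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        return PaymentResult.fromJson(response.body()); // hypothetical parser
    }
}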
Advanced Pattern: Handling Non-Idempotent External Systems
What if you're forced to integrate with a legacy system or an API that doesn't support idempotency keys? This is a much harder problem. A direct call within the processor is no longer safe. The solution is to decouple the state transition from the non-idempotent side-effect using an outbox pattern.
New Architecture:
- Processor 1 (OrderValidatorProcessor): Consumes from payment-orders. It performs the idempotency check. Instead of calling the external API, it writes a PAYMENT_PENDING record to a new state store and forwards the order to an internal, intermediate topic called pending-api-calls. This entire operation is wrapped in a single Kafka transaction.
- Processor 2 (ApiCallProcessor): Consumes from pending-api-calls. This processor's only job is to call the external API. It can be configured with processing.guarantee=at_least_once. Since its input is generated by a transactional, idempotent processor, we know we'll never get a duplicate PAYMENT_PENDING message for the same order. This processor can safely retry API calls on failure until it succeeds.

Implementation Sketch
OrderValidatorProcessor:
// Inside OrderValidatorProcessor.process()
@Override
public void process(Record<String, Order> record) {
String orderId = record.value().getOrderId();
if (idempotencyStore.get(orderId) != null) {
// Duplicate, skip
return;
}
// Atomically update state and forward to the outbox topic
idempotencyStore.put(orderId, "PAYMENT_PENDING");
context.forward(record);
}
Topology for this pattern:
// ... state store setup ...
// Processor 1: Transactional validation and outbox
topology.addSource("Source", ..., "payment-orders");
topology.addProcessor("ValidatorProcessor", () -> new OrderValidatorProcessor(), "Source");
topology.connectProcessorAndStateStores("ValidatorProcessor", STATE_STORE_NAME);
topology.addSink("OutboxSink", "pending-api-calls", ..., "ValidatorProcessor");
// Processor 2: At-least-once API caller (could be in a separate Streams app)
topology.addSource("ApiCallSource", ..., "pending-api-calls");
topology.addProcessor("ApiCallProcessor", () -> new ApiCallProcessor(), "ApiCallSource");
// No sink needed if the final state is external
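The second processor stays deliberately small. A sketch of ApiCallProcessor under the assumptions above (LegacyApiClient is a hypothetical wrapper around the non-idempotent API):

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

public class ApiCallProcessor implements Processor<String, Order, Void, Void> {
    private final LegacyApiClient legacyApi = new LegacyApiClient(); // hypothetical client

    @Override
    public void process(Record<String, Order> record) {
        try {
            // The only job: perform the side-effect
            legacyApi.submitPayment(record.value());
        } catch (Exception e) {
            // Under at-least-once, throwing causes the record to be redelivered,
            // so transient failures are retried until the call succeeds
            throw new RuntimeException("Legacy API call failed; will retry", e);
        }
    }
}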
This pattern ensures the critical state transition (order received -> payment pending) happens exactly once. The subsequent, non-idempotent action is handled by a separate, simpler component that can safely retry. This decouples transactional guarantees from external system limitations.
Performance and Production Considerations
Implementing stateful processors introduces performance overhead. Optimizing this is key for production readiness.
State Store Tuning (RocksDB)
Kafka Streams uses RocksDB as its default persistent state store. Its performance is highly tunable.
Provide a custom RocksDBConfigSetter to tune it:

// In your Streams properties
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class.getName());
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

// Custom class
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    private static final long BLOCK_CACHE_SIZE_BYTES = 256L * 1024 * 1024; // 256 MB
    private static final long BLOCK_SIZE_BYTES = 16L * 1024;               // 16 KB
    // One static cache instance, shared by all RocksDB stores on this host
    private static final Cache SHARED_BLOCK_CACHE = new LRUCache(BLOCK_CACHE_SIZE_BYTES);

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        // Point every store at the shared block cache
        tableConfig.setBlockCache(SHARED_BLOCK_CACHE);
        tableConfig.setBlockSize(BLOCK_SIZE_BYTES);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        // Optimize for write-heavy workloads
        options.setMaxWriteBufferNumber(4);
    }

    @Override
    public void close(String storeName, Options options) {
        // Do not close the shared cache here; it outlives any single store
    }
}
- Changelog topics: Fault tolerance for the store comes from its changelog topic (enabled earlier via .withLoggingEnabled()), but it's crucial to verify the topic configuration (cleanup.policy=compact). This allows Kafka to discard older values for the same key, keeping only the latest state; a sketch of pinning this down explicitly follows below.
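For instance, the compaction policy can be set explicitly when the store is declared, a small variation on the store builder from the earlier topology:

Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("cleanup.policy", "compact"); // keep only the latest value per key

StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(IdempotentTopology.STATE_STORE_NAME),
        Serdes.String(),
        Serdes.String()
    ).withLoggingEnabled(changelogConfig);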
Fault Tolerance and Rebalancing

- Standby replicas: Configure standby replicas (num.standby.replicas=1 or more). A standby replica maintains a passive, up-to-date copy of the state. If the active instance fails, the standby can take over almost instantly, dramatically reducing downtime.

// In your Streams properties
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
The trade-off is increased resource usage on the cluster, as each standby replica consumes disk space and network bandwidth to replicate the changelog.
- Static membership: Give each instance a stable group.instance.id so the coordinator can recognize it upon reconnection, avoiding a full group rebalance.

// In your Streams properties
props.put(StreamsConfig.consumerPrefix("group.instance.id"), "my-app-" + instanceIndex);
Benchmarking Idempotency Overhead
The idempotency check (idempotencyStore.get(orderId)) is not free. It's a key-value lookup that, in the worst case, hits disk. You must benchmark this overhead.
- Instrument the hot path: Capture ProcessorContext.currentStreamTimeMs() or System.currentTimeMillis() at the beginning and end of the process method to measure the processing latency per message; a sketch follows below.

For many applications, the latency of a local RocksDB lookup (microseconds to a few milliseconds) is negligible compared to the network call to the external service. However, for high-throughput, low-latency streams, this overhead can be significant and must be accounted for.
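As a rough illustration, a minimal probe might look like this (doProcess wraps the logic shown earlier, and recordLatencyMicros stands in for whatever metrics library you use; both are hypothetical):

@Override
public void process(Record<String, Order> record) {
    long startNs = System.nanoTime();
    try {
        doProcess(record); // idempotency check, gateway call, state update
    } finally {
        recordLatencyMicros((System.nanoTime() - startNs) / 1_000L);
    }
}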
Conclusion
Kafka's Exactly-Once Semantics provide a robust foundation for building reliable streaming applications, but they are not a silver bullet. True end-to-end processing guarantees require a deliberate, application-level approach to idempotency, especially when interacting with external systems.
The high-level DSL, while convenient, often lacks the fine-grained control needed for complex, stateful operations involving side-effects. By leveraging the Processor API, we gain direct access to state stores and the processing lifecycle, allowing us to implement powerful patterns like the idempotent processor.
As a senior engineer, your key takeaways should be: