Achieving Exactly-Once Semantics in Kafka Event Sourcing Systems
The Misnomer of "Exactly-Once"
In distributed systems, the term "exactly-once" is one of the most potent and misunderstood concepts. Any senior engineer who has wrestled with message queues and network partitions knows that true, metaphysical "exactly-once" delivery is impossible. A producer can send a message, and the network can fail before the acknowledgment is received. Did the broker get it? The producer can't know, so it must retry. This fundamental reality of distributed computing limits us to two practical guarantees: at-most-once (fire and forget) and at-least-once (retry until acknowledged).
So, what do we mean when we talk about Exactly-Once Semantics (EOS)? It's a composite guarantee: at-least-once delivery combined with idempotent processing. The system is engineered to tolerate message redelivery without causing duplicate side effects. In an event sourcing architecture, where application state is derived by replaying a log of events, preventing duplicate event processing is not just a best practice; it is a prerequisite for data integrity. A duplicated OrderCreated event could lead to double-shipping, and a duplicated PaymentProcessed event could lead to double-billing. The stakes are immense.
This article dissects the advanced patterns and mechanisms required to achieve EOS in a Kafka-based event sourcing system. We will skip the basics and dive directly into the production-level implementation details of idempotent producers, transactional producers, idempotent consumers, and the built-in EOS capabilities of Kafka Streams.
Part 1: The Producer Side - Ensuring Idempotent and Atomic Writes
The first half of the EOS equation is ensuring that the producer doesn't introduce duplicates into the Kafka log, even when facing retries. Kafka provides two powerful, related mechanisms for this: the Idempotent Producer and the Transactional Producer.
Pattern 1: The Idempotent Producer
Since Kafka 0.11, the producer can be configured to be idempotent. This guarantees that retries on produce requests will not result in duplicate messages being written to the partition's log. It's a surprisingly simple feature to enable, but its underlying mechanism is worth understanding.
Mechanism:
When you enable idempotence, the producer is assigned a unique Producer ID (PID). With every batch of messages sent to a partition, it includes this PID and a monotonically increasing sequence number. The broker leader for that partition keeps track of the latest (PID, sequence number) pair it has successfully written. If it receives a batch with a sequence number it has already seen for a given PID, it does not append the data again; the duplicate is acknowledged without being rewritten, and the producer client handles this transparently.
Implementation:
Enabling this is a matter of configuration. It's so effective and has such minimal overhead that it should be considered a default for any critical production workload.
// Producer Configuration for Idempotence
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092,kafka-broker-2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Enable Idempotence
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// These are set implicitly by enabling idempotence, but it's good to be explicit
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Required for idempotence
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE)); // Retry indefinitely
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // Must be <= 5 when idempotence is enabled
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Edge Cases and Considerations:
* acks=all can add latency compared to acks=1, but this is a necessary trade-off for data durability that you should already be making in an event sourcing system.
* A side benefit of capping max.in.flight.requests.per.connection at 5 or less is that it guarantees ordering per partition, even with retries. This is a critical property for many event-sourcing use cases.
Pattern 2: The Transactional Outbox Pattern for Atomicity
The idempotent producer solves the problem of network-level retries. It does not solve the more complex business-level problem: How do you atomically update your application's database and publish an event to Kafka?
This is the classic "dual write" problem. Consider this pseudo-code:
// ANTI-PATTERN: DO NOT DO THIS
public void createOrder(Order order) {
database.save(order); // Step 1: Write to DB
kafkaProducer.send(new ProducerRecord("orders", order.getId(), order.toJson())); // Step 2: Publish to Kafka
}
What happens if the application crashes after the database commit but before the Kafka send() completes? The order exists in your system of record, but the corresponding OrderCreated event is never published. Downstream services that rely on this event will never know the order was created, leading to a severe state inconsistency.
Flipping the order is no better. If you publish to Kafka first and then the database write fails, you have an event for an order that doesn't exist.
The solution is the Transactional Outbox Pattern. The core idea is to leverage the atomicity of your local database transaction to ensure the event is captured reliably. The event is then published to Kafka by a separate, asynchronous process.
Mechanism:
- Begin a database transaction.
- Perform your business logic (e.g., insert/update entities).
- Insert a record representing the event into an outbox table.
- Commit the database transaction.
Now, the business state change and the intent to publish an event are captured atomically. If either fails, the whole transaction rolls back. A separate process (a message relay) then reads from this outbox table and reliably publishes the messages to Kafka.
Implementation (Service and Outbox Table):
-- PostgreSQL Outbox Table
CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
// Order Service using Spring Data JPA and a Transaction
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private OutboxRepository outboxRepository;
@Transactional
public Order createOrder(CreateOrderRequest request) {
// 1. Business Logic
Order newOrder = new Order(request.getCustomerId(), request.getItems());
Order savedOrder = orderRepository.save(newOrder);
// 2. Create Outbox Event within the same transaction
OutboxEvent event = new OutboxEvent(
"Order",
savedOrder.getId().toString(),
"OrderCreated",
convertToJson(savedOrder)
);
outboxRepository.save(event);
// 3. Transaction commits here, atomically saving both
return savedOrder;
}
// ... other methods
}
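The OutboxEvent entity and OutboxRepository referenced above are assumed rather than shown. Here is a minimal sketch of what they might look like, assuming Spring Data JPA with Jakarta Persistence and the outbox table defined earlier; the class shape and mapping details are illustrative, not prescribed by the original.

// Hypothetical OutboxEvent entity and repository backing the service above
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import org.springframework.data.jpa.repository.JpaRepository;
import java.time.Instant;
import java.util.List;
import java.util.UUID;

@Entity
@Table(name = "outbox")
public class OutboxEvent {

    @Id
    private UUID id = UUID.randomUUID();

    @Column(name = "aggregate_type", nullable = false)
    private String aggregateType;

    @Column(name = "aggregate_id", nullable = false)
    private String aggregateId;

    @Column(name = "event_type", nullable = false)
    private String eventType;

    // Note: mapping a String onto the JSONB column may need Hibernate's JSON
    // type support or a converter, depending on your Hibernate version
    @Column(nullable = false)
    private String payload;

    @Column(name = "created_at")
    private Instant createdAt = Instant.now();

    protected OutboxEvent() { } // required by JPA

    public OutboxEvent(String aggregateType, String aggregateId, String eventType, String payload) {
        this.aggregateType = aggregateType;
        this.aggregateId = aggregateId;
        this.eventType = eventType;
        this.payload = payload;
    }

    public String getAggregateType() { return aggregateType; }
    public String getAggregateId() { return aggregateId; }
    public String getPayload() { return payload; }
}

interface OutboxRepository extends JpaRepository<OutboxEvent, UUID> {
    // Derived query used by the polling relay below: oldest events first
    List<OutboxEvent> findTop100ByOrderByCreatedAt();
}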
Implementation (Message Relay):
The relay can be implemented in a few ways, but the most robust is using Change Data Capture (CDC) with a tool like Debezium. Debezium can monitor your database's transaction log, capture committed changes to the outbox table in real-time, and publish them to a Kafka topic. This is highly efficient and reliable.
A simpler, albeit less performant, alternative is a polling publisher.
// Simple Polling Relay (less ideal than CDC, but demonstrates the principle)
@Component
public class OutboxPoller {
@Autowired
private OutboxRepository outboxRepository;
@Autowired
private KafkaProducer<String, String> kafkaProducer;
// Polls the outbox table every second
@Scheduled(fixedDelay = 1000)
@Transactional
public void pollAndPublish() {
// Find events that haven't been published
List<OutboxEvent> events = outboxRepository.findTop100ByOrderByCreatedAt();
if (events.isEmpty()) {
return;
}
for (OutboxEvent event : events) {
ProducerRecord<String, String> record = new ProducerRecord<>(
event.getAggregateType().toLowerCase() + "-events",
event.getAggregateId(),
event.getPayload()
);
// Using an idempotent producer here is still critical!
// Block until the broker acknowledges the send; if it fails, the exception
// propagates, the delete below never runs, and the events are retried on the next poll
try {
    kafkaProducer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
    throw new IllegalStateException(
        "Failed to publish outbox event for aggregate " + event.getAggregateId(), e);
}
}
// Delete the events from the outbox table once sent
outboxRepository.deleteAll(events);
}
}
Kafka's Transactional Producer:
Kafka also has a transactional API that allows for atomic writes to multiple topics and partitions. When combined with the consumer groups API, it allows for atomic "consume-process-produce" operations. This is the foundation of Kafka Streams' EOS, which we'll cover later. While powerful, it does not solve the dual-write problem with an external database on its own. You cannot enlist a PostgreSQL or MySQL transaction into a Kafka transaction. Therefore, the Transactional Outbox pattern remains the gold standard for atomicity between your database and Kafka.
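To make the transactional API concrete, here is a minimal consume-process-produce sketch using the plain Java clients. The topic names and the transform helper are illustrative, and this is essentially what Kafka Streams automates for you (covered in Part 3).

// Sketch of an atomic consume-process-produce loop with the transactional API
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProcessor {

    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Setting a transactional.id implies idempotence and enables zombie fencing
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-tx");

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");       // offsets are committed via the transaction
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // only read committed transactional data

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {

            producer.initTransactions();
            consumer.subscribe(Collections.singletonList("raw-events"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // Produced records are staged but invisible to read_committed consumers until commit
                        producer.send(new ProducerRecord<>("enriched-events", record.key(), transform(record.value())));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    }
                    // The consumed offsets commit atomically with the produced records
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // Abort discards the staged output and leaves the offsets unmoved;
                    // a fencing error here means this instance is a zombie (see Part 3)
                    producer.abortTransaction();
                }
            }
        }
    }

    // Placeholder for the actual processing logic
    private static String transform(String value) {
        return value.toUpperCase();
    }
}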
Part 2: The Consumer Side - Ensuring Idempotent Processing
Even with a perfectly idempotent and atomic producer, the consumer can still cause duplicate processing. The canonical failure mode is:
- Consumer reads a message.
- Performs the business logic (e.g., updates a database, calls an API).
- Crashes before committing the offset back to Kafka.
When the consumer (or another instance in the same consumer group) restarts, it will fetch messages from the last committed offset and re-process the same message, leading to a duplicate side effect.
To solve this, we must make the consumer's business logic idempotent.
Pattern 3: The Idempotent Receiver
The Idempotent Receiver pattern involves tracking the IDs of messages that have already been processed and skipping any duplicates.
Mechanism:
- Every event carries a unique identifier (e.g., an eventId) or a composite key of aggregateId and sequenceNumber.
- The consumer maintains a persistent store (e.g., a database table, a Redis set) of the IDs of processed events.
- When a message is received, the consumer first checks if its ID is in the processed message store.
- If the ID exists, the message is a duplicate. The consumer skips processing and simply commits the offset.
- If the ID does not exist, the consumer processes the message. The business logic and the recording of the message ID must be performed atomically.
Implementation (Consumer with a Database-backed Idempotency Store):
-- Table to track processed message IDs
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ DEFAULT NOW()
);
// Kafka Consumer with Manual Offset Management and Idempotency Check
public class IdempotentOrderConsumer {
private final KafkaConsumer<String, String> consumer;
private final OrderProcessingService processingService;
private final DataSource dataSource; // For managing DB transactions
public IdempotentOrderConsumer(/*...dependencies...*/) { ... }
public void run() {
consumer.subscribe(Collections.singletonList("order-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// Extract a unique ID from the message header or payload
UUID eventId = UUID.fromString(new String(record.headers().lastHeader("eventId").value()));
Connection connection = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false); // Start transaction
// 1. Check for duplicate
if (isEventAlreadyProcessed(connection, eventId)) {
System.out.println("Skipping duplicate event: " + eventId);
} else {
// 2. Process the message
OrderEvent eventData = parseEvent(record.value());
processingService.handleOrderEvent(connection, eventData);
// 3. Mark event as processed within the same transaction
markEventAsProcessed(connection, eventId);
}
// 4. Commit the database transaction
connection.commit();
} catch (SQLException e) {
// If anything fails, roll back the DB transaction
if (connection != null) try { connection.rollback(); } catch (SQLException ex) { ... }
// Do not commit Kafka offset, allow for reprocessing
// Seek to the failing offset to retry immediately or handle differently
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
break; // Stop processing this batch; the next poll() resumes from the seeked offset
} finally {
if (connection != null) try { connection.close(); } catch (SQLException e) { ... }
}
// 5. Commit Kafka offset ONLY after DB transaction is successful
// This is still at-least-once, but the idempotency check handles the duplicates
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
}
}
}
private boolean isEventAlreadyProcessed(Connection conn, UUID eventId) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement("SELECT 1 FROM processed_events WHERE event_id = ?")) {
stmt.setObject(1, eventId);
try (ResultSet rs = stmt.executeQuery()) {
return rs.next();
}
}
}
private void markEventAsProcessed(Connection conn, UUID eventId) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO processed_events (event_id) VALUES (?)")) {
stmt.setObject(1, eventId);
stmt.executeUpdate();
}
}
}
Performance and Edge Cases:
* Latency: The idempotency check adds at least one database round-trip to every message's processing time. This is a significant performance trade-off for correctness.
* Store Growth: The processed_events table can grow indefinitely. For high-volume topics, you'll need a TTL or other archival strategy for old event IDs (a cleanup sketch follows this list).
* Atomicity is Key: The business logic and the idempotency key insertion *must* be in the same database transaction. Otherwise, a crash between the two operations either re-applies the business logic on restart (a duplicate side effect) or, if the key were written first in a separate transaction, skips a message whose side effects never happened.
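One simple archival strategy is a scheduled job that purges idempotency records older than your maximum redelivery window. A minimal sketch using Spring's JdbcTemplate; the 30-day retention and schedule are illustrative values, not from the original.

// Hypothetical scheduled cleanup for the processed_events idempotency store
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ProcessedEventsCleaner {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Runs daily; keep the retention comfortably longer than the longest possible
    // redelivery window (consumer lag, reprocessing, outage recovery)
    @Scheduled(cron = "0 0 3 * * *")
    public void purgeOldEntries() {
        int deleted = jdbcTemplate.update(
            "DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '30 days'");
        System.out.println("Purged " + deleted + " old idempotency records");
    }
}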
Part 3: Kafka Streams and Native Exactly-Once Semantics
For stream processing applications—where you consume from Kafka, transform the data, and produce back to Kafka—the Kafka Streams library offers a powerful, built-in EOS guarantee that simplifies this entire process.
Mechanism:
When you set processing.guarantee="exactly_once_v2" (introduced as exactly_once_beta in Kafka 2.6 and renamed in 3.0; it requires brokers on version 2.5 or newer and is preferred over the older exactly_once), Kafka Streams leverages the transactional producer and consumer group coordination to provide end-to-end exactly-once semantics for any operations within the Kafka ecosystem.
Here's how it works for a read-process-write flow:
- A stream task consumes a batch of records from its input partitions.
- As the topology (e.g., map, filter, join) produces output records, they are sent to the transactional producer. These messages are staged by the broker but are not yet visible to any consumer with isolation.level=read_committed.
- When the commit interval elapses, the consumed input offsets are added to the same transaction and the transaction is committed, making the output visible and advancing the offsets in a single atomic step.
This entire process is atomic. If the application crashes at any point before the transaction is committed, the transaction will be aborted. The output messages will be discarded, and the consumer offsets will not be advanced. Upon restart, the task will resume from the last successfully committed offset, guaranteeing no data loss and no duplicates.
Implementation:
// Kafka Streams topology with EOS enabled
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor-eos");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Enable Exactly-Once Semantics
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("raw-events")
.filter((key, value) -> isValidEvent(value))
.mapValues(value -> enrichEvent(value))
.to("enriched-events");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
The Critical Caveat: External Systems
Kafka Streams' EOS is a powerful guarantee, but it is scoped to Kafka. The moment your stream processor needs to interact with an external system (e.g., call a REST API, write to a database), you are back outside the transactional boundary. The write to the external system is a side effect that Kafka's transaction coordinator knows nothing about.
If your processor writes to a database and then crashes before its Kafka transaction is committed, the database write will persist, but the stream processor will re-process the input message on restart, leading to a duplicate write.
To handle this, you must combine Kafka Streams with the patterns we discussed earlier:
* Idempotent External System: Make the external API or database update idempotent (e.g., use INSERT ... ON CONFLICT DO UPDATE in PostgreSQL, as sketched after this list).
* Transactional Outbox Sink: Instead of writing directly to the external system, the Streams processor writes to an "outbox" topic in Kafka. A separate, transaction-aware Kafka Connect sink connector (like the JDBC Sink Connector) can then consume from this topic and write to the database idempotently.
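As a sketch of the first option, an upsert keyed on a stable identifier makes the external write safe to repeat: re-processing the same event rewrites the row with identical values instead of creating a duplicate. The order_projections table and the OrderEvent getters below are illustrative names, not from the original.

// Hypothetical idempotent upsert against PostgreSQL (java.sql.Connection/PreparedStatement,
// as in the consumer example above)
public void upsertOrderProjection(Connection conn, OrderEvent event) throws SQLException {
    String sql =
        "INSERT INTO order_projections (order_id, customer_id, status, updated_at) " +
        "VALUES (?, ?, ?, NOW()) " +
        "ON CONFLICT (order_id) DO UPDATE SET " +
        "  customer_id = EXCLUDED.customer_id, " +
        "  status = EXCLUDED.status, " +
        "  updated_at = NOW()";
    try (PreparedStatement stmt = conn.prepareStatement(sql)) {
        stmt.setString(1, event.getOrderId());
        stmt.setString(2, event.getCustomerId());
        stmt.setString(3, event.getStatus());
        stmt.executeUpdate();
    }
}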
Zombie Fencing:
A subtle but critical problem in distributed stream processing is the "zombie instance." When a Kafka Streams instance is presumed dead due to a lost heartbeat, the consumer group rebalances, and its tasks are reassigned to another instance. However, the original instance might not be dead—it could be suffering from a long GC pause. When it wakes up, it's now a zombie: it's no longer part of the group but might try to commit a transaction based on stale information. EOS V2 helps prevent this by fencing out zombie producers. Each producer instance gets an "epoch," which is incremented on rebalance. The broker will reject transactional requests from a producer with an old epoch, effectively fencing it off.
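Kafka Streams handles this fencing for you. If you drive the transactional producer directly (as in the consume-process-produce sketch in Part 1), being fenced surfaces as a ProducerFencedException, which must be treated as fatal for that producer instance rather than retried. A minimal sketch, assuming a transactional KafkaProducer<String, String> named producer is already initialized:

// Reacting to zombie fencing when using the transactional producer directly
void processBatchTransactionally(ConsumerRecords<String, String> records) {
    try {
        producer.beginTransaction();
        // ... send output records and sendOffsetsToTransaction(...) ...
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // A producer with the same transactional.id and a newer epoch exists:
        // this instance is a zombie. Never abort or retry here; close and shut down.
        producer.close();
        throw new IllegalStateException("Fenced by a newer producer instance; shutting down", e);
    } catch (KafkaException e) {
        // Transient failure: abort so the staged output is discarded, then retry the batch
        producer.abortTransaction();
    }
}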
Conclusion: A Decision Framework for EOS
There is no one-size-fits-all solution for exactly-once semantics. The correct pattern depends entirely on your system's architecture and requirements. Here's a decision framework:
| Scenario | Primary Pattern | Key Considerations |
|---|---|---|
| Simple, stateless message production (e.g., logging, metrics) | Idempotent Producer (enable.idempotence=true) | Low overhead, easy to enable. Protects against network-level duplicates. Should be your default for all producers. |
| Atomically updating a database AND publishing an event | Transactional Outbox Pattern (with CDC/Debezium for the relay) | The gold standard for atomicity between your primary data store and Kafka. Adds complexity (outbox table, relay process) but provides the strongest guarantee. |
| Consuming events and performing business logic with external side effects | Idempotent Receiver (using a persistent store for event IDs) | The consumer takes full responsibility for idempotency. Adds latency for the duplicate check. The atomicity of the business logic and the idempotency key write is critical. |
| Pure Kafka-to-Kafka stream processing (e.g., ETL, enrichment, aggregation) | Kafka Streams with EOS (processing.guarantee=exactly_once_v2) | The simplest and most efficient solution when all I/O stays within Kafka. Handles failures, retries, and rebalances transparently. |
| Kafka Streams with external database writes | Kafka Streams (EOS) + Transactional Outbox Sink or Idempotent writes to the external system | Combines the power of Streams EOS with patterns for handling external side effects. The sink connector or the external service must be designed for idempotency. |
Achieving true end-to-end exactly-once semantics is a testament to rigorous engineering. It requires a deep understanding of your tools, a careful analysis of failure modes, and a deliberate choice of patterns that balance correctness, performance, and operational complexity. By mastering these advanced techniques, you can build event-driven systems that are not just scalable and fast, but also resilient and correct.