Achieving Exactly-Once Semantics in Kafka Event Sourcing Systems

Goh Ling Yong

The Misnomer of "Exactly-Once"

In distributed systems, the term "exactly-once" is one of the most potent and misunderstood concepts. Any senior engineer who has wrestled with message queues and network partitions knows that true, metaphysical "exactly-once" delivery is impossible. A producer can send a message, and the network can fail before the acknowledgment is received. Did the broker get it? The producer can't know, so it must retry. This fundamental reality of distributed computing limits us to two practical guarantees: at-most-once (fire and forget) and at-least-once (retry until acknowledged).

So, what do we mean when we talk about Exactly-Once Semantics (EOS)? It's a composite guarantee: at-least-once delivery combined with idempotent processing. The system is engineered to tolerate message redelivery without causing duplicate side effects. In an event sourcing architecture, where application state is derived by replaying a log of events, preventing duplicate event processing is not just a best practice; it is a prerequisite for data integrity. A duplicated OrderCreated event could lead to double-shipping, and a duplicated PaymentProcessed event could lead to double-billing. The stakes are immense.

This article dissects the advanced patterns and mechanisms required to achieve EOS in a Kafka-based event sourcing system. We will skip the basics and dive directly into the production-level implementation details of idempotent producers, transactional producers, idempotent consumers, and the built-in EOS capabilities of Kafka Streams.


Part 1: The Producer Side - Ensuring Idempotent and Atomic Writes

The first half of the EOS equation is ensuring that the producer doesn't introduce duplicates into the Kafka log, even when facing retries. Kafka provides two powerful, related mechanisms for this: the Idempotent Producer and the Transactional Producer.

Pattern 1: The Idempotent Producer

Since Kafka 0.11, the producer can be configured to be idempotent. This guarantees that retries on produce requests will not result in duplicate messages being written to the partition's log. It's a surprisingly simple feature to enable, but its underlying mechanism is worth understanding.

Mechanism:

When you enable idempotence, the producer is assigned a unique Producer ID (PID). With every batch of messages sent to a partition, it includes this PID and a monotonically increasing sequence number. The broker leader for that partition keeps track of the latest (PID, sequence number) pair it has successfully written. If it receives a message with a sequence number it has already seen for a given PID, it rejects the write with a DUPLICATE_SEQUENCE_NUMBER error, which the producer client handles transparently.

Implementation:

Enabling this is a matter of configuration. It's so effective and has such minimal overhead that it should be considered a default for any critical production workload.

java
// Producer Configuration for Idempotence
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092,kafka-broker-2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Enable Idempotence
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

// These are set implicitly by enabling idempotence, but it's good to be explicit
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Required for idempotence
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE)); // Retry indefinitely
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // Must be <= 5 when idempotence is enabled; ordering is still preserved

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
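
With this configuration in place, produce calls look exactly as they would without idempotence; the deduplication happens inside the client and on the broker. A minimal usage sketch, assuming a hypothetical "orders" topic with String keys and JSON string values:

java
// Sending with the idempotent producer: transient failures are retried
// internally without writing duplicates to the partition log.
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders", "order-123", "{\"orderId\":\"order-123\",\"status\":\"CREATED\"}");

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Only non-retriable errors (or exhausted retries) surface here
        System.err.println("Failed to publish: " + exception.getMessage());
    } else {
        System.out.println("Written to " + metadata.topic() + "-" + metadata.partition()
            + " at offset " + metadata.offset());
    }
});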

Edge Cases and Considerations:

  • Scope: Idempotence is guaranteed only within a single producer session and for a single partition. If the producer process restarts, it gets a new PID, and the sequence number resets. This is generally fine, as the application state that triggered the original produce request would typically be re-processed, leading to a new, distinct message.
  • Performance: The overhead is negligible. The broker simply needs to store an integer sequence number per PID per partition. The requirement for acks=all can add latency compared to acks=1, but this is a necessary trade-off for data durability that you should already be making in an event sourcing system.
  • Ordering: With idempotence enabled, per-partition ordering is preserved even across retries, provided max.in.flight.requests.per.connection is 5 or less. This is a critical property for many event-sourcing use cases.

    Pattern 2: The Transactional Outbox Pattern for Atomicity

    The idempotent producer solves the problem of network-level retries. It does not solve the more complex business-level problem: How do you atomically update your application's database and publish an event to Kafka?

    This is the classic "dual write" problem. Consider this pseudo-code:

    java
    // ANTI-PATTERN: DO NOT DO THIS
    public void createOrder(Order order) {
        database.save(order); // Step 1: Write to DB
        kafkaProducer.send(new ProducerRecord("orders", order.getId(), order.toJson())); // Step 2: Publish to Kafka
    }

    What happens if the application crashes after the database commit but before the Kafka send() completes? The order exists in your system of record, but the corresponding OrderCreated event is never published. Downstream services that rely on this event will never know the order was created, leading to a severe state inconsistency.

    Flipping the order is no better. If you publish to Kafka first and then the database write fails, you have an event for an order that doesn't exist.

    The solution is the Transactional Outbox Pattern. The core idea is to leverage the atomicity of your local database transaction to ensure the event is captured reliably. The event is then published to Kafka by a separate, asynchronous process.

    Mechanism:

    • Begin a database transaction.
    • Perform your business logic (e.g., insert/update entities).
    • As part of the same transaction, insert a record representing the event to be published into an outbox table.
    • Commit the database transaction.

    Now, the business state change and the intent to publish an event are captured atomically. If either fails, the whole transaction rolls back. A separate process (a message relay) then reads from this outbox table and reliably publishes the messages to Kafka.

    Implementation (Service and Outbox Table):

    sql
    -- PostgreSQL Outbox Table
    CREATE TABLE outbox (
        id UUID PRIMARY KEY,
        aggregate_type VARCHAR(255) NOT NULL,
        aggregate_id VARCHAR(255) NOT NULL,
        event_type VARCHAR(255) NOT NULL,
        payload JSONB NOT NULL,
        created_at TIMESTAMPTZ DEFAULT NOW()
    );
    java
    // Order Service using Spring Data JPA and a Transaction
    @Service
    public class OrderService {
    
        @Autowired
        private OrderRepository orderRepository;
        @Autowired
        private OutboxRepository outboxRepository;
    
        @Transactional
        public Order createOrder(CreateOrderRequest request) {
            // 1. Business Logic
            Order newOrder = new Order(request.getCustomerId(), request.getItems());
            Order savedOrder = orderRepository.save(newOrder);
    
            // 2. Create Outbox Event within the same transaction
            OutboxEvent event = new OutboxEvent(
                "Order",
                savedOrder.getId().toString(),
                "OrderCreated",
                convertToJson(savedOrder)
            );
            outboxRepository.save(event);
    
            // 3. Transaction commits here, atomically saving both
            return savedOrder;
        }
        // ... other methods
    }

    Implementation (Message Relay):

    The relay can be implemented in a few ways, but the most robust is using Change Data Capture (CDC) with a tool like Debezium. Debezium can monitor your database's transaction log, capture committed changes to the outbox table in real-time, and publish them to a Kafka topic. This is highly efficient and reliable.

    A simpler, albeit less performant, alternative is a polling publisher.

    java
    // Simple Polling Relay (less ideal than CDC, but demonstrates the principle)
    @Component
    public class OutboxPoller {
    
        @Autowired
        private OutboxRepository outboxRepository;
        @Autowired
        private KafkaProducer<String, String> kafkaProducer;
    
        // Polls the outbox table every second
        @Scheduled(fixedDelay = 1000)
        @Transactional
        public void pollAndPublish() {
            // Find events that haven't been published
            List<OutboxEvent> events = outboxRepository.findTop100ByOrderByCreatedAt();
            if (events.isEmpty()) {
                return;
            }
    
            for (OutboxEvent event : events) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    event.getAggregateType().toLowerCase() + "-events",
                    event.getAggregateId(),
                    event.getPayload()
                );
                try {
                    // Block until the broker acknowledges the write.
                    // Using an idempotent producer here is still critical!
                    kafkaProducer.send(record).get();
                } catch (Exception e) {
                    // Rethrow so the @Transactional method rolls back and the
                    // unpublished events stay in the outbox for the next poll.
                    throw new IllegalStateException(
                        "Failed to publish outbox event for aggregate " + event.getAggregateId(), e);
                }
            }
    
            // Delete the events from the outbox table once sent
            outboxRepository.deleteAll(events);
        }
    }

    Kafka's Transactional Producer:

    Kafka also has a transactional API that allows for atomic writes to multiple topics and partitions. When combined with the consumer groups API, it allows for atomic "consume-process-produce" operations. This is the foundation of Kafka Streams' EOS, which we'll cover later. While powerful, it does not solve the dual-write problem with an external database on its own. You cannot enlist a PostgreSQL or MySQL transaction into a Kafka transaction. Therefore, the Transactional Outbox pattern remains the gold standard for atomicity between your database and Kafka.
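
    To make this concrete, here is a minimal consume-process-produce sketch using the plain clients. The topic names, the transactional.id, and the transform() helper are illustrative assumptions, and the consumer is assumed to be configured elsewhere with enable.auto.commit=false; this is essentially the loop that Kafka Streams automates for you.

    java
    // Transactional consume-process-produce loop (illustrative names)
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Setting a transactional.id enables idempotence implicitly
    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-enricher-1");

    KafkaProducer<String, String> txProducer = new KafkaProducer<>(producerProps);
    txProducer.initTransactions();

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) {
            continue;
        }

        txProducer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                // transform() is a stand-in for your processing logic
                txProducer.send(new ProducerRecord<>("enriched-orders", record.key(), transform(record.value())));
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
            }
            // The consumed offsets are committed in the same transaction as the output records
            txProducer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            txProducer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: another producer with the same transactional.id has taken over
            txProducer.close();
            break;
        } catch (KafkaException e) {
            // Abort: staged output is discarded and offsets are not advanced
            txProducer.abortTransaction();
        }
    }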


    Part 2: The Consumer Side - Ensuring Idempotent Processing

    Even with a perfectly idempotent and atomic producer, the consumer can still cause duplicate processing. The canonical failure mode is:

    • Consumer reads a message.
    • Performs the business logic (e.g., updates a database, calls an API).
    • Crashes before committing the offset for the processed message.
    • When the consumer (or another instance in the same consumer group) restarts, it fetches messages from the last committed offset and re-processes the same message, leading to a duplicate side effect.

    To solve this, we must make the consumer's business logic idempotent.

    Pattern 3: The Idempotent Receiver

    The Idempotent Receiver pattern involves tracking the IDs of messages that have already been processed and skipping any duplicates.

    Mechanism:

    • Each event must have a unique, business-level identifier. This could be a UUID generated by the producer (eventId) or a composite key of aggregateId and sequenceNumber.
    • The consumer maintains a persistent store (e.g., a database table, a Redis set) of the IDs of processed events.
    • When a message is received, the consumer first checks if its ID is in the processed message store.
    • If the ID exists, the message is a duplicate. The consumer skips processing and simply commits the offset.
    • If the ID does not exist, the consumer processes the message. The business logic and the recording of the message ID must be performed atomically.

    Implementation (Consumer with a Database-backed Idempotency Store):

    sql
    -- Table to track processed message IDs
    CREATE TABLE processed_events (
        event_id UUID PRIMARY KEY,
        processed_at TIMESTAMPTZ DEFAULT NOW()
    );
    java
    // Kafka Consumer with Manual Offset Management and Idempotency Check
    public class IdempotentOrderConsumer {
        private final KafkaConsumer<String, String> consumer;
        private final OrderProcessingService processingService;
        private final DataSource dataSource; // For managing DB transactions
    
        public IdempotentOrderConsumer(/*...dependencies...*/) { ... }
    
        public void run() {
            consumer.subscribe(Collections.singletonList("order-events"));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // Extract a unique ID from the message header or payload
                    UUID eventId = UUID.fromString(new String(record.headers().lastHeader("eventId").value()));
    
                    Connection connection = null;
                    try {
                        connection = dataSource.getConnection();
                        connection.setAutoCommit(false); // Start transaction
    
                        // 1. Check for duplicate
                        if (isEventAlreadyProcessed(connection, eventId)) {
                            System.out.println("Skipping duplicate event: " + eventId);
                        } else {
                            // 2. Process the message
                            OrderEvent eventData = parseEvent(record.value());
                            processingService.handleOrderEvent(connection, eventData);
    
                            // 3. Mark event as processed within the same transaction
                            markEventAsProcessed(connection, eventId);
                        }
    
                        // 4. Commit the database transaction
                        connection.commit();
    
                    } catch (SQLException e) {
                        // If anything fails, roll back the DB transaction
                        if (connection != null) try { connection.rollback(); } catch (SQLException ex) { ... }
                        // Do not commit the Kafka offset; seek back to the failing record
                        consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                        break; // Abandon the rest of this batch; the next poll() resumes from the seeked offset
                    } finally {
                        if (connection != null) try { connection.close(); } catch (SQLException e) { ... }
                    }
    
                    // 5. Commit Kafka offset ONLY after DB transaction is successful
                    // This is still at-least-once, but the idempotency check handles the duplicates
                    consumer.commitSync(Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)
                    ));
                }
            }
        }
    
        private boolean isEventAlreadyProcessed(Connection conn, UUID eventId) throws SQLException {
            try (PreparedStatement stmt = conn.prepareStatement("SELECT 1 FROM processed_events WHERE event_id = ?")) {
                stmt.setObject(1, eventId);
                try (ResultSet rs = stmt.executeQuery()) {
                    return rs.next();
                }
            }
        }
    
        private void markEventAsProcessed(Connection conn, UUID eventId) throws SQLException {
            try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO processed_events (event_id) VALUES (?)")) {
                stmt.setObject(1, eventId);
                stmt.executeUpdate();
            }
        }
    }

    Performance and Edge Cases:

    * Latency: The idempotency check adds at least one database round-trip to every message's processing time. This is a significant performance trade-off for correctness.

    * Store Growth: The processed_events table can grow indefinitely. For high-volume topics, you'll need a TTL or other archival strategy for old event IDs (see the cleanup sketch after this list).

    * Atomicity is Key: The business logic and the idempotency key insertion *must* be in the same database transaction. Otherwise, a crash between the two operations would either mark an event as processed that was never actually handled, or handle it without recording its ID, causing a duplicate side effect when the message is re-processed on restart.
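
    As a concrete sketch of the retention point above, a scheduled job can prune old idempotency records once redelivery of the corresponding events is no longer possible (for example, after the source topic's retention has elapsed). The seven-day window and the use of Spring's JdbcTemplate are illustrative assumptions:

    java
    // Hypothetical housekeeping job for the idempotency store
    @Component
    public class ProcessedEventsCleaner {

        @Autowired
        private JdbcTemplate jdbcTemplate;

        // Runs at the top of every hour
        @Scheduled(cron = "0 0 * * * *")
        public void purgeOldEntries() {
            int deleted = jdbcTemplate.update(
                "DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '7 days'");
            System.out.println("Purged " + deleted + " processed_events rows");
        }
    }

    The retention window must be at least as long as the longest plausible redelivery delay; pruning too aggressively reopens the door to duplicates.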


    Part 3: Kafka Streams and Native Exactly-Once Semantics

    For stream processing applications—where you consume from Kafka, transform the data, and produce back to Kafka—the Kafka Streams library offers a powerful, built-in EOS guarantee that simplifies this entire process.

    Mechanism:

    When you set processing.guarantee="exactly_once_v2" (renamed from exactly_once_beta in Kafka 3.0, requires brokers on 2.5 or newer, and preferred over the older exactly_once), Kafka Streams leverages the transactional producer and consumer group coordination to provide end-to-end exactly-once semantics for any operations that stay within the Kafka ecosystem.

    Here's how it works for a read-process-write flow:

  • Begin Transaction: When processing a batch of messages, the Streams task starts a Kafka transaction.
  • Produce Results: As your topology's logic (e.g., map, filter, join) produces output records, they are sent to the transactional producer. These messages are staged by the broker but are not yet visible to any consumer with isolation.level=read_committed.
  • Commit Offsets: After all messages in the batch are processed and their outputs are sent, the Streams task sends the source topic's consumer offsets to the transaction coordinator to be included in the same transaction.
  • Commit Transaction: The Streams task instructs the producer to commit the transaction. The transaction coordinator executes a two-phase commit protocol. It writes a "commit" marker to the transaction log, then writes markers to the output topic partitions and the internal consumer offsets topic. Only after this completes are the output messages made visible to downstream consumers and the input offsets considered committed.

    This entire process is atomic. If the application crashes at any point before the transaction is committed, the transaction will be aborted. The output messages will be discarded, and the consumer offsets will not be advanced. Upon restart, the task will resume from the last successfully committed offset, guaranteeing no data loss and no duplicates.

    Implementation:

    java
    // Kafka Streams topology with EOS enabled
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor-eos");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
    // Enable Exactly-Once Semantics
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
    
    StreamsBuilder builder = new StreamsBuilder();
    
    builder.<String, String>stream("raw-events")
        .filter((key, value) -> isValidEvent(value))
        .mapValues(value -> enrichEvent(value))
        .to("enriched-events");
    
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
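
    One detail that is easy to miss: plain consumers of the output topic only honor these transactions if they are configured to read committed data; otherwise they may observe records from transactions that are later aborted. A minimal sketch of such a downstream consumer configuration (broker address and group id are illustrative):

    java
    // Downstream consumer of "enriched-events" that respects transaction markers
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "enriched-events-reader");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // Only deliver records from committed transactions (the default is read_uncommitted)
    consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    KafkaConsumer<String, String> downstreamConsumer = new KafkaConsumer<>(consumerProps);
    downstreamConsumer.subscribe(Collections.singletonList("enriched-events"));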

    The Critical Caveat: External Systems

    Kafka Streams' EOS is a powerful guarantee, but it is scoped to Kafka. The moment your stream processor needs to interact with an external system (e.g., call a REST API, write to a database), you are back outside the transactional boundary. The write to the external system is a side effect that Kafka's transaction coordinator knows nothing about.

    If your processor writes to a database and then crashes before its Kafka transaction is committed, the database write will persist, but the stream processor will re-process the input message on restart, leading to a duplicate write.

    To handle this, you must combine Kafka Streams with the patterns we discussed earlier:

    * Idempotent External System: Make the external API or database update idempotent (e.g., use INSERT ... ON CONFLICT DO UPDATE in PostgreSQL); a sketch of this approach follows this list.

    * Transactional Outbox Sink: Instead of writing directly to the external system, the Streams processor writes to an "outbox" topic in Kafka. A separate Kafka Connect sink connector (such as the JDBC Sink Connector configured for upserts) can then consume from this topic with isolation.level=read_committed and write to the database idempotently.
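
    For the first option, an upsert keyed on a natural identifier makes the database write safe to repeat. A minimal JDBC sketch, assuming a hypothetical order_projections table keyed by order_id:

    java
    // Idempotent write: replaying the same event converges to the same row state
    private static final String UPSERT_SQL =
        "INSERT INTO order_projections (order_id, status, updated_at) " +
        "VALUES (?, ?, NOW()) " +
        "ON CONFLICT (order_id) DO UPDATE SET status = EXCLUDED.status, updated_at = NOW()";

    public void applyOrderEvent(Connection connection, String orderId, String status) throws SQLException {
        try (PreparedStatement stmt = connection.prepareStatement(UPSERT_SQL)) {
            stmt.setString(1, orderId);
            stmt.setString(2, status);
            stmt.executeUpdate();
        }
    }

    Because the statement is keyed on order_id, re-processing the same event updates the existing row rather than inserting a duplicate.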

    Zombie Fencing:

    A subtle but critical problem in distributed stream processing is the "zombie instance." When a Kafka Streams instance is presumed dead due to a lost heartbeat, the consumer group rebalances, and its tasks are reassigned to another instance. However, the original instance might not be dead—it could be suffering from a long GC pause. When it wakes up, it's now a zombie: it's no longer part of the group but might try to commit a transaction based on stale information. EOS V2 helps prevent this by fencing out zombie producers. Each producer instance gets an "epoch," which is incremented on rebalance. The broker will reject transactional requests from a producer with an old epoch, effectively fencing it off.


    Conclusion: A Decision Framework for EOS

    There is no one-size-fits-all solution for exactly-once semantics. The correct pattern depends entirely on your system's architecture and requirements. Here's a decision framework:

    Scenario: Simple, stateless message production (e.g., logging, metrics)
    Primary Pattern: Idempotent Producer (enable.idempotence=true)
    Key Considerations: Low overhead, easy to enable. Protects against network-level duplicates. Should be your default for all producers.

    Scenario: Atomically updating a database AND publishing an event
    Primary Pattern: Transactional Outbox Pattern (with CDC/Debezium for the relay)
    Key Considerations: The gold standard for atomicity between your primary data store and Kafka. Adds complexity (outbox table, relay process) but provides the strongest guarantee.

    Scenario: Consuming events and performing business logic with external side effects
    Primary Pattern: Idempotent Receiver (using a persistent store for event IDs)
    Key Considerations: The consumer takes full responsibility for idempotency. Adds latency for the duplicate check. The atomicity of the business logic and the idempotency key write is critical.

    Scenario: Pure Kafka-to-Kafka stream processing (e.g., ETL, enrichment, aggregation)
    Primary Pattern: Kafka Streams with EOS (processing.guarantee=exactly_once_v2)
    Key Considerations: The simplest and most efficient solution when all I/O stays within Kafka. Handles failures, retries, and rebalances transparently.

    Scenario: Kafka Streams with external database writes
    Primary Pattern: Kafka Streams (EOS) + Transactional Outbox Sink or idempotent writes to the external system
    Key Considerations: Combines the power of Streams EOS with patterns for handling external side effects. The sink connector or the external service must be designed for idempotency.

    Achieving true end-to-end exactly-once semantics is a testament to rigorous engineering. It requires a deep understanding of your tools, a careful analysis of failure modes, and a deliberate choice of patterns that balance correctness, performance, and operational complexity. By mastering these advanced techniques, you can build event-driven systems that are not just scalable and fast, but also resilient and correct.
