Achieving Exactly-Once Semantics in Kafka Event Sourcing Systems

Goh Ling Yong

The Elusive Exactly-Once Guarantee: From Theory to Production Reality

In the world of distributed systems, "exactly-once" processing is often treated as a mythical creature. While many systems claim to offer it, the reality is a complex interplay of trade-offs, specific configurations, and, most critically, disciplined application-level design. For senior engineers building mission-critical systems on Apache Kafka, particularly those employing event sourcing patterns, achieving this guarantee is not an academic exercise—it's a fundamental requirement for data integrity.

This article is not an introduction. It assumes you understand why at-least-once delivery can lead to duplicate processing and why at-most-once can lead to data loss. Our focus is on the how: the precise mechanisms within Kafka and the architectural patterns outside of it that are required to construct a truly robust, end-to-end exactly-once semantics (EOS) pipeline.

We will dissect the three pillars of Kafka's EOS support and then, crucially, address the "last mile" problem: ensuring that the effects of a consumed message are applied exactly once to an external system, such as a relational database. We'll explore production-ready code, performance implications, and the subtle edge cases that can undermine your entire EOS strategy.

The Foundation: Kafka's Three Pillars of Exactly-Once Semantics

Kafka introduced EOS capabilities in version 0.11. These guarantees are built upon three core concepts working in concert: the idempotent producer, atomic multi-partition transactions, and transaction-aware consumers.

Pillar 1: The Idempotent Producer

Idempotency is the foundation upon which Kafka's EOS is built. An idempotent operation is one that can be performed multiple times without changing the result beyond the initial application. In the context of a Kafka producer, this means that retrying a send() operation due to a transient network error will not result in the message being written to the Kafka log multiple times.

Mechanism:

To enable idempotency, you set a single producer property:

properties
enable.idempotence=true

When enabled, the broker assigns the producer a unique Producer ID (PID) and the producer includes a monotonically increasing sequence number with each message sent to a specific topic-partition. The broker tracks the highest sequence number it has successfully written for each (PID, topic-partition) pair. If it receives a message with a sequence number less than or equal to the last one it recorded, it discards the message as a duplicate. If the sequence number is exactly one greater than the last, it accepts the message. If it's more than one greater, it rejects the request with an OutOfOrderSequenceException, forcing the producer to handle the failure.

Configuration (Java - Spring Kafka):

java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

// Within a @Configuration class
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    // --- EOS Configuration --- 
    configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // For idempotency, acks must be 'all'. This is set by default when idempotence is enabled.
    // configProps.put(ProducerConfig.ACKS_CONFIG, "all"); 
    // Retries are also defaulted to a large value (Integer.MAX_VALUE) with idempotence.
    // configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    return new DefaultKafkaProducerFactory<>(configProps);
}

Limitations and Edge Cases:

The idempotent producer's guarantee is scoped to a single producer session and a single partition.

  • Producer Restart: If the producer application restarts, it gets a new PID. The idempotency guarantee does not hold across restarts.
  • Multi-Partition Writes: If you need to write to multiple partitions atomically (i.e., either all writes succeed or none do), idempotency alone is insufficient. A failure after writing to partition A but before writing to partition B would leave the system in an inconsistent state.
    These limitations are precisely why we need the second pillar: transactions.

    Pillar 2: Atomic Transactions

    Kafka's transactional API extends idempotency across producer sessions and enables atomic writes to multiple topic-partitions. This is the cornerstone of the classic "read-process-write" pattern in stream processing applications.

    Mechanism:

    To use transactions, you must provide a stable transactional.id in your producer configuration. This ID allows the broker to identify the producer instance across application restarts.

    properties
    transactional.id=my-order-processor-01

    When a producer with a transactional.id is initialized, it fences off any "zombie" producers—older instances with the same transactional.id that may have been delayed by a long GC pause or network partition. The broker achieves this by bumping an epoch for the given transactional.id. Any requests from a producer with an older epoch are rejected with a ProducerFencedException.

    The transactional workflow is explicit:

  • producer.initTransactions(): Registers the transactional.id with the transaction coordinator in the broker and gets a PID.
  • producer.beginTransaction(): Marks the start of a transaction.
  • producer.send(): Sends one or more messages to various topics/partitions.
  • producer.sendOffsetsToTransaction(): If consuming as part of the transaction, this commits the consumer offsets atomically with the produced messages.
  • producer.commitTransaction() or producer.abortTransaction(): Completes the transaction. The messages are only made visible to transaction-aware consumers after a commit.
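
    Before the Spring example, here is a minimal sketch of that workflow against the plain KafkaProducer and KafkaConsumer clients. It assumes a producer created with a transactional.id, an already-subscribed consumer running with isolation.level=read_committed and auto-commit disabled, and an illustrative output topic name; error handling is deliberately simplified.

    java
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;

    public class ReadProcessWriteLoop {

        public void run(KafkaProducer<String, String> producer, KafkaConsumer<String, String> consumer) {
            producer.initTransactions(); // registers the transactional.id, obtains a PID, fences zombies

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }

                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // "Process" each record and write the result as part of the transaction.
                        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
                    }
                    // Commit the consumed offsets atomically with the produced messages.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException | OutOfOrderSequenceException fatal) {
                    // Another instance with the same transactional.id has taken over, or the
                    // producer is in an unrecoverable state: close it and stop.
                    producer.close();
                    throw fatal;
                } catch (KafkaException retriable) {
                    // Abort: anything sent in this transaction never becomes visible to
                    // read_committed consumers. To retry the batch, the consumer would
                    // need to seek back to its last committed offsets (omitted here).
                    producer.abortTransaction();
                }
            }
        }
    }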

    Production-Grade Example (Java - Spring Kafka):

    Let's model a service that processes payment commands, and upon success, produces a PaymentProcessed event. The entire operation must be atomic.

    java
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    @Service
    public class PaymentProcessorService {
    
        private final KafkaTemplate<String, PaymentProcessedEvent> kafkaTemplate;
    
        public PaymentProcessorService(KafkaTemplate<String, PaymentProcessedEvent> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        // Spring's @Transactional will manage the Kafka transaction lifecycle
        // when a KafkaTransactionManager is configured.
        @Transactional("kafkaTransactionManager")
        public void processPayment(ProcessPaymentCommand command) {
            // 1. Perform business logic (e.g., call external payment gateway)
            boolean paymentSuccess = callPaymentGateway(command.getPaymentDetails());
    
            if (paymentSuccess) {
                PaymentProcessedEvent event = new PaymentProcessedEvent(
                    command.getOrderId(), 
                    command.getAmount(), 
                    System.currentTimeMillis()
                );
    
                // 2. This send is part of the ongoing Kafka transaction.
                // The message will not be visible to consumers until the transaction commits.
                kafkaTemplate.send("payment-events", command.getOrderId(), event);
            } else {
                // We could instead produce a "payment failed" event here and let the
                // transaction commit. To abort the transaction, and with it any messages
                // already sent within it, we throw an exception instead.
                throw new PaymentProcessingException("Payment gateway declined transaction.");
            }
        }
    
        private boolean callPaymentGateway(Object details) { /* ... */ return true; }
    }

    To make this work, the ProducerFactory needs a transactional.id prefix and we need a KafkaTransactionManager:

    java
    // In the ProducerFactory configuration (on the DefaultKafkaProducerFactory bean)
    // ... existing properties ...
    factory.setTransactionIdPrefix("payment-tx-"); // Spring appends a per-producer suffix

    // In the main configuration class
    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(
            ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    Pillar 3: The Transaction-Aware Consumer

    The final piece of the puzzle within Kafka is ensuring that consumers only read messages that are part of a successfully committed transaction. Uncommitted (in-flight) or aborted messages should be invisible.

    This is controlled by a single consumer configuration property:

    properties
    isolation.level=read_committed

    The default value is read_uncommitted, which means the consumer reads messages as soon as they are written to the log, regardless of transaction status. When set to read_committed, the consumer only receives messages up to the partition's last stable offset, so it never sees messages from still-open transactions, and messages belonging to aborted transactions are filtered out before they reach your application code.

    Configuration (Java - Spring Kafka):

    java
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    import java.util.HashMap;
    import java.util.Map;

    // Within a @Configuration class
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // --- EOS Configuration ---
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    With these three pillars, Kafka provides a powerful guarantee: a message will be processed in a read-process-write cycle exactly once within the Kafka ecosystem. But what happens when the 'write' part of that cycle involves an external system?

    The Last Mile Problem: End-to-End EOS with External Systems

    Kafka transactions cannot be enlisted in a 2-Phase Commit (2PC) with an external transactional resource like a PostgreSQL database. This means you cannot atomically commit a Kafka transaction and a database transaction together. This is the "last mile" problem, and solving it requires careful application-level patterns.

    Pattern 1: The Idempotent Consumer

    If your processing logic is naturally idempotent, you can simply re-run it on duplicate messages without adverse side effects. However, most business logic is not. The solution is to make your consumer statefully idempotent.

    This involves storing a unique identifier from each message you process and checking against that store before executing your business logic. The message identifier and the business logic must be committed atomically in the external system.

    Scenario: An InventoryService consumes OrderCreated events and decrements stock levels in a PostgreSQL database.

    Message Structure:

    json
    {
      "eventId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
      "orderId": "ORD-1001",
      "productId": "PROD-55",
      "quantity": 2
    }

    Database Schema:

    sql
    CREATE TABLE products (
        product_id VARCHAR(255) PRIMARY KEY,
        stock_level INT NOT NULL
    );
    
    CREATE TABLE processed_events (
        event_id UUID PRIMARY KEY,
        processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );

    Idempotent Consumer Implementation (Java - Spring JDBC & Kafka):

    java
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.dao.DuplicateKeyException;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    @Service
    public class InventoryConsumer {

        private static final Logger log = LoggerFactory.getLogger(InventoryConsumer.class);

        private final JdbcTemplate jdbcTemplate;

        public InventoryConsumer(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
    
        @KafkaListener(topics = "order-events", groupId = "inventory-service")
        @Transactional("transactionManager") // Standard Spring DB Transaction Manager
        public void handleOrderCreated(OrderCreatedEvent event) {
            try {
                // 1. Atomically check for and record the event ID.
                // This insert will fail with a constraint violation if the eventId already exists.
                jdbcTemplate.update("INSERT INTO processed_events (event_id) VALUES (?)", event.getEventId());
    
            } catch (DuplicateKeyException e) {
                // This is not an error. It's an expected outcome for a redelivered message.
                log.warn("Received duplicate event, already processed: {}", event.getEventId());
                return; // Gracefully exit
            }
    
            // 2. If the insert succeeded, we can safely perform the business logic.
            // Because this entire method is wrapped in a DB transaction, both the insert
            // into processed_events and the update to products will succeed or fail together.
            int updatedRows = jdbcTemplate.update(
                "UPDATE products SET stock_level = stock_level - ? WHERE product_id = ? AND stock_level >= ?",
                event.getQuantity(),
                event.getProductId(),
                event.getQuantity()
            );
    
            if (updatedRows == 0) {
                // Handle the business error, e.g., insufficient stock.
                // Throwing an exception here will roll back the entire DB transaction,
                // including the processed_events insert. The Kafka message will be redelivered
                // by the container's error handler, allowing for a retry.
                throw new InsufficientStockException("Not enough stock for product: " + event.getProductId());
            }
        }
    }

    This pattern is robust. If the process crashes after the UPDATE but before the Kafka consumer offset is committed, the message will be redelivered upon restart. The INSERT into processed_events will then fail with a DuplicateKeyException, preventing the stock from being decremented a second time.
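
    If you prefer not to drive control flow off an exception, the same check can be expressed as a conditional insert (PostgreSQL syntax assumed). The fragment below is a sketch of a drop-in replacement for step 1 of the listener above, reusing its jdbcTemplate, event, and log fields:

    java
    // 1 (alternative). Conditional insert instead of catching DuplicateKeyException.
    // With PostgreSQL, ON CONFLICT DO NOTHING affects 0 rows for a redelivered event.
    int inserted = jdbcTemplate.update(
        "INSERT INTO processed_events (event_id) VALUES (?) ON CONFLICT (event_id) DO NOTHING",
        event.getEventId()
    );
    if (inserted == 0) {
        log.warn("Received duplicate event, already processed: {}", event.getEventId());
        return; // already handled in an earlier delivery; nothing to roll back
    }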

    Pattern 2: The Transactional Outbox Pattern

    What if the source of your event is a state change within your service itself? Consider a UserService that handles a POST /users request. It needs to (1) insert a new user into the database and (2) publish a UserCreated event to Kafka. These two actions must be atomic.

    If you write to the database first and then the process crashes before publishing to Kafka, you have an inconsistent state (a user exists but no event was sent). If you publish to Kafka first and the database write fails, you have a different inconsistent state (an event exists for a non-existent user).

    This is where the transactional outbox pattern shines. It leverages the atomicity of the local database transaction.

    Mechanism:

  • Start a local database transaction.
  • Execute your business logic (e.g., INSERT the new user into the users table).
  • As part of the same transaction, insert a record representing the event to be published into an outbox table.
  • Commit the database transaction. Now, the state change and the intent to publish are atomically persisted.
  • A separate, asynchronous process (a message relay) monitors the outbox table. This can be a dedicated thread in your application, or more robustly, a Change Data Capture (CDC) tool like Debezium streaming changes to a Kafka topic.
  • The relay reads new events from the outbox, publishes them to Kafka, and then marks them as processed in the outbox table.

    Schema:

    sql
    CREATE TABLE outbox (
        id UUID PRIMARY KEY,
        aggregate_type VARCHAR(255) NOT NULL,
        aggregate_id VARCHAR(255) NOT NULL,
        event_type VARCHAR(255) NOT NULL,
        payload JSONB NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );

    Service Logic (Conceptual):

    java
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.DefaultTransactionDefinition;

    @Service
    public class UserService {

        private final UserRepository userRepository;
        private final OutboxRepository outboxRepository;
        private final PlatformTransactionManager transactionManager;

        // ... constructor injecting the repositories and the transaction manager ...
    
        public User createUser(CreateUserRequest request) {
            TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
            try {
                // 1. Business logic: create and save the user entity.
                User newUser = new User(request.getName(), request.getEmail());
                userRepository.save(newUser);
    
                // 2. Create the event payload.
                UserCreatedEvent event = new UserCreatedEvent(newUser.getId(), newUser.getName());
    
                // 3. Save the event to the outbox table within the same transaction.
                OutboxEvent outboxEvent = new OutboxEvent(
                    "User", newUser.getId(), "UserCreated", toJson(event)
                );
                outboxRepository.save(outboxEvent);
    
                // 4. Commit the single atomic transaction.
                transactionManager.commit(status);
                return newUser;
            } catch (Exception e) {
                transactionManager.rollback(status);
                throw new UserCreationException("Failed to create user", e);
            }
        }
    }

    This pattern provides the strongest consistency guarantee by making the database the source of truth for both state and events. The message relay (e.g., Debezium with Kafka Connect) is designed for reliable, at-least-once delivery from the database log to Kafka. When paired with an idempotent Kafka producer on the Kafka Connect side, you achieve a highly reliable exactly-once pipeline.
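
    For completeness, here is a minimal sketch of the in-application polling relay mentioned in the mechanism above. It assumes scheduling is enabled (@EnableScheduling), a plain non-transactional but idempotent String-valued KafkaTemplate, an illustrative topic name (user-events), and the same "transactionManager" bean used in the idempotent consumer example; a CDC relay such as Debezium is usually the more robust choice because it tails the database log instead of polling.

    java
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;

    import java.util.List;
    import java.util.Map;

    @Component
    public class OutboxRelay {

        private final JdbcTemplate jdbcTemplate;
        private final KafkaTemplate<String, String> kafkaTemplate;

        public OutboxRelay(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
            this.jdbcTemplate = jdbcTemplate;
            this.kafkaTemplate = kafkaTemplate;
        }

        // Delivery from the outbox is at-least-once: if the process crashes after send()
        // but before the DELETE commits, the rows are re-published on the next run.
        @Scheduled(fixedDelay = 500)
        @Transactional("transactionManager")
        public void publishPendingEvents() {
            List<Map<String, Object>> rows = jdbcTemplate.queryForList(
                "SELECT id, aggregate_id, payload::text AS payload FROM outbox " +
                "ORDER BY created_at LIMIT 100 FOR UPDATE SKIP LOCKED");

            for (Map<String, Object> row : rows) {
                try {
                    // Key by aggregate_id so all events for one aggregate stay on one partition.
                    // Block until the broker acknowledges, so a failed send rolls back the DELETE.
                    kafkaTemplate.send("user-events", (String) row.get("aggregate_id"),
                                       (String) row.get("payload")).get();
                } catch (Exception e) {
                    throw new IllegalStateException("Failed to publish outbox event " + row.get("id"), e);
                }
                jdbcTemplate.update("DELETE FROM outbox WHERE id = ?", row.get("id"));
            }
        }
    }

    Because a crash between the send and the DELETE simply re-publishes the row on the next run, the relay delivers at-least-once, which is exactly why the downstream consumers still need the idempotency patterns from the previous section.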

    Performance and Production Pitfalls

    Enabling EOS is not free. It introduces performance overhead and new failure modes to consider.

  • Increased Latency: Transactions add latency. Messages are not visible to read_committed consumers until the transaction coordinator has written the commit marker. This can add tens to hundreds of milliseconds to your end-to-end latency. Benchmark this impact for your specific workload.
  • Transaction Coordinator Load: All transaction management (init, commit, abort) for a given transactional.id goes through a single broker acting as its transaction coordinator. Coordinators can become a bottleneck in clusters with a very high number of concurrent, short-lived transactions. Monitor the transaction coordinator metrics on your brokers.
  • Transaction Timeouts (transaction.timeout.ms): This producer setting dictates the maximum time between the start of a transaction and its commit/abort. If your processing takes longer than this timeout, the coordinator will proactively abort the transaction. Critically, the timeout must comfortably exceed the worst-case time your application needs to process and produce a polled batch of records, and that same processing time must also stay within the consumer's max.poll.interval.ms or the group will rebalance mid-transaction. A common misconfiguration is a short transaction timeout combined with long processing times, leading to constant transaction aborts (see the sketch after this list).
  • Zombie Fencing is Not Magic: The transactional.id is the key to fencing. If two separate microservice deployments are accidentally configured with the same transactional.id, one will constantly be fenced, leading to bizarre, intermittent processing failures. These IDs must be unique per logical producer application.
  • Log Compaction and Transactions: Be aware that aborted transactions still write data to the log. While read_committed consumers won't see this data, it occupies disk space until the log is cleaned up. This can have a minor impact on storage, but more importantly, log compaction can be delayed for partitions with long-running open transactions, as Kafka needs to retain the transactional markers.
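
    As a rough illustration of the timeout relationship described above, these are the relevant knobs, shown as additions to the configProps and props maps from the earlier factory examples (the values are placeholders, not tuning recommendations):

    java
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;

    // Producer side: the coordinator proactively aborts any transaction open longer than this,
    // so it must exceed the worst-case time from beginTransaction() to commitTransaction().
    configProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60_000);

    // Consumer side: the group rebalances if poll() is not called within this window,
    // so batch processing time must stay below it as well.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
    // Bounding the batch size bounds the processing time per poll.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);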

    Conclusion: A Deliberate Engineering Choice

    Achieving end-to-end exactly-once semantics in a Kafka-based event sourcing system is a significant engineering feat. It is not a feature you simply "turn on." It is an architectural commitment.

  • Start with Kafka's primitives: Correctly configure the idempotent producer, use the transactional API for atomic multi-partition writes, and set isolation.level=read_committed on consumers.
  • Solve the last mile: For consumers interacting with external systems, implement statefully idempotent logic, typically by atomically recording a unique event ID and performing business logic within a single database transaction.
  • Embrace the transactional outbox: For producers that must atomically link a database state change with an event publication, the transactional outbox pattern is the gold standard for correctness, shifting the burden of reliable delivery to a dedicated CDC tool like Debezium.

    This level of guarantee comes at the cost of increased complexity and performance overhead. Do not apply it universally. Reserve it for the core business domains where financial transactions, critical state machine transitions, or auditable event logs demand absolute data integrity. For less critical pathways, a well-designed idempotent consumer with at-least-once delivery is often a more pragmatic and performant choice.
