Achieving Exactly-Once Semantics with Debezium, Kafka, & Idempotent Consumers
The Elusive Goal of Exactly-Once Semantics
In the world of distributed systems, "exactly-once" is one of the most misunderstood and debated terms. A pure, mathematically provable exactly-once system that spans multiple independent components (database, broker, consumer) is practically impossible due to the challenges outlined in the Two Generals' Problem. Network partitions, process crashes, and unpredictable latencies mean we can never be 100% certain that an action was completed and that the acknowledgment was successfully received.
However, for senior engineers building critical business systems, this theoretical impossibility is irrelevant. The practical goal is to achieve effective exactly-once semantics: ensuring that a business operation, triggered by an event, is executed to completion precisely one time from the perspective of the system's state. We achieve this not through a magical protocol, but by combining two robust guarantees: at-least-once delivery and idempotent processing.
This article dissects a production-proven architecture for achieving this goal. We'll focus on a common and powerful stack: a relational database (PostgreSQL) as the source of truth, Debezium for reliable Change Data Capture (CDC), Kafka as the durable event backbone, and a stateful consumer application that must perform actions without side effects on redelivery.
Let's consider our driving scenario: an e-commerce platform. When an order is placed in the OrderService, we need to trigger downstream processes in the PaymentService and ShippingService. Double-charging a customer or shipping an order twice due to a message redelivery is a critical business failure. This is the problem we will solve.
The Foundation: Transactional Outbox and Debezium
The first and most critical failure point in many event-driven architectures is the initial event publication. A common anti-pattern is the "dual write":
// ANTI-PATTERN: DO NOT DO THIS
func (s *OrderService) CreateOrder(ctx context.Context, order models.Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // Rollback on error
    // 1. Write to the database
    if err := s.orderRepo.Save(tx, order); err != nil {
        return err
    }
    // 2. Publish to Kafka
    // WHAT HAPPENS IF THIS FAILS? THE DB COMMIT IS SEPARATE.
    if err := s.kafkaProducer.Publish("orders", order.ToEvent()); err != nil {
        // The DB write might succeed, but the event is lost!
        return err
    }
    return tx.Commit()
}If the application crashes or the network fails between the database commit and the Kafka publish, we have data inconsistency. The order exists in our system, but the event to trigger payment is lost forever.
The Transactional Outbox pattern solves this by leveraging the atomicity of a local database transaction. Instead of publishing directly to a message broker, we write the event to a dedicated outbox_events table within the same database transaction as the business state change.
Database Schema Design
Let's define the tables in our PostgreSQL database for the OrderService.
CREATE TABLE orders (
    order_id UUID PRIMARY KEY,
    customer_id UUID NOT NULL,
    order_status VARCHAR(50) NOT NULL,
    total_amount NUMERIC(10, 2) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE outbox_events (
    event_id UUID PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL, -- e.g., 'order'
    aggregate_id UUID NOT NULL,          -- e.g., order_id
    event_type VARCHAR(255) NOT NULL,    -- e.g., 'OrderCreated'
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);The key insight is that writing to orders and outbox_events can be wrapped in a single, atomic transaction.
// CORRECT PATTERN: Transactional Outbox
func (s *OrderService) CreateOrder(ctx context.Context, order models.Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    // 1. Save the business entity
    if err := s.orderRepo.Save(tx, order); err != nil {
        return err
    }
    // 2. Create the outbox event
    event := models.OutboxEvent{
        EventID:       uuid.New(),
        AggregateType: "order",
        AggregateID:   order.ID,
        EventType:     "OrderCreated",
        Payload:       order.ToJSONB(),
    }
    if err := s.outboxRepo.Save(tx, event); err != nil {
        return err
    }
    // 3. Commit the single atomic transaction
    return tx.Commit()
}Now, the creation of the order and the event record are inseparable. They either both succeed or both fail. We have guaranteed that no event will be lost.
Configuring Debezium for Outbox
Next, we use Debezium to reliably stream changes from our outbox_events table to Kafka. We configure a Debezium PostgreSQL connector to only monitor this table. This prevents us from publishing internal state changes from the orders table and ensures we only publish events that are part of our public contract.
Here is a production-ready Debezium connector configuration:
{
    "name": "order-outbox-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "orders_db",
        "database.server.name": "orders_server",
        "table.include.list": "public.outbox_events",
        "tombstones.on.delete": "false",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.by.field": "aggregate_type",
        "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
        "transforms.outbox.table.field.event.key": "aggregate_id",
        "producer.override.acks": "all",
        "producer.override.retries": "2147483647",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}Key Configuration Details:
*   table.include.list: We explicitly tell Debezium to only watch public.outbox_events.
*   tombstones.on.delete: Set to false. We will manage cleanup of the outbox table separately and don't want Debezium to emit null-keyed messages (tombstones) on deletion.
*   transforms: We use Debezium's built-in EventRouter Single Message Transform (SMT). This is a powerful feature that reshapes the raw CDC event into a clean business event. It reads the fields from our outbox table and uses them to route the message to the correct topic (order.events) with the correct key (aggregate_id). This keeps our Kafka topics clean and business-focused.
*   producer.override.acks: Set to all. This ensures Debezium waits for confirmation from all in-sync replicas in the Kafka cluster before considering a message sent, providing the highest level of durability.
*   producer.override.retries: A very high number ensures that transient network issues don't cause Debezium to fail.
With this setup, we have a bulletproof pipeline for getting events from our service's database to Kafka with an at-least-once guarantee.
Kafka and Consumer Configuration for At-Least-Once Delivery
Debezium has published the message reliably. Now, the consumer must be configured to process it reliably. The default behavior of many Kafka clients is to auto-commit offsets periodically in the background. This is dangerous. A consumer could fetch a batch of messages, auto-commit their offsets, and then crash before processing them. On restart, it would skip these messages entirely.
We must use manual offset management.
Here are the critical consumer settings (shown in the context of a Go application using the sarama library):
func NewConsumerGroup() (sarama.ConsumerGroup, error) {
    config := sarama.NewConfig()
    config.Version = sarama.V2_8_0_0 // Use a specific, recent version
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    
    // CRITICAL: Disable auto-commit
    config.Consumer.Offsets.AutoCommit.Enable = false
    
    // Ensures that during a rebalance, the consumer finishes processing its current batch
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    // ... other config
    return sarama.NewConsumerGroup(brokers, "payment-service-group", config)
}The consumer logic then becomes responsible for explicitly committing offsets after a message has been successfully processed.
// Simplified consumer loop structure
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        log.Printf("Message claimed: value = %s, topic = %s, partition = %d, offset = %d", string(message.Value), message.Topic, message.Partition, message.Offset)
        // The core processing logic goes here.
        // This function MUST be idempotent.
        err := c.handler.ProcessEvent(session.Context(), message)
        if err == nil {
            // Mark the message as processed ONLY after successful handling
            session.MarkMessage(message, "")
        }
    }
    return nil
}The session.MarkMessage(message, "") call stages the offset to be committed. The sarama library handles batching these commits for performance. The key is that this only happens after our ProcessEvent function returns without an error.
If the consumer crashes mid-process, the offset is not committed. Upon restart, a new consumer in the group will receive the same message again. We have now guaranteed at-least-once delivery all the way to our application code. The final challenge is to handle that redelivery gracefully.
Building the Idempotent Consumer: The Final Piece
Idempotency means that an operation can be performed multiple times with the same input, and the resulting state of the system will be the same as if it were performed only once. This is the core of our solution to handle message redeliveries.
We will implement this using a database-backed idempotency check, which is robust and works for any kind of business logic.
Idempotency Check with a `processed_events` Table
Within the PaymentService, we create a table to track the IDs of events we have already successfully processed.
-- In the PaymentService's database
CREATE TABLE processed_events (
    event_id UUID PRIMARY KEY,
    consumer_group_id VARCHAR(255) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- A composite index can be useful for cleanup jobs
CREATE INDEX idx_processed_events_group_time ON processed_events (consumer_group_id, processed_at);The event_id comes directly from the payload of our outbox event. The consumer's logic now wraps the business operation and the idempotency check in a single database transaction.
Here is a complete, production-grade Go implementation of an event handler in the PaymentService.
package main
import (
	"context"
	"database/sql"
	"encoding/json"
	"log"
	"github.com/IBM/sarama"
	"github.com/google/uuid"
	_ "github.com/jackc/pgx/v4/stdlib"
)
// Represents the structure of the event from Kafka
type OrderCreatedEvent struct {
	EventID     uuid.UUID `json:"event_id"`
	AggregateID uuid.UUID `json:"aggregate_id"`
	Payload     struct {
		CustomerID   uuid.UUID `json:"customer_id"`
		TotalAmount  float64   `json:"total_amount"`
	} `json:"payload"`
}
// PaymentHandler processes events
type PaymentHandler struct {
	db *sql.DB
	// paymentGateway would be a real payment gateway client
	// paymentGateway payment.Gateway
}
func (h *PaymentHandler) ProcessEvent(ctx context.Context, msg *sarama.ConsumerMessage) error {
	var event OrderCreatedEvent
	if err := json.Unmarshal(msg.Value, &event); err != nil {
		log.Printf("ERROR: Could not unmarshal event: %v", err)
		// This could be a poison pill. Decide on a DLQ strategy.
		return nil // Acknowledge to prevent infinite retries
	}
	// 1. Begin a transaction in the consumer's database
	tx, err := h.db.BeginTx(ctx, nil)
	if err != nil {
		log.Printf("ERROR: Could not begin transaction: %v", err)
		return err // Return error to trigger Kafka retry
	}
	defer tx.Rollback() // Ensure rollback on any error path
	// 2. The Idempotency Check
	// Try to insert the event ID. If it already exists, we get a constraint violation.
	_, err = tx.ExecContext(ctx, 
        "INSERT INTO processed_events (event_id, consumer_group_id) VALUES ($1, $2)", 
        event.EventID, "payment-service-group")
	if err != nil {
        // A better check would be to inspect the error type for a unique_violation
        // For pgx, this would be: if pgerr, ok := err.(*pgconn.PgError); ok && pgerr.Code == "23505"
		log.Printf("INFO: Event %s already processed. Skipping.", event.EventID)
		// This is NOT an error. It's a successful handling of a duplicate.
		// We don't need to commit the tx, just return nil to ack the Kafka message.
		return nil 
	}
	// 3. Perform the Business Logic (within the same transaction)
	log.Printf("INFO: Processing payment for order %s for amount %.2f", event.AggregateID, event.Payload.TotalAmount)
	// In a real system, you would call a payment gateway here.
	// gatewayResponse, err := h.paymentGateway.Charge(event.Payload.CustomerID, event.Payload.TotalAmount)
	// For this example, we'll just log and insert a payment record.
	_, err = tx.ExecContext(ctx, 
        "INSERT INTO payments (payment_id, order_id, amount, status) VALUES ($1, $2, $3, 'SUCCESS')", 
        uuid.New(), event.AggregateID, event.Payload.TotalAmount)
	if err != nil {
		log.Printf("ERROR: Failed to process payment for order %s: %v", event.AggregateID, err)
		return err // Return error to trigger retry
	}
	// 4. Commit the transaction
	if err := tx.Commit(); err != nil {
		log.Printf("ERROR: Could not commit transaction: %v", err)
		return err // Return error to trigger retry
	}
	log.Printf("SUCCESS: Processed event %s for order %s", event.EventID, event.AggregateID)
	return nil // Success! Kafka offset will be committed.
}
The Flow of a Redelivered Message:
event_id: A is delivered.- The consumer starts a transaction.
event_id: A into processed_events.payments table.- It commits the transaction.
event_id: A because its offset was not committed.- The consumer starts a new transaction.
INSERT INTO processed_events (event_id) VALUES ('A').- The database returns a primary key violation error.
nil.- The Kafka client acknowledges the message, committing the offset.
The business operation was performed exactly once. We have achieved our goal.
Advanced Edge Cases and Performance Tuning
This architecture is robust, but senior engineers must consider the edge cases and long-term performance implications.
Handling Poison Pill Messages
A "poison pill" is a message that consistently causes a consumer to fail. This could be due to malformed JSON, a violation of a business rule that wasn't caught upstream, or a persistent external service failure. Without a mitigation strategy, the consumer will get stuck in an infinite retry loop, blocking processing for that entire partition.
The solution is a Dead Letter Queue (DLQ). After a certain number of failed processing attempts, the message is moved to a separate Kafka topic for later analysis.
Debezium can be configured to send messages that fail in the Kafka Connect pipeline to a DLQ:
// In the connector config
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "orders_dlq",
"errors.deadletterqueue.context.headers.enable": "true"For failures within your consumer application, you must implement the retry logic yourself. A common pattern is to use an in-memory map to track failure counts for message offsets. After N failures, you manually publish the message to a DLQ topic (e.g., payment-service.dlq) and then commit the original message's offset to move on.
Performance of the Idempotency Check
The processed_events table will grow indefinitely. A primary key lookup on a UUID is extremely fast, but table bloat can still become an issue for storage and maintenance.
Solutions:
processed_events that are older than your maximum message retention period in Kafka. For example, if you retain messages for 7 days, you can safely delete any processed event records older than 7 days.    DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '7 days';SETNX (Set if Not Exists) command, which is atomic.    // Using Redis for idempotency check
    wasSet, err := redisClient.SetNX(ctx, event.EventID.String(), 1, 7 * 24 * time.Hour).Result()
    if err != nil {
        return err // Retry on Redis failure
    }
    if !wasSet {
        log.Printf("INFO: Event %s already processed (found in Redis). Skipping.", event.EventID)
        return nil // Duplicate found
    }
    // ... proceed with business logic    Trade-off: Using Redis for idempotency separates it from your business logic's database transaction. This creates a small window for failure. If you successfully SETNX in Redis but then your business logic transaction fails, the event will be marked as processed and will not be retried. This is often an acceptable risk for non-financial operations, but a single relational database transaction provides the strongest consistency guarantee.
Consumer Rebalancing
When a new consumer joins a group or an existing one leaves, Kafka triggers a rebalance, reassigning topic partitions among the active consumers. Our design is inherently safe against rebalancing for two reasons:
This resilience during rebalancing is a key feature of this architectural pattern.
Conclusion: A System-Level Guarantee
Achieving effective exactly-once semantics is not about a single setting or a magic library. It's a deliberate, system-wide architecture built on a chain of well-understood guarantees:
acks=all) to move the event from the outbox table to Kafka without loss.By composing these patterns, we build a system that is resilient to the inevitable failures of a distributed environment—crashes, network partitions, and restarts. We can confidently process each business event exactly one time, ensuring data consistency and correctness across our microservices architecture.