Achieving Exactly-Once Semantics with the Transactional Outbox Pattern

Goh Ling Yong

The Dual-Write Fallacy: A Production Nightmare

In event-driven architectures, a common requirement is to persist a state change to a database and subsequently publish an event notifying other services of that change. The naive approach, often called a "dual-write," involves executing these two distinct operations sequentially within the same block of application code.

Consider this Go code for creating a user order:

go
// ANTI-PATTERN: DO NOT USE IN PRODUCTION
func (s *OrderService) CreateOrder(ctx context.Context, orderDetails models.Order) (*models.Order, error) {
    // Begin a database transaction
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to begin transaction: %w", err)
    }
    defer tx.Rollback() // No-op if the transaction has already been committed

    // 1. Write to the database
    createdOrder, err := s.orderRepo.Create(ctx, tx, orderDetails)
    if err != nil {
        return nil, fmt.Errorf("failed to create order in db: %w", err)
    }

    // 2. Publish event to Kafka
    event := events.OrderCreated{OrderID: createdOrder.ID, UserID: createdOrder.UserID, Total: createdOrder.Total}
    if err := s.eventPublisher.Publish(ctx, "orders.created", event); err != nil {
        // CRITICAL FAILURE POINT: if Publish returns an error, the DB transaction
        // is rolled back. But what if the publish call merely timed out after the
        // broker had already accepted the message? The event is out, the order is not.
        return nil, fmt.Errorf("failed to publish event: %w", err)
    }

    // Commit the database transaction
    if err := tx.Commit(); err != nil {
        // ANOTHER CRITICAL FAILURE POINT: The event was published, but the DB state is lost!
        return nil, fmt.Errorf("failed to commit transaction: %w", err)
    }

    return createdOrder, nil
}

This code is fundamentally broken for any distributed system. It presents two primary failure modes:

  • DB Commit Fails After Successful Publish: The Kafka message is sent, and downstream consumers react to the OrderCreated event. However, the tx.Commit() fails. The database transaction is rolled back, and the order record is never persisted. The system is now in an inconsistent state: an order exists in the event stream but not in the source-of-truth database.
  • Publish Fails After DB Commit: This is even more subtle. If the tx.Commit() is placed before the s.eventPublisher.Publish(), a crash or network failure after the commit but before the publish results in a persisted order without a corresponding event. Downstream services are never notified, leading to silent data inconsistencies.

    Two-phase commit (2PC) protocols are a theoretical solution, but they are rarely practical in modern microservice architectures due to their complexity, performance overhead, and the requirement that all participating systems (the database and the message broker) support the protocol.

    The only robust solution is to leverage the atomicity of a local database transaction. This is the core principle of the Transactional Outbox pattern.

    The Transactional Outbox Pattern: An Architectural Deep Dive

    The pattern reframes the problem: instead of trying to atomically perform two distinct network operations, we perform a single atomic write to the local database. This write includes both the business state change and the intent to publish an event.

    Architecture Components:

  • Application Database: The primary data store for the service (e.g., PostgreSQL).
  • outbox_events Table: A dedicated table within the same database schema as the business tables. It stores the events that need to be published.
  • Service Logic: The application code that performs business operations.
  • Message Relay: An independent process or component responsible for reading events from the outbox_events table and reliably publishing them to the message broker (Kafka).

    The Atomic Operation:

    The critical insight is that writing to the business table (e.g., orders) and inserting a record into the outbox_events table can be wrapped in a single database transaction. This operation is guaranteed to be atomic by the ACID properties of the database.

    mermaid
    sequenceDiagram
        participant Client
        participant OrderService
        participant DB as PostgreSQL DB

        Client->>OrderService: POST /orders
        OrderService->>DB: BEGIN TRANSACTION
        OrderService->>DB: INSERT INTO orders (...)
        OrderService->>DB: INSERT INTO outbox_events (...)
        OrderService->>DB: COMMIT
        Note right of DB: Both inserts succeed or fail together.
        DB-->>OrderService: Commit OK
        OrderService-->>Client: 201 Created

    Now, the system's state is always consistent. If the transaction commits, both the order and the event-to-be-published are durably stored. If it rolls back, neither is.

    Production-Grade Implementation with Debezium (CDC)

    While the concept is simple, the implementation of the Message Relay is where complexity lies. A naive polling mechanism that repeatedly queries SELECT * FROM outbox_events WHERE processed = false puts unnecessary load on the primary database and introduces latency.

    A superior, production-grade approach is to use Change Data Capture (CDC). We will use Debezium, a distributed platform for CDC built on top of Apache Kafka Connect.

    Debezium tails the database's transaction log (the Write-Ahead Log or WAL in PostgreSQL), capturing row-level changes in near real-time and publishing them as events to Kafka topics. This is highly efficient and avoids any polling queries against your application tables.

    1. Database Schema (PostgreSQL)

    First, define the outbox_events table.

    sql
    -- Enable logical replication on the database if not already enabled.
    -- This might require a restart and changes to postgresql.conf:
    -- wal_level = logical
    
    CREATE TABLE outbox_events (
        id UUID PRIMARY KEY,
        aggregate_type VARCHAR(255) NOT NULL, -- e.g., 'Order'
        aggregate_id VARCHAR(255) NOT NULL, -- The ID of the entity that was changed
        event_type VARCHAR(255) NOT NULL,   -- e.g., 'OrderCreated'
        payload JSONB NOT NULL,             -- The event payload
        created_at TIMESTAMPTZ DEFAULT NOW()
    );
    
    -- An index to help with potential manual queries or cleanup jobs.
    CREATE INDEX idx_outbox_created_at ON outbox_events(created_at);

    2. Service Logic (Go)

    Now, refactor the CreateOrder service to use the outbox table within a single transaction.

    go
    package main
    
    import (
    	"context"
    	"database/sql"
    	"encoding/json"
    	"fmt"
    
    	"github.com/google/uuid"
        // ... other imports
    )
    
    // Event to be stored in the outbox
    type OutboxEvent struct {
    	ID            uuid.UUID
    	AggregateType string
    	AggregateID   string
    	EventType     string
    	Payload       json.RawMessage
    }
    
    // OrderService with outbox logic
    func (s *OrderService) CreateOrder(ctx context.Context, orderDetails models.Order) (*models.Order, error) {
        tx, err := s.db.BeginTx(ctx, nil)
        if err != nil {
            return nil, fmt.Errorf("failed to begin transaction: %w", err)
        }
        defer tx.Rollback()
    
        // 1. Create the order in the 'orders' table
        createdOrder, err := s.orderRepo.Create(ctx, tx, orderDetails)
        if err != nil {
            return nil, fmt.Errorf("failed to create order in db: %w", err)
        }
    
        // 2. Create the event payload
        eventPayload, err := json.Marshal(events.OrderCreated{
            OrderID: createdOrder.ID,
            UserID:  createdOrder.UserID,
            Total:   createdOrder.Total,
        })
        if err != nil {
            return nil, fmt.Errorf("failed to marshal event payload: %w", err)
        }
    
        // 3. Create the outbox event
        outboxEvent := OutboxEvent{
            ID:            uuid.New(),
            AggregateType: "Order",
            AggregateID:   createdOrder.ID.String(),
            EventType:     "OrderCreated",
            Payload:       eventPayload,
        }
    
        // 4. Insert the outbox event into the 'outbox_events' table
        if err := s.outboxRepo.Create(ctx, tx, outboxEvent); err != nil {
            return nil, fmt.Errorf("failed to create outbox event: %w", err)
        }
    
        // 5. Commit the single transaction
        if err := tx.Commit(); err != nil {
            return nil, fmt.Errorf("failed to commit transaction: %w", err)
        }
    
        return createdOrder, nil
    }
    
    // outboxRepo implementation
    func (r *PostgresOutboxRepo) Create(ctx context.Context, tx *sql.Tx, event OutboxEvent) error {
        query := `INSERT INTO outbox_events (id, aggregate_type, aggregate_id, event_type, payload)
                   VALUES ($1, $2, $3, $4, $5)`
        _, err := tx.ExecContext(ctx, query, event.ID, event.AggregateType, event.AggregateID, event.EventType, event.Payload)
        return err
    }

    This code is now resilient. The atomicity is guaranteed by PostgreSQL.
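
    For reference, here is a minimal sketch of the surrounding wiring the snippet assumes. The interface shapes, field names, and the models package are illustrative assumptions, not a prescribed API:

    go
    // Hypothetical wiring for the service above; names are assumptions.
    type OrderRepo interface {
    	// Create persists the order using the caller's open transaction.
    	Create(ctx context.Context, tx *sql.Tx, o models.Order) (*models.Order, error)
    }
    
    type OutboxRepo interface {
    	// Create inserts the outbox record in the same transaction.
    	Create(ctx context.Context, tx *sql.Tx, e OutboxEvent) error
    }
    
    type OrderService struct {
    	db         *sql.DB
    	orderRepo  OrderRepo
    	outboxRepo OutboxRepo
    }

    Passing the *sql.Tx explicitly is what lets both repositories participate in the single transaction.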

    3. Setting up the Message Relay (Debezium)

    We'll use a docker-compose.yml to orchestrate our stack: PostgreSQL, Kafka, Zookeeper, and Kafka Connect with the Debezium connector.

    yaml
    version: '3.8'
    services:
      postgres:
        image: debezium/postgres:13
        ports:
          - "5432:5432"
        environment:
          - POSTGRES_USER=user
          - POSTGRES_PASSWORD=password
          - POSTGRES_DB=orders_db
        volumes:
          - ./pg_data:/var/lib/postgresql/data
    
      zookeeper:
        image: confluentinc/cp-zookeeper:7.0.1
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
    
      kafka:
        image: confluentinc/cp-kafka:7.0.1
        depends_on: [zookeeper]
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    
      connect:
        image: debezium/connect:1.9
        depends_on: [kafka, postgres]
        ports: ["8083:8083"]
        environment:
          BOOTSTRAP_SERVERS: kafka:29092
          GROUP_ID: 1
          CONFIG_STORAGE_TOPIC: my_connect_configs
          OFFSET_STORAGE_TOPIC: my_connect_offsets
          STATUS_STORAGE_TOPIC: my_connect_statuses

    Once the stack is running (docker-compose up -d), we configure the Debezium PostgreSQL connector to watch our outbox_events table.

    Send this JSON payload to the Kafka Connect REST API at http://localhost:8083/connectors:

    json
    {
      "name": "outbox-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "user",
        "database.password": "password",
        "database.dbname": "orders_db",
        "database.server.name": "pg-orders-server",
        "table.include.list": "public.outbox_events",
        "plugin.name": "pgoutput",
        "tombstones.on.delete": "false",
    
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.by.field": "aggregate_type",
        "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
        "transforms.outbox.table.field.event.key": "aggregate_id"
      }
    }
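
    If you prefer to script the registration, here is a minimal sketch in Go; it assumes the JSON above has been saved to a local file named connector.json (an illustrative name):

    go
    package main
    
    import (
    	"bytes"
    	"fmt"
    	"net/http"
    	"os"
    )
    
    func main() {
    	// Read the connector configuration shown above.
    	body, err := os.ReadFile("connector.json")
    	if err != nil {
    		panic(err)
    	}
    
    	// POST it to the Kafka Connect REST API; 201 Created means registered.
    	resp, err := http.Post("http://localhost:8083/connectors", "application/json", bytes.NewReader(body))
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	fmt.Println("Kafka Connect responded:", resp.Status)
    }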

    Dissecting the Debezium Configuration:

    * table.include.list: Tells Debezium to only watch the outbox_events table.

    * tombstones.on.delete: We set this to false because we will manually prune the outbox table later. We don't want Debezium to create tombstone records in Kafka when we clean up.

    * transforms: This is the most powerful part. We use Debezium's built-in EventRouter Single Message Transform (SMT).

    * route.by.field: It inspects the aggregate_type column ('Order') in the outbox event.

    * route.topic.replacement: It dynamically creates the destination topic name. In our case, an event with aggregate_type: 'Order' will be routed to a topic named Order.events.

    * table.field.event.key: It uses the aggregate_id column as the Kafka message key. This is critical for ordering, as it ensures all events for the same order land on the same Kafka partition.

    Now, when our Go service inserts a record into outbox_events, Debezium will instantly capture it, transform it, and publish it to the Order.events Kafka topic with the correct key and payload.

    Ensuring Idempotency on the Consumer Side

    We have achieved at-least-once delivery from the database to Kafka. The relay (Debezium) is resilient. If it crashes, it will resume from its last recorded offset in the WAL. It might re-publish a message if it crashed after publishing but before committing its offset. This means our downstream consumers must be idempotent.

    An idempotent operation is one that can be applied multiple times without changing the result beyond the initial application. A naive CreateInvoice handler is not idempotent: redelivering the same OrderCreated event would produce a duplicate invoice.

    We solve this by tracking processed event IDs on the consumer side.

    Idempotent Consumer Implementation (Go)

    Let's build a consumer for the Order.events topic that needs to create an invoice.

    1. Consumer Database Schema:

    The invoicing service needs its own table to track which events it has already processed.

    sql
    CREATE TABLE processed_events (
        event_id UUID PRIMARY KEY,
        processed_at TIMESTAMPTZ DEFAULT NOW()
    );

    2. Consumer Logic:

    The consumer logic will wrap the business operation and the recording of the event ID in a single local transaction.

    go
    // InvoiceService consumer logic
    func (s *InvoiceService) HandleOrderCreatedEvent(ctx context.Context, event events.OrderCreated) error {
        // The event ID comes from the 'id' field of our outbox_events table,
        // which Debezium propagates in the message headers or payload.
        // We assume we have extracted it into event.EventID (type uuid.UUID).
    
        tx, err := s.db.BeginTx(ctx, nil)
        if err != nil {
            return fmt.Errorf("failed to begin transaction: %w", err)
        }
        defer tx.Rollback()
    
        // 1. Check for duplicate event
        var exists bool
        err = tx.QueryRowContext(ctx, "SELECT EXISTS(SELECT 1 FROM processed_events WHERE event_id = $1)", event.EventID).Scan(&exists)
        if err != nil {
            return fmt.Errorf("failed to check for processed event: %w", err)
        }
        if exists {
            log.Printf("Event %s already processed, skipping.", event.EventID)
            // Not an error, we just successfully handled a duplicate.
            return tx.Commit() // Commit to end the transaction
        }
    
        // 2. Perform business logic (e.g., create an invoice)
        invoice := models.Invoice{OrderID: event.OrderID, Amount: event.Total}
        if err := s.invoiceRepo.Create(ctx, tx, invoice); err != nil {
            return fmt.Errorf("failed to create invoice: %w", err)
        }
    
        // 3. Record the event as processed
        _, err = tx.ExecContext(ctx, "INSERT INTO processed_events (event_id) VALUES ($1)", event.EventID)
        if err != nil {
            // Check for a unique constraint violation in case of a race condition.
            // (Assumes the github.com/lib/pq driver; errors.As handles wrapped errors.)
            var pqErr *pq.Error
            if errors.As(err, &pqErr) && pqErr.Code == "23505" { // unique_violation
                // Another instance won the race. The transaction is now in an
                // aborted state, so we must NOT commit (that would also try to
                // persist our duplicate invoice). Let the deferred Rollback
                // discard our work and treat the event as handled.
                log.Printf("Race condition: Event %s processed by another instance. Skipping.", event.EventID)
                return nil
            }
            return fmt.Errorf("failed to record processed event: %w", err)
        }
    
        // 4. Commit the transaction
        return tx.Commit()
    }

    With this pattern, if the same OrderCreated event is delivered twice, the SELECT EXISTS check returns true on the second attempt, the business logic is skipped, and the operation remains idempotent. The unique-constraint check handles the rare race where two consumer instances pick up the same message and both pass the existence check before either has committed: the loser's transaction aborts on the INSERT, its duplicate work is rolled back, and only one invoice survives.
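
    A common variant, shown here as an alternative sketch rather than a prescribed change: claim the event ID up front with PostgreSQL's INSERT ... ON CONFLICT DO NOTHING and use the affected-row count as the duplicate check. This removes both the separate SELECT EXISTS round trip and the race-condition branch:

    go
    // Inside the same transaction, before any business logic runs.
    res, err := tx.ExecContext(ctx,
        "INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING",
        event.EventID)
    if err != nil {
        return fmt.Errorf("failed to claim event: %w", err)
    }
    if n, _ := res.RowsAffected(); n == 0 {
        // Zero rows affected: another transaction already recorded this
        // event ID, so skip the business logic entirely.
        return tx.Commit()
    }
    // ... create the invoice and commit, exactly as before ...

    If a competing transaction holds the conflicting row uncommitted, PostgreSQL blocks the second INSERT until the first commits or aborts, so exactly one consumer ends up owning the event.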

    This combination of the Transactional Outbox on the producer side and an idempotent consumer provides a robust, end-to-end, exactly-once processing semantic.

    Advanced Considerations and Edge Cases

    Performance and Scalability of the Outbox Table

    Your outbox_events table will grow indefinitely if not managed. This is a critical operational concern.

  • Pruning/Cleanup: A background job must periodically delete old events. With Debezium, there is no processed flag, so you can simply delete records older than a certain threshold (e.g., 7 days), on the assumption that the relay has long since shipped them:

    sql
    DELETE FROM outbox_events WHERE created_at < NOW() - INTERVAL '7 days';

    A large DELETE like this can cause locking and performance issues on a very high-throughput table. It's often better to use table partitioning.

  • Partitioning: For extreme-scale systems, partition the outbox_events table in PostgreSQL by date range (e.g., daily or weekly partitions). Dropping an old partition is a metadata-only operation and is vastly more performant than a large-scale DELETE; a sketch follows below.
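
    Here is a minimal sketch of the partitioned variant, written as a Go migration helper. It assumes the parent table was declared with PARTITION BY RANGE (created_at) (which, in PostgreSQL, also requires created_at in the primary key); names and retention are illustrative:

    go
    package outbox
    
    import (
    	"context"
    	"database/sql"
    	"fmt"
    	"time"
    )
    
    // rotateOutboxPartitions creates the partition for the given day (if absent)
    // and drops the partition that has aged past the retention window.
    func rotateOutboxPartitions(ctx context.Context, db *sql.DB, day time.Time, retainDays int) error {
    	next := day.AddDate(0, 0, 1)
    	name := "outbox_events_" + day.Format("20060102")
    
    	// Assumes: CREATE TABLE outbox_events (...) PARTITION BY RANGE (created_at);
    	_, err := db.ExecContext(ctx, fmt.Sprintf(
    		`CREATE TABLE IF NOT EXISTS %s PARTITION OF outbox_events
    		 FOR VALUES FROM ('%s') TO ('%s')`,
    		name, day.Format("2006-01-02"), next.Format("2006-01-02")))
    	if err != nil {
    		return fmt.Errorf("create partition: %w", err)
    	}
    
    	// Dropping an expired partition is a cheap metadata-only operation,
    	// unlike a bulk DELETE over the same rows.
    	expired := "outbox_events_" + day.AddDate(0, 0, -retainDays).Format("20060102")
    	_, err = db.ExecContext(ctx, "DROP TABLE IF EXISTS "+expired)
    	return err
    }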

    Polling Relay vs. CDC Relay: A Deeper Look

    While we focused on CDC, it's worth understanding the trade-offs with a polling-based relay.

    Polling Relay:

    * Implementation: A background process runs SELECT ... FOR UPDATE SKIP LOCKED to grab a batch of unprocessed events, publishes them, and then runs an UPDATE to mark them as processed (see the sketch after this list).

    * Pros: Simpler infrastructure; no Debezium/Kafka Connect needed.

    * Cons:
      * DB Load: Constant polling queries hit your primary production database.
      * Latency: Events are only picked up at the polling interval.
      * Complexity: Managing the processed flag and handling SKIP LOCKED correctly adds application-level complexity.
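
    A minimal sketch of such a polling relay, under these assumptions: outbox_events carries an extra processed BOOLEAN NOT NULL DEFAULT false column (which the CDC-oriented schema above deliberately omits), github.com/lib/pq is the driver, and publish stands in for your Kafka producer call:

    go
    package outbox
    
    import (
    	"context"
    	"database/sql"
    
    	"github.com/lib/pq"
    )
    
    // relayBatch drains one batch from the outbox and publishes it.
    func relayBatch(ctx context.Context, db *sql.DB, publish func(id string, payload []byte) error) error {
    	tx, err := db.BeginTx(ctx, nil)
    	if err != nil {
    		return err
    	}
    	defer tx.Rollback()
    
    	// SKIP LOCKED lets several relay instances poll concurrently without
    	// blocking on each other's in-flight batches.
    	rows, err := tx.QueryContext(ctx, `
    		SELECT id, payload FROM outbox_events
    		WHERE processed = false
    		ORDER BY created_at
    		LIMIT 100
    		FOR UPDATE SKIP LOCKED`)
    	if err != nil {
    		return err
    	}
    	defer rows.Close()
    
    	var done []string
    	for rows.Next() {
    		var id string
    		var payload []byte
    		if err := rows.Scan(&id, &payload); err != nil {
    			return err
    		}
    		// Publish before marking processed: a crash here means re-delivery,
    		// which our idempotent consumers tolerate.
    		if err := publish(id, payload); err != nil {
    			return err
    		}
    		done = append(done, id)
    	}
    	if err := rows.Err(); err != nil {
    		return err
    	}
    
    	// Mark the batch processed and release the row locks.
    	if _, err := tx.ExecContext(ctx,
    		`UPDATE outbox_events SET processed = true WHERE id = ANY($1::uuid[])`,
    		pq.Array(done)); err != nil {
    		return err
    	}
    	return tx.Commit()
    }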

    CDC Relay (Debezium):

    * Pros:
      * Low DB Load: Reads from the transaction log, not the table itself.
      * Near Real-Time: Latency is typically in the milliseconds.
      * Decoupled: The application code is simpler; it just inserts into the outbox. The relay logic is fully externalized to a configured Debezium connector.

    * Cons:
      * Infrastructure Complexity: Requires running and managing a Kafka Connect cluster.
      * Operational Overhead: Monitoring Debezium, managing connectors, and understanding WAL behavior becomes a required operational skill.

    For any serious, high-throughput system, the benefits of CDC far outweigh the operational complexity.

    Kafka Transactions vs. Transactional Outbox

    A common point of confusion is Kafka's own support for transactions (producer.beginTransaction(), producer.sendOffsetsToTransaction(), producer.commitTransaction()). Kafka's Exactly-Once Semantics (EOS) solve the problem of atomically consuming from a topic, processing, and producing to another topic (a "read-process-write" pattern common in stream processing).

    However, Kafka EOS does not solve the initial dual-write problem. It cannot create a transaction that spans your application's database and the Kafka broker. The Transactional Outbox pattern is specifically designed to solve this initial database -> Kafka atomic write. The two patterns are complementary, not mutually exclusive, and solve different problems in the end-to-end data pipeline.
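
    To make the boundary concrete, here is a heavily abridged read-process-write sketch using the confluent-kafka-go client (the client choice and the transform step are assumptions, not something the pattern prescribes). Everything inside the transaction is Kafka state; your application database cannot be enrolled in it:

    go
    package stream
    
    import (
    	"context"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    // transform is a hypothetical stand-in for your processing step.
    func transform(b []byte) []byte { return b }
    
    // processOne assumes p was created with a "transactional.id" and that
    // p.InitTransactions(ctx) was called once at startup. Error paths should
    // call p.AbortTransaction(ctx); elided here for brevity.
    func processOne(ctx context.Context, p *kafka.Producer, c *kafka.Consumer, in *kafka.Message, out string) error {
    	if err := p.BeginTransaction(); err != nil {
    		return err
    	}
    
    	// Produce the processed result to the output topic.
    	if err := p.Produce(&kafka.Message{
    		TopicPartition: kafka.TopicPartition{Topic: &out, Partition: kafka.PartitionAny},
    		Value:          transform(in.Value),
    	}, nil); err != nil {
    		return err
    	}
    
    	// Commit the consumed offset inside the same Kafka transaction, so the
    	// "read" and the "write" succeed or fail together.
    	meta, err := c.GetConsumerGroupMetadata()
    	if err != nil {
    		return err
    	}
    	offsets := []kafka.TopicPartition{{
    		Topic:     in.TopicPartition.Topic,
    		Partition: in.TopicPartition.Partition,
    		Offset:    in.TopicPartition.Offset + 1,
    	}}
    	if err := p.SendOffsetsToTransaction(ctx, offsets, meta); err != nil {
    		return err
    	}
    	return p.CommitTransaction(ctx)
    }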

    Conclusion

    The dual-write anti-pattern is a ticking time bomb in any distributed system. By embracing the Transactional Outbox pattern, you leverage the battle-tested ACID guarantees of your local RDBMS to ensure data consistency between your service's state and the events it emits. While the polling approach is a viable starting point, a CDC-based implementation using Debezium provides a scalable, low-latency, and robust solution for production environments.

    By combining this producer-side pattern with idempotent consumers that track processed event IDs, you can achieve true end-to-end exactly-once processing semantics. This architecture is not simple, but it is a foundational pattern for building resilient, data-consistent, and scalable event-driven microservices.
