Choreography Sagas: Distributed Consistency with Kafka & Outbox Pattern

17 min read
Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

The Inescapable Problem: Atomicity in a Distributed World

As architects of distributed systems, we constantly battle the CAP theorem. In the microservices paradigm, we trade the monolithic comfort of ACID transactions for scalability, resilience, and independent deployability. However, this trade-off introduces a formidable challenge: how do we execute a business process that spans multiple services as a single atomic unit?

The classic example is an e-commerce order. The Order Service creates an order, the Payment Service processes the payment, and the Inventory Service reserves the stock. If any step fails, the entire operation must be rolled back. In a monolith, a single database transaction would wrap these operations, ensuring atomicity. In a distributed environment, the canonical anti-pattern is the two-phase commit (2PC) protocol, which introduces synchronous coupling, is a performance bottleneck, and dramatically reduces the availability of the entire system—a single locked service can bring the whole process to a halt.

This is where the Saga pattern enters. A saga is a sequence of local transactions where each transaction updates data within a single service and publishes an event to trigger the next transaction. If a local transaction fails, the saga executes a series of compensating transactions to undo the preceding transactions. This post focuses on the Choreography approach, where services subscribe to events and react accordingly without a central orchestrator. While elegant, this decentralized model presents its own critical implementation challenge: ensuring that a service's state change and the publication of its corresponding event are truly atomic.

The Dual-Write Problem: A Race Condition Waiting to Happen

A naive implementation of an event-driven saga step might look like this:

go
// DO NOT DO THIS IN PRODUCTION
func (s *OrderService) CreateOrder(ctx context.Context, orderDetails models.Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }
    defer tx.Rollback() // Rollback on error

    // 1. Save the order to the database
    orderID, err := s.repo.SaveOrder(ctx, tx, orderDetails)
    if err != nil {
        return fmt.Errorf("failed to save order: %w", err)
    }

    // 2. Publish an 'OrderCreated' event to Kafka
    event := events.OrderCreated{OrderID: orderID, ...}
    err = s.kafkaProducer.Publish("orders", event)
    if err != nil {
        // The transaction will be rolled back, but the event might have been sent!
        // Or worse, the DB write succeeds but the publish fails.
        return fmt.Errorf("failed to publish event: %w", err)
    }

    // 3. Commit the database transaction
    if err := tx.Commit(); err != nil {
        // The event was published, but the DB state was not committed!
        return fmt.Errorf("failed to commit transaction: %w", err)
    }

    return nil
}

This code is fundamentally broken due to the dual-write problem. There is no way to atomically commit a database transaction and publish a message to a broker like Kafka. Consider the failure modes:

  • DB Commit Fails, Publish Succeeds: The tx.Commit() fails due to a network issue, deadlock, or constraint violation. However, the OrderCreated event has already been sent to Kafka. The Payment Service will now attempt to process a payment for an order that does not exist in the Order Service's database, leading to inconsistent state.
  • Publish Fails, DB Write Succeeds (before commit): The s.kafkaProducer.Publish() call fails. The code returns an error, and the defer tx.Rollback() is called. The order is never persisted, and no event is sent. This is the "least bad" failure, but the business process still fails.
  • Service Crashes After DB Commit, Before Publish: This is the most insidious failure. The tx.Commit() succeeds, but the process crashes before the Kafka publish call can even be made. The order is now in a PENDING state in the database forever, with no event to trigger the next step. The system is in an inconsistent state.
  • The Transactional Outbox Pattern: True Atomicity

    The solution is the Transactional Outbox pattern. The core idea is to persist the event to be published within the same database and same local transaction as the business state change. This leverages the atomicity of the local database to guarantee that the state change and the intent to publish an event are a single, inseparable unit.

    1. The Outbox Table Schema

    First, we create an outbox table in the same database as our business tables (e.g., the orders table).

    sql
    CREATE TABLE outbox (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        aggregate_id VARCHAR(255) NOT NULL, -- ID of the entity that was changed (e.g., order_id)
        aggregate_type VARCHAR(255) NOT NULL, -- Type of the entity (e.g., 'order')
        topic VARCHAR(255) NOT NULL, -- The Kafka topic to publish to
        payload JSONB NOT NULL, -- The event payload
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    
    -- An index to help the message relay find messages efficiently
    CREATE INDEX idx_outbox_created_at ON outbox(created_at);

    2. The Atomic Write Implementation

    Now, we refactor our CreateOrder function to use this table. The operation becomes a single atomic transaction.

    go
    package orderservice
    
    import (
    	"context"
    	"database/sql"
    	"encoding/json"
    	"fmt"
    
    	"github.com/google/uuid"
    )
    
    // Event represents the structure of our outbox message payload
    type Event struct {
    	OrderID    string  `json:"orderId"`
    	CustomerID string  `json:"customerId"`
    	Total      float64 `json:"total"`
    }
    
    // OrderService handles order creation logic
    type OrderService struct {
    	db *sql.DB
    }
    
    func (s *OrderService) CreateOrder(ctx context.Context, customerID string, total float64) (string, error) {
        // Start a single database transaction
    	tx, err := s.db.BeginTx(ctx, nil)
    	if err != nil {
    		return "", fmt.Errorf("failed to begin transaction: %w", err)
    	}
    	// Ensure rollback on any error path
    	defer tx.Rollback()
    
    	// 1. Insert the business entity (the order)
    	orderID := uuid.New().String()
    	_, err = tx.ExecContext(ctx, 
            "INSERT INTO orders (id, customer_id, total, status) VALUES ($1, $2, $3, 'PENDING')", 
            orderID, customerID, total)
    	if err != nil {
    		return "", fmt.Errorf("failed to insert order: %w", err)
    	}
    
    	// 2. Create the event payload
    	eventPayload := Event{
    		OrderID:    orderID,
    		CustomerID: customerID,
    		Total:      total,
    	}
    	payloadBytes, err := json.Marshal(eventPayload)
    	if err != nil {
    		return "", fmt.Errorf("failed to marshal event payload: %w", err)
    	}
    
    	// 3. Insert the event into the outbox table WITHIN THE SAME TRANSACTION
    	_, err = tx.ExecContext(ctx, 
            "INSERT INTO outbox (aggregate_id, aggregate_type, topic, payload) VALUES ($1, 'order', 'order_created', $2)", 
            orderID, payloadBytes)
    	if err != nil {
    		return "", fmt.Errorf("failed to insert into outbox: %w", err)
    	}
    
    	// 4. Commit the transaction
    	if err := tx.Commit(); err != nil {
    		return "", fmt.Errorf("failed to commit transaction: %w", err)
    	}
    
    	return orderID, nil
    }

    With this change, the operation is now truly atomic. Either both the orders row and the outbox row are committed successfully, or the entire transaction is rolled back, and neither is written. The dual-write problem is solved.

    The Message Relay: From Database to Message Broker

    We've successfully persisted the intent to publish an event. Now we need a separate process, the Message Relay, to read from the outbox table and reliably publish these events to Kafka. There are two primary patterns for this.

    Strategy 1: The Polling Publisher

    The simpler approach is a background process that periodically polls the outbox table for new entries, publishes them to Kafka, and then deletes or marks them as processed.

    Challenges with Polling:

    * Latency: There's an inherent delay based on the polling interval.

    * Database Load: Frequent polling can add unnecessary load to the database.

    * Scalability: Running multiple instances of the poller requires a locking mechanism to prevent them from processing the same message twice. This can be implemented with SELECT ... FOR UPDATE SKIP LOCKED in PostgreSQL.

    go
    // Simplified Polling Relay Example
    func (r *Relay) pollAndPublish() {
        // This should run in a loop with a ticker
        tx, err := r.db.BeginTx(context.Background(), nil)
        if err != nil { /* handle error */ return }
        defer tx.Rollback()
    
        // Use FOR UPDATE SKIP LOCKED for concurrent pollers
        rows, err := tx.QueryContext(context.Background(), 
            `SELECT id, topic, payload FROM outbox ORDER BY created_at ASC LIMIT 10 FOR UPDATE SKIP LOCKED`)
        if err != nil { /* handle error */ return }
        defer rows.Close()
    
        var idsToDelete []uuid.UUID
        for rows.Next() {
            var id uuid.UUID
            var topic string
            var payload []byte
            // ... scan row ...
    
            // Publish to Kafka. This must be a blocking, synchronous call with retries.
            err := r.kafkaProducer.Publish(topic, payload)
            if err != nil {
                // Log error and break. The transaction will rollback and we'll retry later.
                // Implement a proper backoff strategy here.
                return
            }
            idsToDelete = append(idsToDelete, id)
        }
    
        // Delete processed messages from the outbox
        if len(idsToDelete) > 0 {
            // ... construct and execute DELETE FROM outbox WHERE id IN (...) ...
        }
    
        tx.Commit()
    }

    Strategy 2: Change Data Capture (CDC) with Debezium (Preferred)

    A more advanced and powerful approach is to use Change Data Capture (CDC). Instead of polling the table, we tap directly into the database's transaction log (e.g., PostgreSQL's Write-Ahead Log or WAL). This allows us to capture row-level changes in real-time and stream them.

    Debezium is a popular open-source platform for CDC. It provides Kafka Connect connectors that monitor database transaction logs, convert changes into events, and publish them to Kafka topics. This approach is highly efficient and offers very low latency.

    Architecture:

  • PostgreSQL: Configured for logical replication (wal_level = logical).
  • Kafka & Kafka Connect: The runtime for our connectors.
  • Debezium PostgreSQL Connector: A Kafka Connect plugin that reads from the PostgreSQL WAL.
  • Debezium Connector Configuration (JSON):

    json
    {
        "name": "outbox-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres_user",
            "database.password": "postgres_password",
            "database.dbname" : "order_db",
            "database.server.name": "orders_db_server",
            "table.include.list": "public.outbox",
            "tombstones.on.delete": "false",
            "transforms": "outbox",
            "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
            "transforms.outbox.route.by.field": "aggregate_type",
            "transforms.outbox.route.topic.replacement": "${routedByValue}_events",
            "transforms.outbox.table.field.event.key": "aggregate_id",
            "transforms.outbox.table.field.event.payload": "payload"
        }
    }

    Dissecting the Debezium Configuration:

    table.include.list: We tell Debezium to only* watch our outbox table.

    * transforms: We apply Debezium's built-in EventRouter transform. This is the magic.

    * Instead of just dumping the raw outbox row data to a topic named orders_db_server.public.outbox, the EventRouter transform inspects the row's columns and re-routes the event.

    * transforms.outbox.route.by.field: We tell it to use the aggregate_type column ('order') for routing.

    * transforms.outbox.route.topic.replacement: We define a template for the destination topic. ${routedByValue}_events will become order_events.

    * transforms.outbox.table.field.event.payload: We specify that the payload column of the outbox table should become the entire payload of the new Kafka message.

    With this setup, when we insert a row into our outbox table, Debezium almost instantly reads it from the WAL, transforms it, and publishes the clean event payload to the correct Kafka topic (order_events). The relay mechanism is completely decoupled from our service's runtime and is managed declaratively through Kafka Connect.

    Saga Participants: Building Idempotent Consumers

    Now the Payment Service needs to consume the OrderCreated event. Kafka provides "at-least-once" delivery semantics. This means a message is guaranteed to be delivered, but under certain failure scenarios (e.g., a consumer crashes after processing a message but before committing its offset), it might be delivered again. Therefore, all saga participants must be idempotent.

    An operation is idempotent if calling it multiple times with the same input has the same effect as calling it once. A common way to achieve this is by tracking the IDs of processed messages.

    1. Processed Messages Table

    In the Payment Service's database, we create a table to track message consumption.

    sql
    -- In the payment_db
    CREATE TABLE processed_events (
        event_id UUID PRIMARY KEY,
        processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );

    2. Idempotent Consumer Implementation

    The consumer logic must wrap its business logic and its message tracking within a single transaction.

    go
    package paymentservice
    
    import (
    	"context"
    	"database/sql"
    	"encoding/json"
    	"fmt"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"github.com/google/uuid"
    )
    
    // Assumes the incoming event has a unique ID in its headers or payload
    type OrderCreatedEvent struct {
    	EventID    string  `json:"eventId"` // Let's add a unique ID to our event payload
    	OrderID    string  `json:"orderId"`
    	CustomerID string  `json:"customerId"`
    	Total      float64 `json:"total"`
    }
    
    // PaymentService consumes events and processes payments
    type PaymentService struct {
    	db *sql.DB
    	// Kafka producer for publishing 'PaymentProcessed' or 'PaymentFailed' events
    	kafkaProducer *kafka.Producer 
    }
    
    func (s *PaymentService) HandleOrderCreated(ctx context.Context, message *kafka.Message) error {
    	var event OrderCreatedEvent
    	if err := json.Unmarshal(message.Value, &event); err != nil {
    		// This could be a "poison pill" - a malformed message. Move to a DLQ.
    		return fmt.Errorf("failed to unmarshal event: %w", err)
    	}
    
    	eventUUID, err := uuid.Parse(event.EventID)
    	if err != nil {
    		return fmt.Errorf("invalid event ID format: %w", err)
    	}
    
    	tx, err := s.db.BeginTx(ctx, nil)
    	if err != nil {
    		return fmt.Errorf("failed to begin transaction: %w", err)
    	}
    	defer tx.Rollback()
    
    	// 1. Check for idempotency
    	var count int
    	err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM processed_events WHERE event_id = $1", eventUUID).Scan(&count)
    	if err != nil && err != sql.ErrNoRows {
    		return fmt.Errorf("failed to check for existing event: %w", err)
    	}
    	if count > 0 {
    		// Already processed. Acknowledge message and exit gracefully.
    		fmt.Printf("Event %s already processed, skipping.\n", event.EventID)
    		return nil // Not an error
    	}
    
    	// 2. Perform business logic (process payment)
    	paymentSuccess, failureReason := processPayment(event.CustomerID, event.Total)
    
    	// 3. Insert payment record and outbox event for the next step
    	var nextEventPayloadBytes []byte
    	var nextTopic string
    
    	if paymentSuccess {
    		_, err = tx.ExecContext(ctx, "INSERT INTO payments (order_id, amount, status) VALUES ($1, $2, 'SUCCESS')", event.OrderID, event.Total)
    		// ... create PaymentProcessed event payload and marshal it ...
            nextTopic = "payment_processed"
    	} else {
    		_, err = tx.ExecContext(ctx, "INSERT INTO payments (order_id, amount, status, reason) VALUES ($1, $2, 'FAILED', $3)", event.OrderID, event.Total, failureReason)
    		// ... create PaymentFailed event payload and marshal it ...
            nextTopic = "payment_failed"
    	}
    	if err != nil {
    		return fmt.Errorf("failed to insert payment record: %w", err)
    	}
    
        // Insert into this service's OWN outbox table
        _, err = tx.ExecContext(ctx, 
            "INSERT INTO outbox (aggregate_id, aggregate_type, topic, payload) VALUES ($1, 'payment', $2, $3)", 
            event.OrderID, nextTopic, nextEventPayloadBytes)
        if err != nil {
            return fmt.Errorf("failed to insert payment outbox event: %w", err)
        }
    
    	// 4. Mark event as processed
    	_, err = tx.ExecContext(ctx, "INSERT INTO processed_events (event_id) VALUES ($1)", eventUUID)
    	if err != nil {
    		return fmt.Errorf("failed to mark event as processed: %w", err)
    	}
    
    	// 5. Commit the entire transaction
    	return tx.Commit()
    }

    Failure and Compensation: Rolling Back the Saga

    What happens if the Payment Service publishes a PaymentFailed event? The saga must roll back. The Order Service needs to consume this event and execute a compensating transaction.

    Compensating Transaction Logic in Order Service:

  • Consume payment_failed topic: The Order Service needs a consumer group for this topic.
  • Idempotency: The consumer must be idempotent, just like the payment consumer.
  • Business Logic: The logic is to change the order status from PENDING to CANCELLED.
  • Transactionality: This state change must be atomic. Since it's the end of this failure path, it may not need to publish another event, but it could (e.g., OrderCancelled for notifications).
  • go
    // In Order Service
    func (s *OrderService) HandlePaymentFailed(ctx context.Context, message *kafka.Message) error {
        // ... unmarshal event, get orderID ...
        // ... begin transaction ...
        // ... check idempotency key in processed_events table ...
    
        // Compensating logic
        result, err := tx.ExecContext(ctx, 
            "UPDATE orders SET status = 'CANCELLED' WHERE id = $1 AND status = 'PENDING'", 
            orderID)
        if err != nil {
            return fmt.Errorf("failed to update order status: %w", err)
        }
    
        // Check that a row was actually updated to prevent race conditions
        rowsAffected, _ := result.RowsAffected()
        if rowsAffected == 0 {
            // Order was not in PENDING state, maybe already cancelled? Log and proceed.
            fmt.Printf("Order %s was not in PENDING state, compensation skipped.\n", orderID)
        }
    
        // ... mark event as processed in this service's processed_events table ...
        // ... commit transaction ...
        return nil
    }

    Advanced Production Considerations

    * Outbox Table Maintenance: The outbox table will grow indefinitely. A cleanup job must run periodically to delete old, processed records. With the Debezium approach, once Debezium has read the WAL entry, the table row is no longer needed by the relay. You can have a simple background job that deletes rows older than a certain threshold (e.g., DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '7 days').

    * Message Schema and Evolution: Using raw JSON is brittle. In production, you should use a schema registry like Confluent Schema Registry with Avro or Protobuf. This enforces contracts between services and allows for safe schema evolution.

    * Monitoring and Alerting: Critical metrics to monitor are:

    * Debezium Lag: How far behind the database's current WAL position is the connector? High lag indicates a bottleneck in the relay.

    * Kafka Consumer Lag: How far behind the topic's latest message is a consumer group? High lag indicates a slow or failing consumer.

    * Outbox Table Size: A rapidly growing outbox table indicates the relay is failing or can't keep up.

    * Dead Letter Queue (DLQ) Depth: Configure your consumers to send un-processable messages ("poison pills") to a DLQ after a few failed retries. An alert on a non-zero DLQ depth is essential.

    Message Ordering: Kafka guarantees ordering within a partition*. By default, messages are partitioned by key. By using the aggregate_id (e.g., order_id) as the Kafka message key, you ensure that all events related to a specific order go to the same partition and are processed in the order they were produced.

    Conclusion

    The Choreography Saga pattern, when combined with the Transactional Outbox and a CDC-based relay like Debezium, provides a robust, scalable, and resilient solution for managing distributed transactions. It achieves atomicity at the service level, guarantees at-least-once event delivery, and promotes loose coupling between services.

    However, this power comes at the cost of complexity. It requires careful implementation of idempotent consumers, designing compensating transactions for all failure paths, and robust monitoring of the entire event pipeline. It is not a pattern to be used for every multi-service interaction, but for core business processes where eventual consistency is acceptable and the atomicity of the entire workflow is non-negotiable, it is an indispensable tool in the senior engineer's arsenal.

    Found this article helpful?

    Share it with others who might benefit from it.

    More Articles