Choreography Sagas: Distributed Consistency with Kafka & Outbox Pattern
The Inescapable Problem: Atomicity in a Distributed World
As architects of distributed systems, we constantly battle the CAP theorem. In the microservices paradigm, we trade the monolithic comfort of ACID transactions for scalability, resilience, and independent deployability. However, this trade-off introduces a formidable challenge: how do we execute a business process that spans multiple services as a single atomic unit?
The classic example is an e-commerce order. The Order Service creates an order, the Payment Service processes the payment, and the Inventory Service reserves the stock. If any step fails, the entire operation must be rolled back. In a monolith, a single database transaction would wrap these operations, ensuring atomicity. In a distributed environment, the canonical anti-pattern is the two-phase commit (2PC) protocol, which introduces synchronous coupling, is a performance bottleneck, and dramatically reduces the availability of the entire system: a single locked service can bring the whole process to a halt.
This is where the Saga pattern enters. A saga is a sequence of local transactions where each transaction updates data within a single service and publishes an event to trigger the next transaction. If a local transaction fails, the saga executes a series of compensating transactions to undo the preceding transactions. This post focuses on the Choreography approach, where services subscribe to events and react accordingly without a central orchestrator. While elegant, this decentralized model presents its own critical implementation challenge: ensuring that a service's state change and the publication of its corresponding event are truly atomic.
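Before diving into the mechanics, it helps to name the moving parts. The sketch below lists the events and topics used for this post's order example; the names are illustrative placeholders, and the concrete topics appear later alongside the outbox and Debezium configuration.
package saga

// Illustrative choreography for the order example in this post.
// Each arrow means: a service commits a local transaction, then the
// emitted event triggers the next participant. There is no central orchestrator.
//
//   Happy path:    OrderCreated -> PaymentProcessed -> (stock reserved)
//   Compensation:  PaymentFailed -> OrderCancelled
const (
	TopicOrderCreated     = "order_created"
	TopicPaymentProcessed = "payment_processed"
	TopicPaymentFailed    = "payment_failed"
)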
The Dual-Write Problem: A Race Condition Waiting to Happen
A naive implementation of an event-driven saga step might look like this:
// DO NOT DO THIS IN PRODUCTION
func (s *OrderService) CreateOrder(ctx context.Context, orderDetails models.Order) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback() // Rollback on error
// 1. Save the order to the database
orderID, err := s.repo.SaveOrder(ctx, tx, orderDetails)
if err != nil {
return fmt.Errorf("failed to save order: %w", err)
}
// 2. Publish an 'OrderCreated' event to Kafka
event := events.OrderCreated{OrderID: orderID, ...}
err = s.kafkaProducer.Publish("orders", event)
if err != nil {
// The transaction will be rolled back, but the event might have been sent!
// Or worse, the DB write succeeds but the publish fails.
return fmt.Errorf("failed to publish event: %w", err)
}
// 3. Commit the database transaction
if err := tx.Commit(); err != nil {
// The event was published, but the DB state was not committed!
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
This code is fundamentally broken due to the dual-write problem. There is no way to atomically commit a database transaction and publish a message to a broker like Kafka. Consider the failure modes:
* tx.Commit() fails due to a network issue, deadlock, or constraint violation, but the OrderCreated event has already been sent to Kafka. The Payment Service will now attempt to process a payment for an order that does not exist in the Order Service's database, leading to inconsistent state.
* The s.kafkaProducer.Publish() call fails. The code returns an error, and the deferred tx.Rollback() runs. The order is never persisted and no event is sent. This is the "least bad" failure, but the business process still fails.
* If the steps are reordered so the commit happens first, tx.Commit() can succeed and the process can then crash before the Kafka publish call is ever made. The order sits in a PENDING state in the database forever, with no event to trigger the next step. The system is in an inconsistent state.
The Transactional Outbox Pattern: True Atomicity
The solution is the Transactional Outbox pattern. The core idea is to persist the event to be published within the same database and same local transaction as the business state change. This leverages the atomicity of the local database to guarantee that the state change and the intent to publish an event are a single, inseparable unit.
1. The Outbox Table Schema
First, we create an outbox table in the same database as our business tables (e.g., the orders table).
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id VARCHAR(255) NOT NULL, -- ID of the entity that was changed (e.g., order_id)
aggregate_type VARCHAR(255) NOT NULL, -- Type of the entity (e.g., 'order')
topic VARCHAR(255) NOT NULL, -- The Kafka topic to publish to
payload JSONB NOT NULL, -- The event payload
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- An index to help the message relay find messages efficiently
CREATE INDEX idx_outbox_created_at ON outbox(created_at);
2. The Atomic Write Implementation
Now, we refactor our CreateOrder function to use this table. The operation becomes a single atomic transaction.
package orderservice
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/google/uuid"
)
// Event represents the structure of our outbox message payload
type Event struct {
	EventID    string  `json:"eventId"` // unique event ID; consumers use it for idempotency checks
	OrderID    string  `json:"orderId"`
	CustomerID string  `json:"customerId"`
	Total      float64 `json:"total"`
}
// OrderService handles order creation logic
type OrderService struct {
db *sql.DB
}
func (s *OrderService) CreateOrder(ctx context.Context, customerID string, total float64) (string, error) {
// Start a single database transaction
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return "", fmt.Errorf("failed to begin transaction: %w", err)
}
// Ensure rollback on any error path
defer tx.Rollback()
// 1. Insert the business entity (the order)
orderID := uuid.New().String()
_, err = tx.ExecContext(ctx,
"INSERT INTO orders (id, customer_id, total, status) VALUES ($1, $2, $3, 'PENDING')",
orderID, customerID, total)
if err != nil {
return "", fmt.Errorf("failed to insert order: %w", err)
}
	// 2. Create the event payload (the event ID doubles as the consumer-side idempotency key)
	eventPayload := Event{
		EventID:    uuid.New().String(),
		OrderID:    orderID,
		CustomerID: customerID,
		Total:      total,
	}
payloadBytes, err := json.Marshal(eventPayload)
if err != nil {
return "", fmt.Errorf("failed to marshal event payload: %w", err)
}
// 3. Insert the event into the outbox table WITHIN THE SAME TRANSACTION
_, err = tx.ExecContext(ctx,
"INSERT INTO outbox (aggregate_id, aggregate_type, topic, payload) VALUES ($1, 'order', 'order_created', $2)",
orderID, payloadBytes)
if err != nil {
return "", fmt.Errorf("failed to insert into outbox: %w", err)
}
// 4. Commit the transaction
if err := tx.Commit(); err != nil {
return "", fmt.Errorf("failed to commit transaction: %w", err)
}
return orderID, nil
}
With this change, the operation is now truly atomic. Either both the orders row and the outbox row are committed successfully, or the entire transaction is rolled back and neither is written. The dual-write problem is solved.
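For completeness, here is a minimal sketch of how this service might be constructed and called from within the same package. The DSN and the choice of the lib/pq driver are placeholders; any database/sql-compatible PostgreSQL driver works.
package orderservice

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq" // placeholder driver choice
)

// demoCreateOrder wires the service to a database and creates a single order.
func demoCreateOrder() {
	db, err := sql.Open("postgres", "postgres://orders:secret@localhost:5432/order_db?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	svc := &OrderService{db: db}
	orderID, err := svc.CreateOrder(context.Background(), "customer-123", 49.99)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("order %s persisted together with its outbox event", orderID)
}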
The Message Relay: From Database to Message Broker
We've successfully persisted the intent to publish an event. Now we need a separate process, the Message Relay, to read from the outbox table and reliably publish these events to Kafka. There are two primary patterns for this.
Strategy 1: The Polling Publisher
The simpler approach is a background process that periodically polls the outbox table for new entries, publishes them to Kafka, and then deletes them or marks them as processed.
Challenges with Polling:
* Latency: There's an inherent delay based on the polling interval.
* Database Load: Frequent polling can add unnecessary load to the database.
* Scalability: Running multiple instances of the poller requires a locking mechanism to prevent them from processing the same message twice. This can be implemented with SELECT ... FOR UPDATE SKIP LOCKED in PostgreSQL.
// Simplified Polling Relay Example
func (r *Relay) pollAndPublish() {
	// This should run in a loop with a ticker (see the driver sketch below)
	ctx := context.Background()
	tx, err := r.db.BeginTx(ctx, nil)
	if err != nil {
		return // log the error; the next tick will retry
	}
	defer tx.Rollback()
	// Use FOR UPDATE SKIP LOCKED so concurrent pollers never pick up the same rows
	rows, err := tx.QueryContext(ctx,
		`SELECT id, topic, payload FROM outbox ORDER BY created_at ASC LIMIT 10 FOR UPDATE SKIP LOCKED`)
	if err != nil {
		return // log the error
	}
	defer rows.Close()
	var idsToDelete []uuid.UUID
	for rows.Next() {
		var id uuid.UUID
		var topic string
		var payload []byte
		if err := rows.Scan(&id, &topic, &payload); err != nil {
			return // log the error
		}
		// Publish to Kafka. This must be a blocking, synchronous call with retries.
		if err := r.kafkaProducer.Publish(topic, payload); err != nil {
			// Log the error and return. The transaction rolls back and we retry later.
			// Implement a proper backoff strategy here.
			return
		}
		idsToDelete = append(idsToDelete, id)
	}
	if err := rows.Err(); err != nil {
		return // log the error
	}
	rows.Close()
	// Delete processed messages from the outbox (pq.Array is from github.com/lib/pq)
	if len(idsToDelete) > 0 {
		ids := make([]string, len(idsToDelete))
		for i, id := range idsToDelete {
			ids[i] = id.String()
		}
		if _, err := tx.ExecContext(ctx,
			"DELETE FROM outbox WHERE id = ANY($1::uuid[])", pq.Array(ids)); err != nil {
			return // log the error; the rows stay in the outbox and are retried
		}
	}
	if err := tx.Commit(); err != nil {
		// Log the error. The delete is rolled back, so the rows will be republished;
		// idempotent consumers make the duplicates harmless.
	}
}
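As the comment at the top of pollAndPublish notes, the poller should run on a ticker. A minimal driver loop might look like the sketch below; the one-second interval is an arbitrary starting point that embodies the latency-versus-load trade-off listed above, and the snippet assumes context and time are imported alongside the relay code.
// Run drives pollAndPublish on a fixed interval until the context is cancelled.
func (r *Relay) Run(ctx context.Context) {
	ticker := time.NewTicker(1 * time.Second) // interval is illustrative
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r.pollAndPublish()
		}
	}
}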
Strategy 2: Change Data Capture (CDC) with Debezium (Preferred)
A more advanced and powerful approach is to use Change Data Capture (CDC). Instead of polling the table, we tap directly into the database's transaction log (e.g., PostgreSQL's Write-Ahead Log or WAL). This allows us to capture row-level changes in real-time and stream them.
Debezium is a popular open-source platform for CDC. It provides Kafka Connect connectors that monitor database transaction logs, convert changes into events, and publish them to Kafka topics. This approach is highly efficient and offers very low latency.
Architecture: the service writes its business rows and the outbox row in one local transaction; PostgreSQL is configured for logical replication (wal_level = logical); and a Debezium connector running on Kafka Connect tails the write-ahead log and publishes each new outbox row to Kafka.
Debezium Connector Configuration (JSON):
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres_user",
"database.password": "postgres_password",
"database.dbname" : "order_db",
"database.server.name": "orders_db_server",
"table.include.list": "public.outbox",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "${routedByValue}_events",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.payload": "payload"
}
}
Dissecting the Debezium Configuration:
* table.include.list: We tell Debezium to watch only our outbox table.
* transforms: We apply Debezium's built-in EventRouter transform. This is the magic. Instead of just dumping the raw outbox row data to a topic named orders_db_server.public.outbox, the EventRouter transform inspects the row's columns and re-routes the event.
* transforms.outbox.route.by.field: We tell it to use the aggregate_type column ('order') for routing.
* transforms.outbox.route.topic.replacement: We define a template for the destination topic. ${routedByValue}_events becomes order_events.
* transforms.outbox.table.field.event.payload: We specify that the payload column of the outbox table should become the entire payload of the new Kafka message.
With this setup, when we insert a row into our outbox table, Debezium almost instantly reads it from the WAL, transforms it, and publishes the clean event payload to the correct Kafka topic (order_events). The relay mechanism is completely decoupled from our service's runtime and is managed declaratively through Kafka Connect.
Saga Participants: Building Idempotent Consumers
Now the Payment Service needs to consume the OrderCreated event. Kafka provides "at-least-once" delivery semantics. This means a message is guaranteed to be delivered, but under certain failure scenarios (e.g., a consumer crashes after processing a message but before committing its offset), it might be delivered again. Therefore, all saga participants must be idempotent.
An operation is idempotent if calling it multiple times with the same input has the same effect as calling it once. A common way to achieve this is by tracking the IDs of processed messages.
1. Processed Messages Table
In the Payment Service's database, we create a table to track message consumption.
-- In the payment_db
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
2. Idempotent Consumer Implementation
The consumer logic must wrap its business logic and its message tracking within a single transaction.
package paymentservice
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/google/uuid"
)
// Assumes the incoming event has a unique ID in its headers or payload
type OrderCreatedEvent struct {
EventID string `json:"eventId"` // Let's add a unique ID to our event payload
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
Total float64 `json:"total"`
}
// PaymentService consumes events and processes payments
type PaymentService struct {
db *sql.DB
// Kafka producer for publishing 'PaymentProcessed' or 'PaymentFailed' events
kafkaProducer *kafka.Producer
}
func (s *PaymentService) HandleOrderCreated(ctx context.Context, message *kafka.Message) error {
var event OrderCreatedEvent
if err := json.Unmarshal(message.Value, &event); err != nil {
// This could be a "poison pill" - a malformed message. Move to a DLQ.
return fmt.Errorf("failed to unmarshal event: %w", err)
}
eventUUID, err := uuid.Parse(event.EventID)
if err != nil {
return fmt.Errorf("invalid event ID format: %w", err)
}
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
// 1. Check for idempotency
var count int
err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM processed_events WHERE event_id = $1", eventUUID).Scan(&count)
	if err != nil {
		// COUNT(*) always returns a row, so any error here is a real failure.
		return fmt.Errorf("failed to check for existing event: %w", err)
	}
if count > 0 {
// Already processed. Acknowledge message and exit gracefully.
fmt.Printf("Event %s already processed, skipping.\n", event.EventID)
return nil // Not an error
}
// 2. Perform business logic (process payment)
paymentSuccess, failureReason := processPayment(event.CustomerID, event.Total)
// 3. Insert payment record and outbox event for the next step
var nextEventPayloadBytes []byte
var nextTopic string
if paymentSuccess {
_, err = tx.ExecContext(ctx, "INSERT INTO payments (order_id, amount, status) VALUES ($1, $2, 'SUCCESS')", event.OrderID, event.Total)
// ... create PaymentProcessed event payload and marshal it ...
nextTopic = "payment_processed"
} else {
_, err = tx.ExecContext(ctx, "INSERT INTO payments (order_id, amount, status, reason) VALUES ($1, $2, 'FAILED', $3)", event.OrderID, event.Total, failureReason)
// ... create PaymentFailed event payload and marshal it ...
nextTopic = "payment_failed"
}
if err != nil {
return fmt.Errorf("failed to insert payment record: %w", err)
}
// Insert into this service's OWN outbox table
_, err = tx.ExecContext(ctx,
"INSERT INTO outbox (aggregate_id, aggregate_type, topic, payload) VALUES ($1, 'payment', $2, $3)",
event.OrderID, nextTopic, nextEventPayloadBytes)
if err != nil {
return fmt.Errorf("failed to insert payment outbox event: %w", err)
}
// 4. Mark event as processed
_, err = tx.ExecContext(ctx, "INSERT INTO processed_events (event_id) VALUES ($1)", eventUUID)
if err != nil {
return fmt.Errorf("failed to mark event as processed: %w", err)
}
// 5. Commit the entire transaction
return tx.Commit()
}
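For context, here is roughly how HandleOrderCreated could be driven by a confluent-kafka-go consumer. Disabling auto-commit and committing the offset only after the handler's transaction succeeds is what produces the at-least-once redelivery the idempotency check protects against. The broker address and group id are placeholders, the topic name assumes the Debezium routing shown earlier (with the polling relay it would be the outbox row's topic column instead), and the time package must be added to the imports.
func runOrderCreatedConsumer(ctx context.Context, svc *PaymentService) error {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",  // placeholder
		"group.id":           "payment-service", // placeholder
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": false, // commit offsets manually, after the DB transaction
	})
	if err != nil {
		return fmt.Errorf("failed to create consumer: %w", err)
	}
	defer consumer.Close()

	if err := consumer.SubscribeTopics([]string{"order_events"}, nil); err != nil {
		return fmt.Errorf("failed to subscribe: %w", err)
	}

	for ctx.Err() == nil {
		msg, err := consumer.ReadMessage(time.Second)
		if err != nil {
			continue // timeouts are normal on an idle topic; log anything else
		}
		if err := svc.HandleOrderCreated(ctx, msg); err != nil {
			// Do not commit the offset: the message is redelivered and the
			// idempotency check makes the retry safe. A real consumer would
			// add retry limits and a dead letter queue here.
			continue
		}
		if _, err := consumer.CommitMessage(msg); err != nil {
			return fmt.Errorf("failed to commit offset: %w", err)
		}
	}
	return ctx.Err()
}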
Failure and Compensation: Rolling Back the Saga
What happens if the Payment Service publishes a PaymentFailed event? The saga must roll back. The Order Service needs to consume this event and execute a compensating transaction.
Compensating Transaction Logic in Order Service:
* Consume the payment_failed topic: the Order Service needs a consumer group for this topic.
* Execute the compensating local transaction: update the order's status from PENDING to CANCELLED.
* Optionally publish a follow-up event (e.g., OrderCancelled for notifications).
// In Order Service
func (s *OrderService) HandlePaymentFailed(ctx context.Context, message *kafka.Message) error {
// ... unmarshal event, get orderID ...
// ... begin transaction ...
// ... check idempotency key in processed_events table ...
// Compensating logic
result, err := tx.ExecContext(ctx,
"UPDATE orders SET status = 'CANCELLED' WHERE id = $1 AND status = 'PENDING'",
orderID)
if err != nil {
return fmt.Errorf("failed to update order status: %w", err)
}
// Check that a row was actually updated to prevent race conditions
rowsAffected, _ := result.RowsAffected()
if rowsAffected == 0 {
// Order was not in PENDING state, maybe already cancelled? Log and proceed.
fmt.Printf("Order %s was not in PENDING state, compensation skipped.\n", orderID)
}
// ... mark event as processed in this service's processed_events table ...
// ... commit transaction ...
return nil
}
Advanced Production Considerations
* Outbox Table Maintenance: The outbox table will grow indefinitely. A cleanup job must run periodically to delete old, processed records. With the Debezium approach, once Debezium has read the WAL entry, the table row is no longer needed by the relay. You can have a simple background job that deletes rows older than a certain threshold (e.g., DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '7 days'); a minimal sketch of such a job follows this list.
* Message Schema and Evolution: Using raw JSON is brittle. In production, you should use a schema registry like Confluent Schema Registry with Avro or Protobuf. This enforces contracts between services and allows for safe schema evolution.
* Monitoring and Alerting: Critical metrics to monitor are:
* Debezium Lag: How far behind the database's current WAL position is the connector? High lag indicates a bottleneck in the relay.
* Kafka Consumer Lag: How far behind the topic's latest message is a consumer group? High lag indicates a slow or failing consumer.
* Outbox Table Size: A rapidly growing outbox table indicates the relay is failing or can't keep up.
* Dead Letter Queue (DLQ) Depth: Configure your consumers to send un-processable messages ("poison pills") to a DLQ after a few failed retries. An alert on a non-zero DLQ depth is essential.
* Message Ordering: Kafka guarantees ordering only within a partition. Messages with the same key are routed to the same partition, so by using the aggregate_id (e.g., order_id) as the Kafka message key, you ensure that all events related to a specific order land in the same partition and are processed in the order they were produced.
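Following up on the Outbox Table Maintenance point above, the cleanup job can be a trivial background goroutine. A minimal sketch, with the interval and the seven-day retention window as placeholders:
package maintenance

import (
	"context"
	"database/sql"
	"log"
	"time"
)

// PruneOutbox periodically deletes outbox rows older than the retention window.
// With the Debezium relay this is safe: events are read from the WAL, so the
// table rows are no longer needed once captured.
func PruneOutbox(ctx context.Context, db *sql.DB) {
	ticker := time.NewTicker(1 * time.Hour) // interval is illustrative
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if _, err := db.ExecContext(ctx,
				"DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '7 days'"); err != nil {
				log.Printf("outbox cleanup failed: %v", err)
			}
		}
	}
}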
Conclusion
The Choreography Saga pattern, when combined with the Transactional Outbox and a CDC-based relay like Debezium, provides a robust, scalable, and resilient solution for managing distributed transactions. It achieves atomicity at the service level, guarantees at-least-once event delivery, and promotes loose coupling between services.
However, this power comes at the cost of complexity. It requires careful implementation of idempotent consumers, designing compensating transactions for all failure paths, and robust monitoring of the entire event pipeline. It is not a pattern to be used for every multi-service interaction, but for core business processes where eventual consistency is acceptable and the atomicity of the entire workflow is non-negotiable, it is an indispensable tool in the senior engineer's arsenal.