Idempotent Sagas: Exactly-Once Processing in Event-Driven Systems
The Inevitable Tension in Distributed Systems: Reliability vs. Duplication
In any non-trivial event-driven architecture, the contract with our message broker (be it Kafka, RabbitMQ, or Pulsar) is almost universally at-least-once delivery. This is a pragmatic compromise. Guaranteeing exactly-once delivery at the infrastructure level is a notoriously difficult distributed consensus problem, often leading to unacceptable performance trade-offs. Therefore, the responsibility is shifted from the broker to the consumer.
This shift creates a critical challenge for service authors. If a service consumes an OrderCreated event, debits a customer's wallet, and then fails before acknowledging the message, the broker will redeliver it. The service, upon recovery, will see the same event and, without proper safeguards, debit the customer's wallet a second time. This is the classic failure mode that the Saga pattern aims to manage, but the base pattern itself doesn't solve the duplicate processing problem; it only coordinates the workflow.
The naive solution—checking if the action has already been performed before executing—is fraught with race conditions and non-atomic operations. For instance:
// DO NOT DO THIS - RACE CONDITION
func (s *PaymentService) handleDebitEvent(event OrderCreated) {
	// 1. Check if payment exists
	exists, _ := s.db.PaymentExistsForOrder(event.OrderID)
	if exists {
		return // Assume already processed
	}
	// 2. Create payment
	_ = s.db.CreatePayment(event.OrderID, event.Amount)
}
If two instances of this consumer process the same event concurrently (a common scenario during a rolling deploy or a consumer group rebalance), both could execute the check, find no existing payment, and proceed to create one. You have now double-charged the customer.
To build a truly resilient system, we must implement idempotency at the application layer. The consumer must be able to receive the same message multiple times and guarantee the end result is identical to having received it only once. The most robust method for achieving this is the Idempotency Key Pattern within a transactional boundary.
The Idempotency Key Pattern: A Transactional Blueprint
The core principle is to treat the processing of an event and the recording of that processing as a single, atomic operation. We accomplish this by attaching a unique identifier—the idempotency key—to each event that triggers a state-mutating action. The consumer then uses this key to de-duplicate requests.
An idempotency key should be:
* Unique per logical operation, so distinct operations never collide.
* Stable across retries: the producer must attach the same key every time it publishes or re-publishes the same event.
* Opaque to the consumer, which treats it purely as a de-duplication token.
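For broker-delivered events, a deterministic key beats a UUID minted fresh on every publish attempt, since a retrying producer must send the same key. As a minimal sketch (the event ID field and the scoping scheme are illustrative assumptions, not part of the implementation below):

// Sketch: derive a stable idempotency key from the event envelope so every
// redelivery or republish of the same event carries the same key.
// Assumes the producer stamps each event with a unique, immutable event ID.
func idempotencyKeyFor(consumerName, eventID string) string {
	// Scope by consumer so two services handling the same event do not
	// collide in a shared key store.
	return consumerName + ":" + eventID
}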
Here is the high-level flow for an idempotent consumer:
1. The producer attaches a unique Idempotency-Key to each event in its metadata/header.
2. The consumer begins a new database transaction.
3. Within that transaction, the consumer looks up a row in the idempotency_keys table matching the provided key.
   * Case A: Key Found. The associated operation has either completed or is in progress. If it completed, the consumer can immediately return the stored result from that record without re-executing any business logic.
   * Case B: Key Not Found. This is the first time we've seen this key. The consumer inserts a new record for the key into the idempotency_keys table with a pending status.
4. The consumer proceeds to execute its core business logic (e.g., debiting the wallet).
5. The consumer updates the row in the idempotency_keys table, changing the status to completed and storing the result.
6. The entire database transaction is committed.
If at any point a failure occurs—the business logic fails, the database connection is lost, the process crashes—the entire transaction is rolled back. Because the pending entry in the idempotency_keys table is part of that same transaction, it is never committed. When the event is redelivered, the process repeats from a clean slate, ensuring the business logic's effects are committed exactly once.
Production-Grade Implementation: Go, Kafka, and PostgreSQL
Let's build a concrete implementation for a payment-service that consumes OrderCreated events. We'll use Go for its concurrency features, Kafka as our message broker, and PostgreSQL for its robust transactional support.
1. Database Schema
First, we need a table to store our idempotency keys. This table is the cornerstone of the pattern.
-- Status of the idempotent operation
CREATE TYPE idempotency_status AS ENUM ('pending', 'completed', 'failed');
-- Table to store idempotency keys and their results
CREATE TABLE idempotency_keys (
    -- The key itself, provided by the client/producer
    key VARCHAR(255) PRIMARY KEY,
    -- The name of the user/actor performing the action, for locking and scoping.
    -- In our event-driven case, this could be the consumer group ID or a static service name.
    actor_id VARCHAR(255) NOT NULL,
    -- The current status of the operation
    status idempotency_status NOT NULL DEFAULT 'pending',
    -- The result of the operation, stored as JSON to be flexible
    response_payload JSONB,
    -- The HTTP status code or an equivalent result code for the operation
    response_code INT,
    -- Timestamp for when the operation was locked for processing
    locked_at TIMESTAMPTZ,
    -- Timestamps for creation and updates
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Note: the PRIMARY KEY on `key` already creates the unique index used for
-- lookups; a separate index on key would be redundant.
-- Optional: Index for cleanup jobs
CREATE INDEX idx_idempotency_keys_created_at ON idempotency_keys(created_at);
We also need our business table, for example, payments.
CREATE TABLE payments (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    order_id UUID NOT NULL UNIQUE,
    amount_cents INT NOT NULL,
    status VARCHAR(50) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
2. The Idempotency Middleware
To keep our business logic clean, we'll encapsulate the idempotency check in a middleware or decorator. This function will wrap our actual event handler.
package main
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	_ "github.com/lib/pq"
)

// Represents the stored state of an idempotent operation
type IdempotencyKey struct {
	Key             string          `db:"key"`
	ActorID         string          `db:"actor_id"`
	Status          string          `db:"status"`
	ResponsePayload json.RawMessage `db:"response_payload"`
	ResponseCode    int             `db:"response_code"`
	LockedAt        sql.NullTime    `db:"locked_at"`
	CreatedAt       time.Time       `db:"created_at"`
	UpdatedAt       time.Time       `db:"updated_at"`
}

// A generic response structure to store in the idempotency table
type HandlerResponse struct {
	StatusCode int
	Body       []byte
}
// The signature for our actual business logic handlers
type IdempotentHandler func(ctx context.Context, tx *sql.Tx, eventPayload []byte) (*HandlerResponse, error)
// The middleware that wraps the business logic handler
func IdempotencyWrapper(db *sql.DB, handler IdempotentHandler) func(ctx context.Context, idempotencyKey string, eventPayload []byte) (*HandlerResponse, error) {
	return func(ctx context.Context, idempotencyKey string, eventPayload []byte) (*HandlerResponse, error) {
		// 1. Begin Transaction
		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			return nil, fmt.Errorf("failed to begin transaction: %w", err)
		}
		defer tx.Rollback() // No-op once the transaction has been committed

		// 2. Lock and Check for Existing Key. THIS IS THE CRITICAL STEP.
		// `FOR UPDATE` acquires a row-level lock, preventing concurrent processors
		// from interfering with this specific key. COALESCE guards the Scan:
		// response_code is NULL until the operation completes.
		var keyRecord IdempotencyKey
		row := tx.QueryRowContext(ctx, `SELECT key, status, response_payload, COALESCE(response_code, 0) FROM idempotency_keys WHERE key = $1 FOR UPDATE`, idempotencyKey)
		err = row.Scan(&keyRecord.Key, &keyRecord.Status, &keyRecord.ResponsePayload, &keyRecord.ResponseCode)
		if err != nil && !errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("failed to select idempotency key: %w", err)
		}

		// Case A: Key Found - Operation was already processed or is in progress
		if err == nil {
			switch keyRecord.Status {
			case "completed":
				log.Printf("Request with key %s already completed. Returning stored response.", idempotencyKey)
				return &HandlerResponse{
					StatusCode: keyRecord.ResponseCode,
					Body:       keyRecord.ResponsePayload,
				}, nil
			case "failed":
				// A terminal failure recorded by a previous attempt (see the
				// section on non-transient failures below). Reject immediately
				// rather than retry.
				return nil, fmt.Errorf("operation for key %s previously failed terminally", idempotencyKey)
			case "pending":
				// In this single-transaction design a committed 'pending' row is
				// unexpected (a crash rolls the insert back), but we handle it
				// defensively: we hold the row lock, so retrying is safe.
				log.Printf("Request with key %s found in pending state. Retrying operation.", idempotencyKey)
			}
		} else { // Case B: Key Not Found - First time seeing this request
			log.Printf("First time seeing key %s. Inserting new record.", idempotencyKey)
			_, err = tx.ExecContext(ctx,
				`INSERT INTO idempotency_keys (key, actor_id, status, locked_at) VALUES ($1, $2, 'pending', NOW())`,
				idempotencyKey, "payment-service-v1")
			if err != nil {
				return nil, fmt.Errorf("failed to insert idempotency key: %w", err)
			}
		}

		// 3. Execute Core Business Logic
		response, err := handler(ctx, tx, eventPayload)
		if err != nil {
			// Business logic failed. The deferred Rollback discards the pending
			// key along with any partial writes, so the event is retried on
			// redelivery. (Updating the key to 'failed' here would be futile:
			// that update would be rolled back too. See the section on
			// non-transient failures for a version that commits a terminal status.)
			return nil, fmt.Errorf("handler logic failed: %w", err)
		}

		// 4. Persist the successful result
		_, err = tx.ExecContext(ctx,
			`UPDATE idempotency_keys SET status = 'completed', response_payload = $1, response_code = $2, updated_at = NOW() WHERE key = $3`,
			response.Body, response.StatusCode, idempotencyKey)
		if err != nil {
			return nil, fmt.Errorf("failed to update idempotency key to completed: %w", err)
		}

		// 5. Commit the entire transaction
		if err := tx.Commit(); err != nil {
			return nil, fmt.Errorf("failed to commit transaction: %w", err)
		}

		log.Printf("Successfully processed request with key %s.", idempotencyKey)
		return response, nil
	}
}
3. The Business Logic Handler
Now our actual handler is simple. It receives the transaction object (*sql.Tx) and operates within it, completely unaware of the idempotency logic.
// Represents the payload of our OrderCreated event
type OrderCreatedEvent struct {
	OrderID    string `json:"order_id"`
	CustomerID string `json:"customer_id"`
	Amount     int    `json:"amount_cents"`
}

// processDebit is our core business logic.
func processDebit(ctx context.Context, tx *sql.Tx, eventPayload []byte) (*HandlerResponse, error) {
	var event OrderCreatedEvent
	if err := json.Unmarshal(eventPayload, &event); err != nil {
		return nil, fmt.Errorf("invalid event payload: %w", err)
	}

	// Business logic: create a payment record.
	// In a real system, this would also call out to a payment gateway, etc.
	// All database operations MUST use the provided `tx` object.
	_, err := tx.ExecContext(ctx,
		`INSERT INTO payments (order_id, amount_cents, status) VALUES ($1, $2, 'completed')`,
		event.OrderID, event.Amount)
	if err != nil {
		// The UNIQUE constraint on order_id is a second line of defense in
		// case the idempotency key logic is ever bypassed; see the snippet
		// below for detecting that violation (SQLSTATE 23505) explicitly.
		return nil, fmt.Errorf("failed to insert payment: %w", err)
	}

	responseBody, _ := json.Marshal(map[string]string{"status": "success", "order_id": event.OrderID})
	return &HandlerResponse{
		StatusCode: 200,
		Body:       responseBody,
	}, nil
}
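// Aside (sketch): if you want the duplicate-payment case surfaced as a
// distinct error rather than an opaque failure, lib/pq exposes the SQLSTATE.
// This requires importing "github.com/lib/pq" without the blank identifier;
// ErrDuplicatePayment is a hypothetical sentinel for the consumer loop.
var ErrDuplicatePayment = errors.New("payment already exists for order")

func insertPayment(ctx context.Context, tx *sql.Tx, orderID string, amountCents int) error {
	_, err := tx.ExecContext(ctx,
		`INSERT INTO payments (order_id, amount_cents, status) VALUES ($1, $2, 'completed')`,
		orderID, amountCents)
	var pqErr *pq.Error
	if errors.As(err, &pqErr) && pqErr.Code == "23505" { // unique_violation
		return ErrDuplicatePayment
	}
	return err
}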
// Main function to simulate a Kafka consumer
func main() {
	// Database connection setup
	dbURL := "postgres://user:password@localhost:5432/idempotency_db?sslmode=disable"
	db, err := sql.Open("postgres", dbURL)
	if err != nil {
		log.Fatalf("Invalid database configuration: %v", err)
	}
	defer db.Close()
	// sql.Open only validates its arguments; Ping establishes a real connection.
	if err := db.Ping(); err != nil {
		log.Fatalf("Could not connect to database: %v", err)
	}

	// Wrap our business logic with the idempotency middleware
	debitHandler := IdempotencyWrapper(db, processDebit)

	// --- Simulate receiving two identical Kafka messages ---
	orderID := uuid.New().String()
	eventPayload, _ := json.Marshal(OrderCreatedEvent{
		OrderID:    orderID,
		CustomerID: uuid.New().String(),
		Amount:     10000, // $100.00
	})
	// The producer of the event generates the idempotency key
	idempotencyKey := uuid.New().String()

	log.Println("--- First attempt ---")
	if _, err := debitHandler(context.Background(), idempotencyKey, eventPayload); err != nil {
		log.Fatalf("First attempt failed: %v", err)
	}

	log.Println("--- Second attempt (duplicate) ---")
	if _, err := debitHandler(context.Background(), idempotencyKey, eventPayload); err != nil {
		log.Fatalf("Second attempt failed: %v", err)
	}
}
When you run this, the output will clearly show the first attempt inserting the key and processing the payment, while the second attempt recognizes the key and returns the stored response without executing processDebit again.
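Based on the log statements above, a run should produce output along these lines (timestamps elided, UUID shortened):

--- First attempt ---
First time seeing key 3f6f5c... Inserting new record.
Successfully processed request with key 3f6f5c...
--- Second attempt (duplicate) ---
Request with key 3f6f5c... already completed. Returning stored response.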
Advanced Edge Cases and Performance Considerations
A basic implementation is a good start, but production systems present more challenges.
1. The Criticality of `SELECT ... FOR UPDATE`
The most important and subtle part of the implementation is SELECT ... FOR UPDATE. When this query matches a row inside a transaction, PostgreSQL acquires a row-level write lock on it, blocking any other transaction that tries to lock the same row. One caveat: unlike MySQL/InnoDB, PostgreSQL does not take "gap" locks, so if the row does not exist yet, nothing is locked and we fall back on the primary key constraint.
Consider two concurrent consumers (Pod A and Pod B) processing the exact same event:
* The key row already exists (a redelivery). Pod A executes SELECT ... FOR UPDATE WHERE key = 'xyz' and acquires the lock. Pod B's identical SELECT blocks and will not proceed until Pod A's transaction either commits or rolls back. If Pod A commits, Pod B unblocks, finds the row in its completed state, and correctly follows the "Key Found" logic path. If Pod A rolls back, Pod B unblocks, sees the row as it was, and safely retries the operation itself.
* The key row does not exist yet (concurrent first deliveries). Both pods get sql.ErrNoRows and proceed to the INSERT. The PRIMARY KEY constraint guarantees only one can win: the loser's INSERT blocks until the winner commits, then fails with a unique violation (SQLSTATE 23505). Its transaction rolls back, and when the broker redelivers the event, the retry follows the "Key Found" path.
This pessimistic lock serializes access to a given idempotency key, solving the redelivery race at the cost of briefly blocking one consumer, while the primary key constraint backstops the first-delivery race.
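To avoid surfacing that unique-violation error at all, a common hardening is to claim the key with INSERT ... ON CONFLICT DO NOTHING and only then take the row lock. A sketch under the same schema assumptions (not part of the wrapper above):

// Sketch: claim-then-lock. Safe under concurrent first deliveries because
// ON CONFLICT turns the losing insert into a no-op instead of an error.
_, err = tx.ExecContext(ctx,
	`INSERT INTO idempotency_keys (key, actor_id, status, locked_at)
	 VALUES ($1, $2, 'pending', NOW())
	 ON CONFLICT (key) DO NOTHING`,
	idempotencyKey, "payment-service-v1")
if err != nil {
	return nil, fmt.Errorf("failed to claim idempotency key: %w", err)
}
// The row is now guaranteed to exist; lock it and inspect its status.
row := tx.QueryRowContext(ctx,
	`SELECT status, response_payload, COALESCE(response_code, 0)
	 FROM idempotency_keys WHERE key = $1 FOR UPDATE`,
	idempotencyKey)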
2. Idempotency Key TTL and Table Bloat
The idempotency_keys table will grow indefinitely. Storing keys forever is unnecessary, as the window for message redelivery is typically finite (e.g., Kafka's retention period, or a few hours/days for transient failures). You must have a cleanup strategy.
* Periodic Deletion Job: The most common approach. A background job runs periodically (e.g., every night) and deletes keys older than a configured TTL (e.g., 48 hours).
DELETE FROM idempotency_keys WHERE created_at < NOW() - INTERVAL '48 hours';
This is simple and effective. Ensure created_at is indexed to make the DELETE efficient; a minimal sketch of such a job appears after this list.
* Database-specific TTL: Some databases like MongoDB and DynamoDB have built-in TTL features on documents. While convenient, relying on these can be less portable. For PostgreSQL, extensions like pg_partman can be used to partition the table by date, allowing old partitions to be dropped efficiently instead of performing a large DELETE.
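Here is that periodic job as a minimal in-process sketch (it assumes the schema above; startKeyReaper, the TTL, and the interval are illustrative, and many teams run this as an external cron job instead):

// Sketch: periodically delete idempotency keys older than the TTL.
func startKeyReaper(ctx context.Context, db *sql.DB, ttl, interval time.Duration) {
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				res, err := db.ExecContext(ctx,
					`DELETE FROM idempotency_keys WHERE created_at < NOW() - make_interval(hours => $1)`,
					int(ttl.Hours()))
				if err != nil {
					log.Printf("idempotency key cleanup failed: %v", err)
					continue
				}
				if n, err := res.RowsAffected(); err == nil {
					log.Printf("deleted %d expired idempotency keys", n)
				}
			}
		}
	}()
}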
3. Handling Non-Transient Business Logic Failures
What if the business logic fails for a reason that is not transient? For example, an InsufficientFunds error. If we simply roll back the transaction, the message will be redelivered, and we will retry the same failing operation, potentially indefinitely, until it's moved to a dead-letter queue (DLQ).
A better approach is to catch specific, non-retriable errors and mark the idempotency key as failed.
// Inside the IdempotencyWrapper
response, err := handler(ctx, tx, eventPayload)
if err != nil {
	var insufficientFundsErr *InsufficientFundsError
	if errors.As(err, &insufficientFundsErr) {
		// This is a terminal failure. Mark it as such. The payload must be
		// valid JSON to satisfy the JSONB column, so marshal the message.
		body, _ := json.Marshal(map[string]string{"error": insufficientFundsErr.Error()})
		_, updateErr := tx.ExecContext(ctx,
			`UPDATE idempotency_keys SET status = 'failed', response_payload = $1, response_code = 400, updated_at = NOW() WHERE key = $2`,
			body, idempotencyKey)
		if updateErr != nil {
			return nil, fmt.Errorf("failed to record terminal failure: %w", updateErr)
		}
		// We still commit the transaction to save the 'failed' state!
		// Caveat: this also commits anything the handler wrote before failing,
		// so terminal-failure paths must not leave partial business state.
		if commitErr := tx.Commit(); commitErr != nil {
			return nil, fmt.Errorf("failed to commit 'failed' state: %w", commitErr)
		}
		return nil, err // Return the original error to the consumer loop
	}
	// For other errors, we let the deferred tx.Rollback() handle it.
	return nil, err
}
Now, when the message is redelivered, our wrapper will find the key with a failed status and can immediately reject the message, preventing wasted resources and moving it to a DLQ faster.
4. Performance Impact and Optimization
This pattern is not free. It introduces at least one SELECT and one INSERT/UPDATE to your database for every processed event.
* Latency: The overhead is primarily the network round-trip time to your database. For a well-located database, this might be 1-5ms. This is often an acceptable trade-off for correctness.
* Throughput: The primary bottleneck will be database write contention, especially on the idempotency_keys table. Lookups are already served by the primary key index; the bigger levers are keeping the table small via aggressive TTL cleanup and, for extremely high-throughput systems, partitioning the table.
* Storage Alternatives: Could you use Redis? Yes, with caveats. You would need to use a Lua script to achieve the atomic 'check-and-set' logic. The major drawback is Redis's default persistence model (RDB/AOF) may not provide the same durability guarantees as a transactional database like PostgreSQL. If Redis loses data between snapshots, you could lose idempotency records and risk duplicate processing. For financial or mission-critical data, the durability of PostgreSQL is almost always the correct choice.
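For illustration only, here is a minimal sketch of that atomic check-and-set as a Lua script driven from Go via go-redis (the github.com/redis/go-redis/v9 client, the key layout, and the TTL are all assumptions, not part of the implementation above):

// Sketch: atomically return a stored result if the key exists, or claim
// the key with a 'pending' marker and a TTL if it does not.
var checkAndSet = redis.NewScript(`
local existing = redis.call('GET', KEYS[1])
if existing then
  return existing
end
redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
return false
`)

// claimed == true means we own the key and should run the business logic;
// otherwise prior holds the previously stored value (or "pending").
func claimKey(ctx context.Context, rdb *redis.Client, key string, ttl time.Duration) (prior string, claimed bool, err error) {
	res, err := checkAndSet.Run(ctx, rdb, []string{key}, "pending", int(ttl.Seconds())).Result()
	if err == redis.Nil {
		// The Lua script returned false: the key was absent and is now ours.
		return "", true, nil
	}
	if err != nil {
		return "", false, err
	}
	return res.(string), false, nil
}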
Conclusion: The Price of Correctness
The Asynchronous Saga pattern is a powerful tool for managing distributed workflows, but it's incomplete without a robust strategy for handling message duplication. Implementing idempotent consumers using a transactional key store is a production-proven pattern that elevates a brittle workflow into a resilient, fault-tolerant system.
By leveraging the transactional guarantees of a relational database like PostgreSQL and the explicit locking provided by SELECT FOR UPDATE, we can eliminate race conditions and ensure that business logic is executed exactly once, even in the chaotic environment of a distributed system with failing nodes and network partitions. The added complexity and minor performance overhead are a necessary and worthwhile investment for any system where data integrity is paramount.