Idempotent Sagas: Exactly-Once Processing in Event-Driven Systems

Goh Ling Yong

The Inevitable Contradiction of Distributed Systems: Reliability vs. Duplication

In any non-trivial event-driven architecture, the contract with our message broker (be it Kafka, RabbitMQ, or Pulsar) is almost universally at-least-once delivery. This is a pragmatic compromise. Guaranteeing exactly-once delivery at the infrastructure level is a notoriously difficult distributed consensus problem, often leading to unacceptable performance trade-offs. Even where a broker offers exactly-once features (Kafka transactions, for example), they cover reads and writes inside the broker, not side effects in your own database. Therefore, the responsibility is shifted from the broker to the consumer.

This shift creates a critical challenge for service authors. If a service consumes an OrderCreated event, debits a customer's wallet, and then fails before acknowledging the message, the broker will redeliver it. The service, upon recovery, will see the same event and, without proper safeguards, debit the customer's wallet a second time. The Saga pattern coordinates multi-step workflows like this one, but the base pattern does not solve the duplicate processing problem; it only sequences the steps and their compensations.

The naive solution—checking if the action has already been performed before executing—is fraught with race conditions and non-atomic operations. For instance:

go
// DO NOT DO THIS - RACE CONDITION
func (s *PaymentService) handleDebitEvent(event OrderCreated) {
    // 1. Check if payment exists
    exists, _ := s.db.PaymentExistsForOrder(event.OrderID)
    if exists {
        return // Assume already processed
    }

    // 2. Create payment
    _ = s.db.CreatePayment(event.OrderID, event.Amount)
}

If two instances of this consumer process the same event concurrently (a common scenario during a rolling deploy or a consumer group rebalance), both could execute the check, find no existing payment, and proceed to create one. You have now double-charged the customer.
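To make the race concrete, here is a minimal in-memory sketch rather than database code. The names `claimStore`, `Claim`, and `countWinners` are hypothetical, and the mutex stands in for the atomicity that a database transaction or unique constraint would provide. The point is that the check and the write happen as one indivisible step, so only one of many concurrent consumers is allowed to charge.

```go
package main

import (
	"fmt"
	"sync"
)

// claimStore is a hypothetical in-memory stand-in for the payments table.
// The mutex plays the role of the database's atomicity guarantee.
type claimStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newClaimStore() *claimStore {
	return &claimStore{seen: make(map[string]bool)}
}

// Claim checks for the order and records it inside one critical section,
// so exactly one caller per orderID ever receives true.
func (s *claimStore) Claim(orderID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[orderID] {
		return false
	}
	s.seen[orderID] = true
	return true
}

// countWinners delivers the same event to n concurrent consumers and
// reports how many of them were allowed to perform the charge.
func countWinners(s *claimStore, orderID string, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.Claim(orderID) {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	s := newClaimStore()
	fmt.Println("consumers allowed to charge:", countWinners(s, "order-42", 50))
}
```

However the 50 goroutines are scheduled, only one wins the claim. Splitting `Claim` into a separate "exists?" call and a later "insert" call would reintroduce the window shown in the broken handler above.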

To build a truly resilient system, we must implement idempotency at the application layer. The consumer must be able to receive the same message multiple times and guarantee the end result is identical to having received it only once. The most robust method for achieving this is the Idempotency Key Pattern within a transactional boundary.

The Idempotency Key Pattern: A Transactional Blueprint

The core principle is to treat the processing of an event and the recording of that processing as a single, atomic operation. We accomplish this by passing a unique identifier—the idempotency key—with each event that initiates a mutable action. The consumer then uses this key to de-duplicate requests.

An idempotency key should be:

  • Unique: Identifies one logical operation and stays the same across every retry and redelivery of that operation. A UUIDv4 generated once by the original event producer is an excellent choice.
  • Persistent: Stored by the consumer alongside the outcome of the operation.
  • Atomic: The check for the key's existence and the execution of the business logic must occur within the same atomic unit of work, typically a database transaction.
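A short producer-side sketch may help here. It assumes a hypothetical `Envelope` type and `publishWithRetry` helper (neither belongs to any broker client library) and builds a UUIDv4 from the standard library only. What it illustrates is that the key is generated once, when the operation is created, and then travels unchanged through every publish retry.

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
)

const headerIdempotencyKey = "Idempotency-Key"

// Envelope is a hypothetical message wrapper: headers plus payload.
type Envelope struct {
	Headers map[string]string
	Payload []byte
}

var errBrokerUnavailable = errors.New("broker unavailable")

// newUUIDv4 builds a random (version 4) UUID string from crypto/rand.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// NewEnvelope assigns the idempotency key exactly once, at creation time.
func NewEnvelope(payload []byte) (Envelope, error) {
	key, err := newUUIDv4()
	if err != nil {
		return Envelope{}, fmt.Errorf("generate idempotency key: %w", err)
	}
	return Envelope{
		Headers: map[string]string{headerIdempotencyKey: key},
		Payload: payload,
	}, nil
}

// publishWithRetry resends the SAME envelope, and therefore the same key,
// until send succeeds or the attempts are exhausted.
func publishWithRetry(env Envelope, attempts int, send func(Envelope) error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = send(env); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	env, err := NewEnvelope([]byte(`{"order_id":"o-1","amount_cents":10000}`))
	if err != nil {
		panic(err)
	}
	calls := 0
	err = publishWithRetry(env, 3, func(e Envelope) error {
		calls++
		fmt.Printf("attempt %d key=%s\n", calls, e.Headers[headerIdempotencyKey])
		if calls < 3 {
			return errBrokerUnavailable // simulated transient failure
		}
		return nil
	})
	fmt.Println("published:", err == nil)
}
```

Generating a fresh key inside the retry loop would defeat the pattern, because the consumer would see each retry as a brand-new operation.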

Here is the high-level flow for an idempotent consumer:

  • An event is consumed. It contains a payload and an Idempotency-Key in its metadata/header.
  • The consumer begins a new database transaction.
  • Within the transaction, it attempts to find a record in a dedicated idempotency_keys table matching the provided key.
    • Case A: Key Found. A committed record means the operation has already been handled. The consumer returns the stored result from that record without re-executing any business logic.
    • Case B: Key Not Found. This is the first time we've seen this key. The consumer inserts a new record for the key into the idempotency_keys table with a pending status.
  • The consumer proceeds to execute its core business logic (e.g., debiting the wallet).
  • Upon completion of the business logic, the consumer updates the record in the idempotency_keys table, changing the status to completed and storing the result.
  • The entire database transaction is committed.

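If at any point a failure occurs (the business logic fails, the database connection is lost, the process crashes), the entire transaction is rolled back and the pending entry in the idempotency_keys table is never committed. When the event is redelivered, the process repeats from a clean state. The business logic may run more than once, but its database effects are committed exactly once. This guarantee covers only work done inside the transaction: a call to an external system, such as a payment gateway, is not undone by a rollback and needs its own idempotency key.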
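The flow can be simulated without a database. In this sketch, `memStore` and `process` are hypothetical names, and holding the mutex for the whole call is only a stand-in for a database transaction. A failed attempt records nothing (the "rollback"), a successful one stores its result (the "commit"), and later deliveries replay the stored result.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// memStore is a hypothetical in-memory stand-in for the idempotency_keys table.
type memStore struct {
	mu      sync.Mutex
	results map[string]string
}

var errCrash = errors.New("simulated crash before commit")

// process commits the handler's result at most once per key.
func (s *memStore) process(key string, handler func() (string, error)) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if res, ok := s.results[key]; ok {
		return res, nil // Case A: replay the stored result
	}
	res, err := handler() // Case B: first committed attempt wins
	if err != nil {
		return "", err // "rollback": nothing is recorded for this key
	}
	s.results[key] = res // "commit"
	return res, nil
}

func main() {
	store := &memStore{results: make(map[string]string)}
	calls := 0
	debit := func() (string, error) {
		calls++
		if calls == 1 {
			return "", errCrash // the first delivery fails mid-flight
		}
		return "debited 10000 cents", nil
	}
	for attempt := 1; attempt <= 3; attempt++ {
		res, err := store.process("key-abc", debit)
		fmt.Printf("delivery %d: result=%q err=%v handlerCalls=%d\n", attempt, res, err, calls)
	}
}
```

The handler runs twice (one failed attempt, one successful), and the third delivery is answered from the stored result without touching the handler again.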

Production-Grade Implementation: Go, Kafka, and PostgreSQL

Let's build a concrete implementation for a payment-service that consumes OrderCreated events. We'll use Go for its concurrency features, Kafka as our message broker, and PostgreSQL for its robust transactional support.

1. Database Schema

First, we need a table to store our idempotency keys. This table is the cornerstone of the pattern.

    sql
    -- Status of the idempotent operation
    CREATE TYPE idempotency_status AS ENUM ('pending', 'completed', 'failed');
    
    -- Table to store idempotency keys and their results
    CREATE TABLE idempotency_keys (
        -- The key itself, provided by the client/producer
        key VARCHAR(255) PRIMARY KEY,
    
        -- The name of the user/actor performing the action, for locking and scoping
        -- In our event-driven case, this could be the consumer group ID or a static service name.
        actor_id VARCHAR(255) NOT NULL,
    
        -- The current status of the operation
        status idempotency_status NOT NULL DEFAULT 'pending',
    
        -- The result of the operation, stored as JSON to be flexible
        response_payload JSONB,
    
        -- The HTTP status code or an equivalent result code for the operation
        response_code INT,
    
        -- Timestamp for when the operation was locked for processing
        locked_at TIMESTAMPTZ,
    
        -- Timestamps for creation and updates
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    
    -- No extra index on key is needed: the PRIMARY KEY constraint already creates a unique index.
    
    -- Optional: Index for cleanup jobs
    CREATE INDEX idx_idempotency_keys_created_at ON idempotency_keys(created_at);

We also need our business table, for example, payments.

    sql
    CREATE TABLE payments (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        order_id UUID NOT NULL UNIQUE,
        amount_cents INT NOT NULL,
        status VARCHAR(50) NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );

2. The Idempotency Middleware

To keep our business logic clean, we'll encapsulate the idempotency check in a middleware or decorator. This function will wrap our actual event handler.

    go
    package main
    
    import (
    	"context"
    	"database/sql"
    	"encoding/json"
    	"errors"
    	"fmt"
    	"log"
    	"time"
    
    	"github.com/google/uuid"
    	_ "github.com/lib/pq"
    )
    
    // Represents the stored state of an idempotent operation
    type IdempotencyKey struct {
    	Key             string          `db:"key"`
    	ActorID         string          `db:"actor_id"`
    	Status          string          `db:"status"`
    	ResponsePayload json.RawMessage `db:"response_payload"`
    	ResponseCode    int             `db:"response_code"`
    	LockedAt        sql.NullTime    `db:"locked_at"`
    	CreatedAt       time.Time       `db:"created_at"`
    	UpdatedAt       time.Time       `db:"updated_at"`
    }
    
    // A generic response structure to store in the idempotency table
    type HandlerResponse struct {
    	StatusCode int
    	Body       []byte
    }
    
    // The signature for our actual business logic handlers
    type IdempotentHandler func(ctx context.Context, tx *sql.Tx, eventPayload []byte) (*HandlerResponse, error)
    
    // The middleware that wraps the business logic handler
    func IdempotencyWrapper(db *sql.DB, handler IdempotentHandler) func(ctx context.Context, idempotencyKey string, eventPayload []byte) (*HandlerResponse, error) {
    	return func(ctx context.Context, idempotencyKey string, eventPayload []byte) (*HandlerResponse, error) {
    		// 1. Begin Transaction
    		tx, err := db.BeginTx(ctx, nil)
    		if err != nil {
    			return nil, fmt.Errorf("failed to begin transaction: %w", err)
    		}
    		defer tx.Rollback() // Rollback on any error
    
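    		// 2. Lock and Check for Existing Key. THIS IS THE CRITICAL STEP.
    		// `FOR UPDATE` acquires a row-level lock if the row already exists, so
    		// concurrent processors of this key wait for us. If the row does not exist,
    		// nothing is locked; the primary key on `key` serializes the INSERT below.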
    		var keyRecord IdempotencyKey
    		row := tx.QueryRowContext(ctx, `SELECT key, status, response_payload, response_code FROM idempotency_keys WHERE key = $1 FOR UPDATE`, idempotencyKey)
    		err = row.Scan(&keyRecord.Key, &keyRecord.Status, &keyRecord.ResponsePayload, &keyRecord.ResponseCode)
    
    		if err != nil && !errors.Is(err, sql.ErrNoRows) {
    			return nil, fmt.Errorf("failed to select idempotency key: %w", err)
    		}
    
    		// Case A: Key Found - Operation was already processed or is in progress
    		if err == nil {
    			if keyRecord.Status == "completed" {
    				log.Printf("Request with key %s already completed. Returning stored response.", idempotencyKey)
    				return &HandlerResponse{
    					StatusCode: keyRecord.ResponseCode,
    					Body:       keyRecord.ResponsePayload,
    				}, nil
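    			} else if keyRecord.Status == "failed" {
    				// A terminal failure was committed earlier; do not re-run the handler.
    				return nil, fmt.Errorf("request with key %s previously failed", idempotencyKey)
    			} else if keyRecord.Status == "pending" {
    				// The 'pending' insert and the final update share one transaction, so a
    				// committed 'pending' row is not expected here. If one does appear,
    				// treat it as an unfinished attempt and re-run the operation.
    				log.Printf("Request with key %s found in pending state. Retrying operation.", idempotencyKey)
    			}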
    		} else { // Case B: Key Not Found - First time seeing this request
    			log.Printf("First time seeing key %s. Inserting new record.", idempotencyKey)
    			_, err = tx.ExecContext(ctx,
    				`INSERT INTO idempotency_keys (key, actor_id, status, locked_at) VALUES ($1, $2, 'pending', NOW())`,
    				idempotencyKey, "payment-service-v1")
    			if err != nil {
    				return nil, fmt.Errorf("failed to insert idempotency key: %w", err)
    			}
    		}
    
    		// 3. Execute Core Business Logic
    		response, err := handler(ctx, tx, eventPayload)
    		if err != nil {
    			// Returning here triggers the deferred Rollback, which also discards the
    			// 'pending' key row, so a redelivery starts from a clean state. An UPDATE
    			// to 'failed' at this point would be rolled back too; persisting a terminal
    			// failure requires committing it before returning (see "Handling
    			// Non-Transient Business Logic Failures").
    			return nil, fmt.Errorf("handler logic failed: %w", err)
    		}
    
    		// 4. Persist the successful result
    		_, err = tx.ExecContext(ctx,
    			`UPDATE idempotency_keys SET status = 'completed', response_payload = $1, response_code = $2, updated_at = NOW() WHERE key = $3`,
    			response.Body, response.StatusCode, idempotencyKey)
    		if err != nil {
    			return nil, fmt.Errorf("failed to update idempotency key to completed: %w", err)
    		}
    
    		// 5. Commit the entire transaction
    		if err := tx.Commit(); err != nil {
    			return nil, fmt.Errorf("failed to commit transaction: %w", err)
    		}
    
    		log.Printf("Successfully processed request with key %s.", idempotencyKey)
    		return response, nil
    	}
    }

3. The Business Logic Handler

Now our actual handler is simple. It receives the transaction object (*sql.Tx) and operates within it, completely unaware of the idempotency logic.

    go
    // Represents the payload of our OrderCreated event
    type OrderCreatedEvent struct {
    	OrderID    string `json:"order_id"`
    	CustomerID string `json:"customer_id"`
    	Amount     int    `json:"amount_cents"`
    }
    
    // processDebit is our core business logic.
    func processDebit(ctx context.Context, tx *sql.Tx, eventPayload []byte) (*HandlerResponse, error) {
    	var event OrderCreatedEvent
    	if err := json.Unmarshal(eventPayload, &event); err != nil {
    		return nil, fmt.Errorf("invalid event payload: %w", err)
    	}
    
    	// Business logic: create a payment record.
    	// All database operations MUST use the provided `tx` object.
    	// A real system would also call a payment gateway. That call happens outside the
    	// transaction and is not undone by a rollback, so it needs its own idempotency key.
    	_, err := tx.ExecContext(ctx,
    		`INSERT INTO payments (order_id, amount_cents, status) VALUES ($1, $2, 'completed')`,
    		event.OrderID, event.Amount)

    	if err != nil {
    		// The UNIQUE constraint on order_id is a second line of defense: a duplicate
    		// that slips past the idempotency key surfaces here as a unique violation.
    		return nil, fmt.Errorf("failed to insert payment: %w", err)
    	}
    
    	responseBody, _ := json.Marshal(map[string]string{"status": "success", "order_id": event.OrderID})
    	return &HandlerResponse{
    		StatusCode: 200,
    		Body:       responseBody,
    	}, nil
    }
    
    // Main function to simulate a Kafka consumer
    func main() {
    	// Database connection setup
    	dbURL := "postgres://user:password@localhost:5432/idempotency_db?sslmode=disable"
    	db, err := sql.Open("postgres", dbURL)
    	if err != nil {
    		log.Fatalf("Could not connect to database: %v", err)
    	}
    	defer db.Close()
    
    	// Wrap our business logic with the idempotency middleware
    	debitHandler := IdempotencyWrapper(db, processDebit)
    
    	// --- Simulate receiving two identical Kafka messages ---
    	orderID := uuid.New().String()
    	eventPayload, _ := json.Marshal(OrderCreatedEvent{
    		OrderID:    orderID,
    		CustomerID: uuid.New().String(),
    		Amount:     10000, // 100.00
    	})
    
    	// The producer of the event generates the idempotency key
    	idempotencyKey := uuid.New().String()
    
    	log.Println("--- First attempt --- ")
    	_, err = debitHandler(context.Background(), idempotencyKey, eventPayload)
    	if err != nil {
    		log.Fatalf("First attempt failed: %v", err)
    	}
    
    	log.Println("--- Second attempt (duplicate) --- ")
    	_, err = debitHandler(context.Background(), idempotencyKey, eventPayload)
    	if err != nil {
    		log.Fatalf("Second attempt failed: %v", err)
    	}
    }

When you run this, the output will clearly show the first attempt inserting the key and processing the payment, while the second attempt recognizes the key and returns the stored response without executing processDebit again.
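The handler's comment about the unique constraint on order_id hints at a check the code never performs. The sketch below shows one way to recognize a unique violation without importing a specific driver. It assumes the driver's error type exposes a `SQLState() string` method, which recent versions of lib/pq and pgx are understood to provide; verify this against the driver version you use. The names `sqlStateError`, `isUniqueViolation`, `fakePGError`, and `wrapInsertErr` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// sqlStateError matches driver errors that expose a PostgreSQL SQLSTATE code.
// Assumption: the driver's error type implements SQLState() string.
type sqlStateError interface {
	error
	SQLState() string
}

// uniqueViolation is the PostgreSQL SQLSTATE code for unique_violation.
const uniqueViolation = "23505"

// isUniqueViolation unwraps err and reports whether it carries SQLSTATE 23505.
func isUniqueViolation(err error) bool {
	var se sqlStateError
	return errors.As(err, &se) && se.SQLState() == uniqueViolation
}

// fakePGError is a test double standing in for a real driver error.
type fakePGError struct{ code string }

func (e fakePGError) Error() string    { return "pg error " + e.code }
func (e fakePGError) SQLState() string { return e.code }

// wrapInsertErr mirrors how the handler wraps its INSERT error with %w.
func wrapInsertErr(err error) error {
	return fmt.Errorf("failed to insert payment: %w", err)
}

func main() {
	dup := wrapInsertErr(fakePGError{code: "23505"})
	other := wrapInsertErr(fakePGError{code: "40001"}) // serialization_failure
	fmt.Println(isUniqueViolation(dup), isUniqueViolation(other))
}
```

Keep in mind that PostgreSQL aborts the transaction after such an error, so the caller still has to roll back. The check only lets the consumer classify the event as "already processed" instead of as an unexpected failure.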

Advanced Edge Cases and Performance Considerations

A basic implementation is a good start, but production systems present more challenges.

1. The Criticality of `SELECT ... FOR UPDATE`

The most subtle part of the implementation is SELECT ... FOR UPDATE. Executed inside a transaction, it acquires a row-level lock on every row it returns, and any other transaction that tries to lock or modify those rows waits until the first one commits or rolls back. What it cannot do in PostgreSQL is lock a row that does not exist yet. PostgreSQL has no gap or next-key locks (those belong to MySQL's InnoDB), so a SELECT ... FOR UPDATE that matches nothing locks nothing. For a brand-new key, it is the primary key constraint on idempotency_keys that serializes concurrent consumers.

Consider two concurrent consumers (Pod A and Pod B) processing the exact same event:

  • Pod A begins a transaction and executes SELECT ... FOR UPDATE WHERE key = 'xyz'. The row doesn't exist, so nothing is locked.
  • Pod B begins a transaction and executes the same SELECT. It also gets sql.ErrNoRows, without waiting.
  • Pod A executes its INSERT. Pod B's INSERT of the same key then blocks on the primary key's unique index until Pod A's transaction finishes.
    • If Pod A commits, Pod B's INSERT fails with a unique violation (SQLSTATE 23505). Pod B's transaction rolls back, and when the message is redelivered or retried, its SELECT finds the completed row and follows the "Key Found" logic path.
    • If Pod A rolls back, Pod B's INSERT succeeds and Pod B proceeds to perform the operation itself.
  • When the row already exists (for example, a committed failed record), FOR UPDATE does its job: the second consumer's SELECT blocks until the first transaction ends and then sees the latest committed state.

Together, the unique constraint and the row lock serialize access to a given idempotency key, at the cost of blocking or failing one of the consumers. The duplicate-key error is an expected outcome of this race, so handle it explicitly rather than treating it as a crash.
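One way to handle the first-delivery race without surfacing an error is to let the primary key arbitrate directly, using PostgreSQL's `INSERT ... ON CONFLICT DO NOTHING` and inspecting the affected row count. This is a sketch under assumptions, not the article's wrapper: `execer`, `claimKey`, `fakeTx`, and `demoClaim` are hypothetical names, and the fake exists only so the example runs without a database. Under the default READ COMMITTED isolation level, PostgreSQL is expected to make the second INSERT wait for the first transaction and then insert zero rows if that transaction committed.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
)

// execer is the subset of *sql.Tx used here; *sql.Tx and *sql.DB both satisfy it.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

const claimSQL = `
INSERT INTO idempotency_keys (key, actor_id, status, locked_at)
VALUES ($1, $2, 'pending', NOW())
ON CONFLICT (key) DO NOTHING`

// claimKey reports whether this transaction inserted the key (true) or the
// key was already present (false), in which case the caller should read the
// stored record instead of running the business logic.
func claimKey(ctx context.Context, tx execer, key, actorID string) (bool, error) {
	res, err := tx.ExecContext(ctx, claimSQL, key, actorID)
	if err != nil {
		return false, fmt.Errorf("claim idempotency key: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return false, fmt.Errorf("rows affected: %w", err)
	}
	return n == 1, nil
}

// fakeTx is a test double that pretends the INSERT affected `rows` rows.
type fakeTx struct{ rows int64 }

func (f fakeTx) ExecContext(context.Context, string, ...any) (sql.Result, error) {
	return driver.RowsAffected(f.rows), nil
}

// demoClaim runs claimKey against the fake so the sketch works without a database.
func demoClaim(rows int64) (bool, error) {
	return claimKey(context.Background(), fakeTx{rows: rows}, "key-1", "payment-service-v1")
}

func main() {
	first, _ := demoClaim(1) // the row was inserted: we own the operation
	dup, _ := demoClaim(0)   // conflict: someone else already committed it
	fmt.Println(first, dup)
}
```

When `claimKey` returns false, a follow-up `SELECT ... FOR UPDATE` on the now-existing row retrieves the stored status and response.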

2. Idempotency Key TTL and Table Bloat

The idempotency_keys table will grow indefinitely. Storing keys forever is unnecessary, as the window for message redelivery is typically finite (e.g., Kafka's retention period, or a few hours/days for transient failures). You must have a cleanup strategy, and the TTL must be longer than the longest possible redelivery window; otherwise a late duplicate arrives after its key is gone and is processed again.

  • Periodic Deletion Job: The most common approach. A background job runs periodically (e.g., every night) and deletes keys older than a configured TTL (e.g., 48 hours).

    sql
    DELETE FROM idempotency_keys WHERE created_at < NOW() - INTERVAL '48 hours';

    This is simple and effective. Ensure created_at is indexed to make the DELETE efficient.

  • Database-specific TTL: Some databases like MongoDB and DynamoDB have built-in TTL features on documents. While convenient, relying on these can be less portable. For PostgreSQL, extensions like pg_partman can be used to partition the table by date, allowing old partitions to be dropped efficiently instead of performing a large DELETE. Note that PostgreSQL requires a partitioned table's primary key to include the partition column, so partitioning by created_at means key alone can no longer be the primary key, and uniqueness of key across partitions has to be ensured some other way.
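For the periodic deletion job, a single large DELETE can hold locks and generate a lot of write-ahead log at once. A common mitigation is to delete in bounded batches. The sketch below assumes hypothetical names (`purgeExpired`, `execer`, `fakeDB`, `demoPurge`) and a batch size chosen purely for illustration; the fake database exists only so the loop can run without PostgreSQL.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
	"time"
)

// execer is the subset of *sql.DB used here.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// PostgreSQL has no DELETE ... LIMIT, so the batch is selected in a subquery.
const purgeSQL = `
DELETE FROM idempotency_keys
WHERE key IN (
    SELECT key FROM idempotency_keys
    WHERE created_at < $1
    ORDER BY created_at
    LIMIT $2
)`

// purgeExpired deletes keys older than ttl in batches of at most `batch` rows
// and returns the total number of rows removed.
func purgeExpired(ctx context.Context, db execer, ttl time.Duration, batch int) (int64, error) {
	cutoff := time.Now().Add(-ttl)
	var total int64
	for {
		if err := ctx.Err(); err != nil {
			return total, err // stop promptly on shutdown
		}
		res, err := db.ExecContext(ctx, purgeSQL, cutoff, batch)
		if err != nil {
			return total, fmt.Errorf("purge batch: %w", err)
		}
		n, err := res.RowsAffected()
		if err != nil {
			return total, fmt.Errorf("rows affected: %w", err)
		}
		total += n
		if n < int64(batch) {
			return total, nil // a short batch means nothing older remains
		}
	}
}

// fakeDB is a test double holding a count of expired rows.
type fakeDB struct {
	expired int64
	calls   int
}

func (f *fakeDB) ExecContext(_ context.Context, _ string, args ...any) (sql.Result, error) {
	f.calls++
	n := int64(args[1].(int)) // args[1] is the batch size
	if f.expired < n {
		n = f.expired
	}
	f.expired -= n
	return driver.RowsAffected(n), nil
}

// demoPurge runs the loop against the fake and reports rows deleted and statements issued.
func demoPurge(expired int64, batch int) (deleted int64, calls int, err error) {
	db := &fakeDB{expired: expired}
	deleted, err = purgeExpired(context.Background(), db, 48*time.Hour, batch)
	return deleted, db.calls, err
}

func main() {
	deleted, calls, err := demoPurge(2037, 1000)
	fmt.Println(deleted, calls, err)
}
```

With 2037 expired rows and a batch size of 1000, the loop issues three statements (1000, 1000, 37) and then stops.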

3. Handling Non-Transient Business Logic Failures

What if the business logic fails for a reason that is not transient, such as an InsufficientFunds error? If we simply roll back the transaction, the message will be redelivered, and we will retry the same failing operation, potentially indefinitely, until it's moved to a dead-letter queue (DLQ).

A better approach is to catch specific, non-retriable errors and mark the idempotency key as failed.

    go
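    // Inside the IdempotencyWrapper
    response, err := handler(ctx, tx, eventPayload)
    if err != nil {
        var insufficientFundsErr *InsufficientFundsError
        if errors.As(err, &insufficientFundsErr) {
            // This is a terminal failure. Mark it as such.
            // response_payload is JSONB, so the message must be encoded as JSON first.
            payload, _ := json.Marshal(map[string]string{"error": insufficientFundsErr.Error()})
            if _, updateErr := tx.ExecContext(ctx, `UPDATE idempotency_keys SET status = 'failed', response_payload = $1, response_code = 400, updated_at = NOW() WHERE key = $2`, payload, idempotencyKey); updateErr != nil {
                return nil, fmt.Errorf("failed to mark key %s as failed: %w", idempotencyKey, updateErr)
            }
            // We still commit the transaction to save the 'failed' state!
            // This assumes the handler wrote nothing that must be undone and did not
            // leave the transaction in an aborted state.
            if commitErr := tx.Commit(); commitErr != nil {
                return nil, fmt.Errorf("failed to commit failed state: %w", commitErr)
            }
            return nil, err // Return the original error to the consumer loop
        }
        // For other errors, we let the defer tx.Rollback() handle it.
        return nil, err
    }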

Now, when the message is redelivered, a wrapper that checks for the failed status finds the key and can immediately reject the message, preventing wasted resources and moving it to a DLQ faster.
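The snippet above references an `InsufficientFundsError` type that the article never defines, and the decision about what to do with each stored status is spread across several branches. The following sketch pulls both into small, testable pieces. The field names, the `action` values, `decide`, `isTerminal`, and `wrapHandlerErr` are all hypothetical and intended only as one possible shape.

```go
package main

import (
	"errors"
	"fmt"
)

// InsufficientFundsError is a hypothetical terminal business error.
type InsufficientFundsError struct {
	CustomerID     string
	ShortfallCents int
}

func (e *InsufficientFundsError) Error() string {
	return fmt.Sprintf("customer %s is short by %d cents", e.CustomerID, e.ShortfallCents)
}

var errTimeout = errors.New("gateway timeout") // an example of a transient error

// isTerminal reports whether retrying the operation can never succeed.
func isTerminal(err error) bool {
	var ife *InsufficientFundsError
	return errors.As(err, &ife)
}

// wrapHandlerErr mirrors how the wrapper wraps handler errors with %w.
func wrapHandlerErr(err error) error {
	return fmt.Errorf("handler logic failed: %w", err)
}

type action int

const (
	actionExecute action = iota // run the business logic
	actionReplay                // return the stored response
	actionReject                // terminal failure: hand off to the DLQ
)

// decide maps the looked-up key state to what the wrapper should do next.
func decide(found bool, status string) action {
	switch {
	case !found:
		return actionExecute
	case status == "completed":
		return actionReplay
	case status == "failed":
		return actionReject
	default: // "pending": treat as an unfinished attempt
		return actionExecute
	}
}

func main() {
	err := wrapHandlerErr(&InsufficientFundsError{CustomerID: "c-1", ShortfallCents: 250})
	fmt.Println(isTerminal(err), isTerminal(wrapHandlerErr(errTimeout)))
	fmt.Println(decide(true, "failed") == actionReject)
}
```

Keeping the classification in one function makes it easier to grow the list of terminal errors without touching the transaction-handling code.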

4. Performance Impact and Optimization

This pattern is not free. On a first delivery it adds a SELECT, an INSERT, and an UPDATE to the transaction; a duplicate costs a single SELECT.

  • Latency: The overhead is primarily the network round-trip time to your database. For a well-located database, this might be 1-5ms. This is often an acceptable trade-off for correctness.
  • Throughput: The primary bottleneck will be database write contention, especially on the idempotency_keys table. Every lookup goes through the primary key index, so keep it small with regular cleanup. For extremely high-throughput systems, consider hash-partitioning the table on key.
  • Storage Alternatives: Could you use Redis? Yes, with caveats. The atomic 'check-and-set' logic needs SET with the NX option or a Lua script. The first drawback is that Redis cannot take part in the PostgreSQL transaction, so the key and the business write no longer commit atomically. The second is durability: Redis's default persistence model (RDB/AOF) may not provide the same guarantees as a transactional database like PostgreSQL. If Redis loses data between snapshots, you could lose idempotency records and risk duplicate processing. For financial or mission-critical data, the durability of PostgreSQL is almost always the correct choice.

Conclusion: The Price of Correctness

The Asynchronous Saga pattern is a powerful tool for managing distributed workflows, but it's incomplete without a robust strategy for handling message duplication. Implementing idempotent consumers using a transactional key store is a production-proven pattern that elevates a brittle workflow into a resilient, fault-tolerant system.

By leveraging the transactional guarantees of a relational database like PostgreSQL, the uniqueness enforced by the primary key, and the explicit locking provided by SELECT FOR UPDATE, we can eliminate race conditions and ensure that business logic takes effect exactly once, even in the chaotic environment of a distributed system with failing nodes and network partitions. The added complexity and minor performance overhead are a necessary and worthwhile investment for any system where data integrity is paramount.
