Idempotency Key Patterns for Asynchronous Microservice Sagas

Goh Ling Yong

The Inevitable Failure Mode: Duplicate Processing in Asynchronous Sagas

In any non-trivial distributed system, the Saga pattern is a common solution for managing long-running, multi-step business transactions without distributed locks. However, its reliance on asynchronous messaging (e.g., via RabbitMQ, Kafka) introduces a significant challenge: at-least-once delivery semantics. A network partition, a consumer crash post-processing but pre-acknowledgment, or a broker redelivery can all result in the same message being processed multiple times.

Consider a standard e-commerce OrderCreate saga:

  • API Gateway: Receives POST /orders with an Idempotency-Key header.
  • Order Service: Creates an order in a PENDING state and publishes an OrderCreated event.
  • Payment Service: Consumes OrderCreated, processes payment, and publishes a PaymentProcessed event.
  • Shipping Service: Consumes PaymentProcessed, arranges shipping, and publishes a ShipmentArranged event.

If the Payment Service crashes after successfully charging the customer's card but before acknowledging the OrderCreated message, the message broker will redeliver it upon service restart. Without a robust idempotency mechanism, the customer is charged a second time. A simple check like IF NOT EXISTS (SELECT 1 FROM payments WHERE order_id = ?) is susceptible to a race condition: two concurrent consumers handling the same message can both pass the check before either has written its row.

    This article details a database-backed, stateful idempotency pattern that addresses this problem, first for the synchronous API entry point and then extending it through the asynchronous saga steps.
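To make the race in the naive existence check concrete, here is a minimal in-memory sketch. The `keyStore` type and its `claim` method are hypothetical stand-ins for the database, not part of the pattern itself. The point it illustrates is that the check and the write must happen as one atomic step, so that only one of many simultaneous deliveries is allowed to run the side effect.

```go
package main

import (
	"fmt"
	"sync"
)

// keyStore is a hypothetical in-memory stand-in for the payments table.
type keyStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newKeyStore() *keyStore { return &keyStore{seen: make(map[string]bool)} }

// claim checks for the key and records it under a single lock, so exactly
// one caller per key gets true. Splitting this into a separate "exists?"
// call followed by a "record" call would reopen the race.
func (s *keyStore) claim(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[key] {
		return false
	}
	s.seen[key] = true
	return true
}

// countWinners simulates n consumers receiving the same message at once and
// returns how many of them were allowed to run the side effect.
func countWinners(s *keyStore, key string, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.claim(key) {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	s := newKeyStore()
	fmt.Println("charges for order-42:", countWinners(s, "order-42", 50)) // charges for order-42: 1
}
```

In the database-backed version, a uniqueness constraint or a row lock plays the role that the mutex plays here.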


    Section 1: The Foundation - A Stateful Idempotency Persistence Layer

    The core of our solution is not just to record that a key has been seen, but to track the state of the operation associated with that key. This allows us to differentiate between a request that is currently in-flight, one that has completed successfully, and one that has failed.

    We'll use a dedicated PostgreSQL table to manage this state. The schema is critical.

    sql
    CREATE TYPE idempotency_status AS ENUM ('started', 'completed', 'failed');
    
    CREATE TABLE idempotency_keys (
        -- The idempotency key itself, provided by the client.
        key_value VARCHAR(255) NOT NULL,
    
        -- The current state of the associated operation.
        status idempotency_status NOT NULL,
    
        -- For completed requests, we store the response to return it on subsequent retries.
        response_code INT,
        response_body JSONB,
    
        -- For in-flight requests, this prevents other processes from starting the same work.
        -- Also acts as a TTL for garbage collection.
        locked_at TIMESTAMPTZ,
    
        -- The user or tenant this key belongs to, crucial for multi-tenant systems.
        user_id UUID NOT NULL,
    
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    
        -- Uniqueness is scoped to the user, so two tenants can reuse the same key value.
        PRIMARY KEY (user_id, key_value)
    );
    
    -- The composite primary key already provides the index for lookups by user and key.
    
    -- An index for the garbage collection process.
    CREATE INDEX idx_idempotency_keys_locked_at ON idempotency_keys(locked_at);

    Key Design Decisions:

    * status ENUM: Explicitly modeling the operation's lifecycle (started, completed, failed) is the cornerstone of this pattern. It allows us to handle retries intelligently.

    * locked_at: This timestamp serves two purposes. It indicates when an operation began, and it acts as a timeout. If a request is started but locked_at is older than, say, 60 seconds, we can assume the processing worker died and allow a new request to proceed.

    * response_code & response_body: Storing the original successful response is what makes the operation truly idempotent from the client's perspective. A retry with the same key will receive the exact same response without re-executing the business logic.

    * user_id: In multi-tenant systems, scoping the idempotency key to a user or tenant is non-negotiable to prevent key collisions.


    Section 2: Implementation - The Synchronous API Entry Point Middleware

    Let's implement this logic as a Go middleware using the gin framework and pgx for database access. This middleware will wrap the initial API endpoint that kicks off the saga.

    The Idempotency Check Logic Flow

    The middleware's logic must be atomic and handle concurrency flawlessly. This is where PostgreSQL's pessimistic locking (SELECT ... FOR UPDATE) becomes invaluable.

  • Extract Key: Get the Idempotency-Key from the request header.
  • Begin Transaction: Start a new database transaction. All subsequent operations must occur within this transaction.
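  • Lock and Check: Attempt to find an existing record for the key. Crucially, use SELECT ... FOR UPDATE to acquire a row-level lock. This blocks any other transaction that tries to lock, update, or delete this specific row until our current transaction commits or rolls back. Plain SELECTs are not blocked, and a key with no row yet has nothing to lock, which is why the INSERT for a new key relies on the key's uniqueness constraint.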
  • Handle Existing Key:

    * If the record exists and status is completed, immediately return the stored response_code and response_body.

    * If status is started and locked_at is recent, another request is actively being processed. Return a 409 Conflict.

    * If status is started but locked_at has expired, or if the status is failed, allow the current request to proceed. We'll update the existing record.

  • Handle New Key:

    * If no record exists, INSERT a new one with status = 'started' and locked_at = NOW().

  • Pass to Handler: If the request is allowed to proceed, store the transaction object in the request context so the main business logic handler can use it. This is critical for ensuring the business logic and the idempotency record update are part of the same atomic operation.
  • Capture Response: After the handler executes, capture its response (status code and body).
  • Update Record and Commit: Update the idempotency record with status = 'completed' and the captured response. Commit the transaction.
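  • Handle Errors: If the handler returns an error, roll back the transaction so the handler's partial writes are discarded, and return the error to the client. Because the rollback also reverts the started marker written in this transaction, a failed status persists only if it is recorded in a separate transaction.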
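The branching in the steps above can be summarised as a small pure function. This is a sketch for illustration only: the names `Action`, `Record`, `Decide`, `lockedAgo`, `demoNow`, and `demoTimeout` are assumptions introduced here, and the database access is left out so the decision logic can be read and tested on its own.

```go
package main

import (
	"fmt"
	"time"
)

// Action is what the middleware should do with an incoming request.
type Action int

const (
	Proceed  Action = iota // run the handler (new key, failed attempt, or expired lock)
	Replay                 // return the stored response without re-running the handler
	Conflict               // another request holds a fresh lock: respond 409
)

// Record mirrors the two columns the decision depends on.
type Record struct {
	Status   string
	LockedAt *time.Time
}

// Decide maps the stored state for a key to an action.
// rec is nil when the key has never been seen.
func Decide(rec *Record, now time.Time, lockTimeout time.Duration) Action {
	if rec == nil {
		return Proceed
	}
	switch rec.Status {
	case "completed":
		return Replay
	case "started":
		if rec.LockedAt != nil && now.Sub(*rec.LockedAt) < lockTimeout {
			return Conflict
		}
	}
	// Expired lock or a previously failed attempt.
	return Proceed
}

// lockedAgo builds a started record whose lock was taken d before now.
func lockedAgo(now time.Time, d time.Duration) *Record {
	t := now.Add(-d)
	return &Record{Status: "started", LockedAt: &t}
}

// Fixed values for the demonstration.
var demoNow = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

const demoTimeout = 60 * time.Second

func main() {
	fmt.Println(Decide(nil, demoNow, demoTimeout) == Proceed)                               // true
	fmt.Println(Decide(&Record{Status: "completed"}, demoNow, demoTimeout) == Replay)       // true
	fmt.Println(Decide(lockedAgo(demoNow, 30*time.Second), demoNow, demoTimeout) == Conflict) // true
	fmt.Println(Decide(lockedAgo(demoNow, 2*time.Minute), demoNow, demoTimeout) == Proceed)   // true
}
```

Keeping the decision separate from the SQL makes the timeout and status rules easy to unit test without a database.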

    Production-Grade Go Middleware

    go
    package middleware
    
    import (
    	"bytes"
    	"errors"
    	"net/http"
    	"time"
    
    	"github.com/gin-gonic/gin"
    	"github.com/jackc/pgx/v5"
    	"github.com/jackc/pgx/v5/pgconn"
    	"github.com/jackc/pgx/v5/pgxpool"
    )
    
    const idempotencyKeyHeader = "Idempotency-Key"
    const lockTimeout = 60 * time.Second
    
    type IdempotencyRecord struct {
    	KeyValue      string    `db:"key_value"`
    	Status        string    `db:"status"`
    	ResponseCode  *int      `db:"response_code"`
    	ResponseBody  []byte    `db:"response_body"`
    	LockedAt      *time.Time `db:"locked_at"`
    	UserID        string    // Assuming user ID is retrieved from auth middleware
    }
    
    // responseWriter is a custom writer to capture the response body
    type responseWriter struct {
    	gin.ResponseWriter
    	body *bytes.Buffer
    }
    
    func (w responseWriter) Write(b []byte) (int, error) {
    	w.body.Write(b)
    	return w.ResponseWriter.Write(b)
    }
    
    func Idempotency(db *pgxpool.Pool) gin.HandlerFunc {
    	return func(c *gin.Context) {
    		key := c.GetHeader(idempotencyKeyHeader)
    		if key == "" {
    			c.Next()
    			return
    		}
    
    		userID, _ := c.Get("userID") // Assume set by a prior auth middleware
    
    		tx, err := db.Begin(c)
    		if err != nil {
    			c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "could not start transaction"})
    			return
    		}
    		defer tx.Rollback(c) // Rollback is a no-op if tx has been committed
    
    		var record IdempotencyRecord
    		// Use SELECT ... FOR UPDATE to acquire a lock on the row, preventing race conditions.
    		query := `SELECT key_value, status, response_code, response_body, locked_at FROM idempotency_keys WHERE user_id = $1 AND key_value = $2 FOR UPDATE`
    		err = tx.QueryRow(c, query, userID, key).Scan(&record.KeyValue, &record.Status, &record.ResponseCode, &record.ResponseBody, &record.LockedAt)
    
    		if err == nil { // Record found
    			if record.Status == "completed" {
    				c.Data(*record.ResponseCode, "application/json", record.ResponseBody)
    				c.Abort()
    				return
    			}
    
    			if record.Status == "started" && record.LockedAt != nil && time.Since(*record.LockedAt) < lockTimeout {
    				c.AbortWithStatusJSON(http.StatusConflict, gin.H{"error": "request with this key is already in progress"})
    				return
    			}
                // If lock expired or previous attempt failed, we proceed by updating the lock time.
                updateLockQuery := `UPDATE idempotency_keys SET locked_at = NOW(), status = 'started' WHERE user_id = $1 AND key_value = $2`
                _, updateErr := tx.Exec(c, updateLockQuery, userID, key)
                if updateErr != nil {
                    c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "could not update lock"})
                    return
                }
    
    		} else if errors.Is(err, pgx.ErrNoRows) { // Record not found, create it
    			insertQuery := `INSERT INTO idempotency_keys (user_id, key_value, status, locked_at) VALUES ($1, $2, 'started', NOW())`
    			_, insertErr := tx.Exec(c, insertQuery, userID, key)
    			if insertErr != nil {
                    // Handle rare race condition where another request inserted the key between SELECT and INSERT
                    var pgErr *pgconn.PgError
                    if errors.As(insertErr, &pgErr) && pgErr.Code == "23505" { // unique_violation
                        c.AbortWithStatusJSON(http.StatusConflict, gin.H{"error": "concurrent request conflict"})
                    } else {
                        c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "could not create idempotency record"})
                    }
    				return
    			}
    		} else { // Other database error
    			c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "database error"})
    			return
    		}
    
    		// Store the transaction in the context for the handler to use
    		c.Set("db_transaction", tx)
    
    		// Capture response
    		blw := &responseWriter{body: bytes.NewBufferString(""), ResponseWriter: c.Writer}
    		c.Writer = blw
    
    		c.Next()
    
    		// After handler execution, update the idempotency record
    		statusCode := c.Writer.Status()
    		responseBody := blw.body.Bytes()
    
    		if statusCode < 200 || statusCode >= 300 {
    			// The handler failed. Return without committing so the deferred Rollback
    			// discards the handler's partial writes. The rollback also reverts the
    			// 'started' marker written above, which leaves the key free for a client retry.
    			return
    		}
    
    		updateQuery := `UPDATE idempotency_keys SET status = 'completed', response_code = $1, response_body = $2, updated_at = NOW() WHERE user_id = $3 AND key_value = $4`
    		if _, err = tx.Exec(c, updateQuery, statusCode, responseBody, userID, key); err != nil {
    			// Log this critical failure. The deferred Rollback also undoes the handler's
    			// writes, but the response has already been written to the client.
    			// This is a potential state inconsistency that needs monitoring.
    			return
    		}
    
    		if err := tx.Commit(c); err != nil {
    			// Log this as well. The commit failed after the response was sent.
    		}
    	}
    }

    Your handler would then retrieve the transaction from the context:

    go
    func CreateOrderHandler(c *gin.Context) {
        tx, exists := c.Get("db_transaction")
        if !exists {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "transaction not found in context"})
            return
        }
        dbTx := tx.(pgx.Tx)
    
        // ... perform all business logic using dbTx ...
        // For example: create order record, publish event to outbox table, etc.
        // All within the same transaction.
    
        c.JSON(http.StatusCreated, gin.H{"order_id": "12345"})
    }

    This pattern ensures that the creation of the order and the final update of the idempotency key to completed are an atomic unit. If any part fails, the entire transaction is rolled back.


    Section 3: Propagating Idempotency Through the Asynchronous Saga

    The middleware handles the synchronous entry point. Now we must extend this protection to our asynchronous message consumers (e.g., the Payment Service).

    The strategy is to propagate the idempotency context through message headers. We derive a new, step-specific key from the original key.

    Key Derivation Strategy: original-idempotency-key:saga-step-name

    * Example: f47ac10b-58cc-4372-a567-0e02b2c3d479:process-payment

    This ensures each step in the saga has a unique idempotency key, allowing individual steps to be retried without affecting others.
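The derivation itself is a one-liner, but it helps to keep it in one place so producers and consumers agree on the format. The sketch below is illustrative: `DeriveStepKey` and `SplitStepKey` are hypothetical helper names, and the split assumes step names never contain a colon. Note also that the derived key must still fit the VARCHAR(255) column.

```go
package main

import (
	"fmt"
	"strings"
)

const stepSeparator = ":"

// DeriveStepKey builds the per-step key from the client's original key.
func DeriveStepKey(originalKey, step string) string {
	return originalKey + stepSeparator + step
}

// SplitStepKey recovers the original key and the step name. It splits on the
// last separator, so it assumes step names never contain ":".
func SplitStepKey(stepKey string) (originalKey, step string, ok bool) {
	i := strings.LastIndex(stepKey, stepSeparator)
	if i <= 0 || i == len(stepKey)-1 {
		return "", "", false
	}
	return stepKey[:i], stepKey[i+1:], true
}

func main() {
	k := DeriveStepKey("f47ac10b-58cc-4372-a567-0e02b2c3d479", "process-payment")
	fmt.Println(k) // f47ac10b-58cc-4372-a567-0e02b2c3d479:process-payment

	orig, step, ok := SplitStepKey(k)
	fmt.Println(orig, step, ok)
}
```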

    When the Order Service publishes the OrderCreated event, it attaches this derived key as a message header.

    go
    // In Order Service, after creating the order inside the transaction
    originalKey := c.GetHeader(idempotencyKeyHeader)
    paymentStepKey := originalKey + ":process-payment"
    
    message := Event{
        Type: "OrderCreated",
        Payload: map[string]interface{}{"order_id": newOrderID},
    }
    
    // Publish to an outbox table within the same transaction
    err := PublishToOutbox(dbTx, "payment_events", paymentStepKey, message)
    // A separate process (debezium, or a simple poller) will then move this to RabbitMQ/Kafka

    The Payment Service consumer then implements a similar idempotency check, but adapted for a message handler instead of an HTTP middleware.

    Asynchronous Consumer Idempotency Logic

    go
    // In Payment Service consumer
    
    func (s *PaymentService) HandleOrderCreated(ctx context.Context, msg amqp.Delivery) error {
        stepKey, ok := msg.Headers["x-idempotency-key"].(string)
        if !ok || stepKey == "" {
            // The header is missing or not a string (a bare type assertion would panic here).
            // Decide on a failure policy: reject, or process without idempotency?
            // For critical systems, reject the message.
            return errors.New("missing x-idempotency-key header")
        }
    
        // Assume userID can be extracted from message payload or another header
        userID := ... 
    
        tx, err := s.db.Begin(ctx)
        if err != nil {
            return err // Will cause message to be NACK'd and retried
        }
        defer tx.Rollback(ctx)
    
        // The logic is nearly identical to the HTTP middleware
        var recordId int // We don't need the full record, just to know if it exists
        query := `SELECT 1 FROM idempotency_keys WHERE user_id = $1 AND key_value = $2 FOR UPDATE`
        err = tx.QueryRow(ctx, query, userID, stepKey).Scan(&recordId)
    
        if err == nil { // Record found
            // The step has already been completed. This is a duplicate message.
            // Acknowledge the message and do nothing.
            msg.Ack(false)
            return tx.Commit(ctx) // Commit the empty transaction
        }
    
        if !errors.Is(err, pgx.ErrNoRows) {
            // A real DB error occurred. NACK and retry.
            return err
        }
    
        // No record found, this is the first time we're processing this step.
        // 1. Process the payment using the transaction `tx`
        err = s.processPaymentLogic(tx, msg.Body)
        if err != nil {
            // Business logic failed. Rollback and NACK.
            return err
        }
    
        // 2. Insert the idempotency record to mark this step as complete
        // We use a simplified schema/logic for async steps as we don't need to store a response.
        insertQuery := `INSERT INTO idempotency_keys (user_id, key_value, status) VALUES ($1, $2, 'completed')`
        _, err = tx.Exec(ctx, insertQuery, userID, stepKey)
        if err != nil {
            // Rollback and NACK. FOR UPDATE cannot lock a row that does not exist yet, so a
            // concurrent consumer may have committed the same key first (unique violation).
            return err
        }
    
        // 3. Commit the transaction
        if err := tx.Commit(ctx); err != nil {
            return err
        }
    
        // 4. Acknowledge the message to the broker
        msg.Ack(false)
        return nil
    }

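    This ensures that even if the HandleOrderCreated function is invoked ten times with the same message, the payment's database writes are committed only once. Once the completed record exists, SELECT ... FOR UPDATE finds it and later deliveries exit gracefully. FOR UPDATE cannot lock a row that does not exist yet, so two first-time deliveries that overlap both reach the INSERT; the key's uniqueness constraint then makes one of them fail and roll back. Side effects outside the database, such as a call to an external payment provider, are not undone by that rollback, so pass the step key to the provider as its own idempotency key where it supports one.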
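One way to narrow the window for overlapping first-time deliveries is to claim the key before doing the work, rather than inserting it afterwards. The sketch below is a variation on the consumer above, not the article's original flow. It assumes a hypothetical `execer` interface that reports rows affected (with pgx, a thin adapter around tx.Exec could supply this), and it uses an in-memory `fakeTx` so it runs without a database. In PostgreSQL, an INSERT ... ON CONFLICT DO NOTHING that collides with an uncommitted row from another transaction waits for that transaction to finish, so only one consumer proceeds to the payment logic.

```go
package main

import (
	"context"
	"fmt"
)

// execer is a hypothetical, minimal abstraction over a transaction: it runs a
// statement and reports how many rows were affected.
type execer interface {
	Exec(ctx context.Context, sql string, args ...any) (rowsAffected int64, err error)
}

const claimSQL = `INSERT INTO idempotency_keys (user_id, key_value, status)
VALUES ($1, $2, 'completed')
ON CONFLICT DO NOTHING`

// claimStep returns true only for the caller whose INSERT actually created
// the row. A false result means the step was already claimed: ack and skip.
func claimStep(ctx context.Context, tx execer, userID, stepKey string) (bool, error) {
	n, err := tx.Exec(ctx, claimSQL, userID, stepKey)
	if err != nil {
		return false, err
	}
	return n == 1, nil
}

// fakeTx imitates the uniqueness constraint in memory for demonstration.
type fakeTx struct{ rows map[string]bool }

func (f *fakeTx) Exec(_ context.Context, _ string, args ...any) (int64, error) {
	if len(args) != 2 {
		return 0, fmt.Errorf("expected 2 args, got %d", len(args))
	}
	k := fmt.Sprintf("%v|%v", args[0], args[1])
	if f.rows[k] {
		return 0, nil // conflict: nothing inserted
	}
	f.rows[k] = true
	return 1, nil
}

// deliverTwice simulates the same message arriving twice.
func deliverTwice() (first, second bool) {
	tx := &fakeTx{rows: make(map[string]bool)}
	ctx := context.Background()
	first, _ = claimStep(ctx, tx, "user-1", "abc:process-payment")
	second, _ = claimStep(ctx, tx, "user-1", "abc:process-payment")
	return first, second
}

func main() {
	first, second := deliverTwice()
	fmt.Println(first, second) // true false
}
```

Because the claim and the payment writes share one transaction, a failure in the payment logic rolls the claim back too, and the redelivered message can try again.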


    Section 4: Performance and Scalability Considerations

    The SELECT ... FOR UPDATE pattern is incredibly robust but creates row-level locks, which can become a contention point under extremely high load. For most systems, this is a perfectly acceptable trade-off for correctness. However, for hyper-scale scenarios, we can optimize by offloading the initial lock acquisition to a distributed cache like Redis.

    Hybrid Approach: Redis for Locking, Postgres for State

    The goal is to use Redis for its high-speed, low-contention locking, while still relying on PostgreSQL as the durable source of truth for the idempotency state.

    Modified Logic Flow:

  • Attempt Redis Lock: Before starting the database transaction, try to acquire a distributed lock in Redis using the idempotency key.
    go
        // SET key value NX PX timeout
        // NX -- Only set the key if it does not already exist.
        // PX -- Set the specified expire time, in milliseconds.
        ok, err := redisClient.SetNX(ctx, redisKey, "locked", lockTimeout).Result()
        if err != nil { /* handle redis error */ }
        if !ok {
            // Lock is already held. Return 409 Conflict immediately.
            c.AbortWithStatusJSON(http.StatusConflict, ...)
            return
        }
        defer redisClient.Del(ctx, redisKey) // Release the lock on exit. Caveat: this also deletes a lock that expired and was re-acquired by another request.
  • Check Postgres State: If the Redis lock is acquired, now proceed to the database. Check the idempotency_keys table. You can now use a regular SELECT without FOR UPDATE because Redis is ensuring serialization.
  • Handle Completed State: If the record in Postgres is already completed, release the Redis lock and return the stored response. This covers a retry that arrives after an earlier request with the same key has finished and released its lock.
  • Execute Business Logic: If the record is new or started, proceed with the business logic within a database transaction.
  • Update Postgres and Release Lock: On completion, update the Postgres record to completed and commit. Finally, explicitly delete the Redis lock.

    Trade-offs:

    * Pros: Significantly reduces load and contention on the primary database. SELECT ... FOR UPDATE holds a row lock and a database connection for the duration of the transaction, whereas a Redis SET with NX is a single in-memory operation.

    * Cons:

    * Increased Complexity: You now have to manage two systems (Redis and Postgres) and handle failures in both.

    * Reduced Consistency Guarantees: There's a potential for the system to fail after the database transaction commits but before the Redis lock is released. This would cause subsequent requests to fail with a 409 Conflict until the Redis key expires, even though the operation was successful. This is often an acceptable trade-off (availability vs. strict consistency) but must be understood.

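    * Lock Expiry and Clock Skew: Distributed locks are sensitive to timing. If the lock holder stalls past the TTL (a long GC pause or a slow query, for example), Redis expires the key and a second request can acquire the lock while the first is still working. The lock timeout is not a perfect guarantee.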
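A common mitigation for the release problem is to store a unique token as the lock value and delete the key only if it still holds that token. The sketch below shows the ownership check with a hypothetical in-memory `tokenLocks` type instead of a real Redis client, and it uses plain millisecond integers for time so the scenario is deterministic. Against Redis itself, the compare-and-delete is typically done atomically with a short Lua script.

```go
package main

import (
	"fmt"
	"sync"
)

type lockEntry struct {
	token     string
	expiresMs int64
}

// tokenLocks is a hypothetical in-memory stand-in for Redis.
type tokenLocks struct {
	mu      sync.Mutex
	entries map[string]lockEntry
}

func newTokenLocks() *tokenLocks {
	return &tokenLocks{entries: make(map[string]lockEntry)}
}

// Acquire mimics SET key token NX PX ttl: it succeeds only if there is no
// unexpired entry for the key.
func (l *tokenLocks) Acquire(key, token string, nowMs, ttlMs int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if e, ok := l.entries[key]; ok && e.expiresMs > nowMs {
		return false
	}
	l.entries[key] = lockEntry{token: token, expiresMs: nowMs + ttlMs}
	return true
}

// Release deletes the lock only if it still carries the caller's token, so a
// slow holder cannot remove a lock that has passed to someone else.
func (l *tokenLocks) Release(key, token string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if e, ok := l.entries[key]; ok && e.token == token {
		delete(l.entries, key)
		return true
	}
	return false
}

func main() {
	locks := newTokenLocks()
	fmt.Println(locks.Acquire("key-1", "token-A", 0, 1000))    // true: A takes the lock
	fmt.Println(locks.Acquire("key-1", "token-B", 1500, 1000)) // true: A's lock expired, B takes it
	fmt.Println(locks.Release("key-1", "token-A"))             // false: A no longer owns the lock
	fmt.Println(locks.Release("key-1", "token-B"))             // true: B releases its own lock
}
```

This does not stop the first holder from continuing to work after its lock expired. It only prevents that holder from releasing a lock it no longer owns, which is why Postgres remains the source of truth for the final state.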

    Benchmarking Considerations

    To decide which pattern to use, benchmark your specific workload. A tool like k6 can simulate high-concurrency requests with the same idempotency key.

    * Postgres-only: Measure the transaction wait time (pg_stat_activity.wait_event_type = 'Lock'). At a certain concurrency level, you will see this time increase dramatically as requests queue up for the row-level lock.

    * Redis+Postgres: Measure the P99 latency for the API endpoint. You should see much more consistent latency as the database is no longer the primary bottleneck for concurrent requests on the same resource.

    For a system handling thousands of requests per second where key collisions are frequent (e.g., a 'claim reward' button in a game), the Redis hybrid model is likely necessary. For a standard e-commerce checkout, where collisions on the same key are rare, the simplicity and correctness of the pure PostgreSQL approach make it the better choice.


    Section 5: Edge Cases and Production Hardening

    * Garbage Collection: The idempotency_keys table will grow indefinitely. A simple background job that runs periodically is required:

    sql
        -- Delete records that are successfully completed and older than 30 days.
        DELETE FROM idempotency_keys WHERE status = 'completed' AND updated_at < NOW() - INTERVAL '30 days';
    
        -- Delete stale 'started' records where the worker likely died.
        DELETE FROM idempotency_keys WHERE status = 'started' AND locked_at < NOW() - INTERVAL '1 hour';

    * Partial Failures: The most critical failure is the inability to commit the final idempotency state. What if the business logic commits, but the UPDATE idempotency_keys ... statement fails? This is why both must be in the same transaction. The handler receives pgx.Tx, performs its work, and the middleware performs the final UPDATE and COMMIT. If the commit fails, the entire unit of work, including the business logic, is rolled back by the database, preserving a consistent state.

    * Client Key Generation: Clients must be instructed to generate a high-entropy key (e.g., UUIDv4) and retry with the same key on transient network errors or 5xx server responses. If they generate a new key for each retry, the entire idempotency mechanism is defeated.
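On the client side, the rule amounts to generating the key once, outside the retry loop. The following is a minimal sketch using only the Go standard library. The helper names `newKey`, `postWithRetry`, and `demo` are hypothetical, the test server is a stand-in that fails the first attempt, and a production client would also add backoff between attempts and resend the request body.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// newKey returns a random UUIDv4-formatted string.
func newKey() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// postWithRetry generates the key once and reuses it for every attempt.
func postWithRetry(client *http.Client, url string, maxAttempts int) (status int, key string, err error) {
	key, err = newKey()
	if err != nil {
		return 0, "", err
	}
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		req, reqErr := http.NewRequest(http.MethodPost, url, nil)
		if reqErr != nil {
			return 0, key, reqErr
		}
		req.Header.Set("Idempotency-Key", key)
		resp, doErr := client.Do(req)
		if doErr != nil {
			err = doErr
			continue // transient network error: retry with the same key
		}
		resp.Body.Close()
		status, err = resp.StatusCode, nil
		if status < 500 {
			return status, key, nil
		}
	}
	return status, key, err
}

// demo runs the client against a test server that fails the first attempt,
// and returns the final status plus the keys the server saw.
func demo() (status int, seen []string) {
	var mu sync.Mutex
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		mu.Lock()
		seen = append(seen, r.Header.Get("Idempotency-Key"))
		first := len(seen) == 1
		mu.Unlock()
		if first {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusCreated)
	}))
	defer srv.Close()
	status, _, _ = postWithRetry(srv.Client(), srv.URL, 3)
	return status, seen
}

func main() {
	status, seen := demo()
	fmt.Println(status, len(seen), seen[0] == seen[1]) // 201 2 true
}
```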

    Conclusion

    Implementing true idempotency in a distributed, asynchronous system is a non-trivial engineering task that requires moving beyond simple key checks. By creating a stateful persistence layer, leveraging the atomic guarantees of database transactions and pessimistic locking, and carefully propagating context, we can build Sagas that are resilient to the inherent unreliability of distributed environments. The SELECT ... FOR UPDATE pattern in PostgreSQL provides a powerful, consistent, and relatively simple foundation. For systems operating at extreme scale, this can be augmented with distributed locking mechanisms like Redis, but it's crucial to understand the consistency trade-offs involved. This robust approach prevents costly production bugs like duplicate payments or duplicate shipments, forming a critical building block for reliable microservice architectures.
