Idempotency Layers for Asynchronous Event-Driven Systems

Goh Ling Yong

The Inevitability of Duplicates in Event-Driven Architectures

In the world of distributed systems, the siren song of "exactly-once" processing is often just that: a myth. Most modern message brokers and event streaming platforms, from AWS SQS and RabbitMQ to Apache Kafka, default to an at-least-once delivery guarantee (even Kafka's exactly-once semantics cover only its own processing pipeline, not arbitrary external side effects). This is a pragmatic engineering trade-off, prioritizing message durability over the immense complexity of guaranteeing single delivery across network partitions, producer retries, and consumer failures.

For a senior engineer, this isn't news; it's a foundational constraint. The direct consequence is that any non-idempotent consumer becomes a ticking time bomb. A simple network blip causing a producer to retry sending an event can lead to catastrophic side effects: double-charging a customer, sending duplicate notifications, or corrupting state through repeated, non-idempotent database operations.

The only robust solution is to shift the responsibility for idempotency from the infrastructure to the application layer. Consumers must be designed to handle duplicate messages gracefully, executing the intended side effect only once, no matter how many times the same message is delivered. This article provides a comprehensive, production-ready blueprint for building such an idempotency layer, focusing on a transactional, database-backed approach that guarantees consistency and correctness even under high concurrency and failure scenarios.


Core Pattern: The Idempotency Key and State Store

The fundamental mechanism for enabling idempotency is the Idempotency Key. This is a unique client-generated identifier that accompanies every request or event. The consumer uses this key to track the processing status of each unique operation.

  • Key Generation: The producer is responsible for generating this key. It must be unique for each distinct operation but remain identical across all retries of that same operation. Common strategies include (a sketch of the content-hash approach follows this list):
    - UUIDs (v4 or v7): Generated by the client before the first attempt and reused on every retry.
    - Content Hash: A stable hash (e.g., SHA-256) of the request payload's immutable fields.
    - Composite Key: A combination of a stable user ID and a client-side transaction ID.
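
As a concrete illustration, here is a minimal sketch of the content-hash strategy in Go. The ChargeIntent struct and its field names are illustrative, not part of the implementation later in this article:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// ChargeIntent is an illustrative payload; only fields that are immutable
// across retries participate in the key.
type ChargeIntent struct {
	UserID      string
	ClientTxnID string
	AmountCents int64
	Currency    string
}

// IdempotencyKey derives a stable SHA-256 key: identical for every retry of
// the same logical operation, unique across distinct operations.
func (c ChargeIntent) IdempotencyKey() string {
	h := sha256.New()
	fmt.Fprintf(h, "%s|%s|%d|%s", c.UserID, c.ClientTxnID, c.AmountCents, c.Currency)
	return hex.EncodeToString(h.Sum(nil))
}
```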

  • The State Store: The consumer requires a persistent, centralized store to record the idempotency keys it has processed and the outcomes of those operations. While a fast key-value store like Redis is a common choice, this article will argue for and detail an implementation using a relational database (e.g., PostgreSQL). The primary reason is its ability to tie the idempotency check and the business logic together within a single ACID transaction, eliminating a whole class of subtle and dangerous race conditions.
The Idempotency Record State Machine

    To handle concurrency and failures, our idempotency record needs a simple state machine:

  • STARTED: The key has been seen, and processing has begun. This state is crucial for detecting concurrent attempts.
  • COMPLETED: The business logic has executed successfully, and the result is stored. Subsequent requests with the same key will receive this stored result without re-executing the logic.
  • FAILED: The business logic failed. This allows a subsequent retry with the same key to re-attempt the operation.
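
For reference, here are the same transitions as a small Go table. The status type matches the implementation later in this article; the transition map itself is only illustrative:

```go
// IdempotencyStatus mirrors the idempotency_status enum in the schema below.
type IdempotencyStatus string

const (
	StatusStarted   IdempotencyStatus = "started"
	StatusCompleted IdempotencyStatus = "completed"
	StatusFailed    IdempotencyStatus = "failed"
)

// validTransitions encodes the state machine: 'started' resolves to
// 'completed' or 'failed'; 'failed' may be re-locked as 'started' for a
// retry; 'completed' is terminal.
var validTransitions = map[IdempotencyStatus][]IdempotencyStatus{
	StatusStarted: {StatusCompleted, StatusFailed},
	StatusFailed:  {StatusStarted},
}
```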
    Here is the schema for our idempotency table in PostgreSQL:

```sql
    CREATE TYPE idempotency_status AS ENUM ('started', 'completed', 'failed');
    
    CREATE TABLE idempotency_keys (
        -- The client-provided idempotency key.
        idempotency_key VARCHAR(255) PRIMARY KEY,
    
        -- The current status of the operation.
        status idempotency_status NOT NULL,
    
        -- The user or tenant this key belongs to, crucial for multi-tenancy and indexing.
        user_id UUID NOT NULL,
    
        -- The HTTP status code and response body to return on subsequent requests.
        -- Stored as JSONB for flexibility.
        response_code INT,
        response_body JSONB,
    
        -- Timestamps for lifecycle management and debugging.
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    
        -- A lock timestamp to prevent orphaned 'started' records from blocking forever.
        locked_at TIMESTAMPTZ
    );
    
    -- An index on user_id supports tenant-scoped lookups and cleanup jobs.
    CREATE INDEX idx_idempotency_keys_user_id ON idempotency_keys (user_id);
```

    Design Rationale:

  • idempotency_key is the PRIMARY KEY, which provides an implicit unique index and is the fastest way to look up a record.
  • user_id is included for multi-tenancy. It allows for partitioning or creating composite indexes to ensure lookups are scoped and performant.
  • response_code and response_body cache the result of the original successful operation. This is the payload we return on duplicate requests.
  • locked_at is an important edge-case handler. If a process dies mid-operation, a record could be stuck in the started state indefinitely. A background job can periodically check for records where NOW() - locked_at exceeds a timeout and move them to failed to allow for retries.

The Atomic Check-and-Set: Preventing Race Conditions

    The most critical part of the implementation is the atomic operation of checking for a key's existence and creating it if it's not present. A naive SELECT followed by an INSERT is a classic race condition waiting to happen:

```mermaid
    graph TD
        subgraph Time
            direction LR
            T1 --> T2 --> T3 --> T4
        end
    
        subgraph Process A
            A1(T1: SELECT key 'xyz') --> A2(T2: Not found)
            A2 --> A3(T4: INSERT key 'xyz')
        end
    
        subgraph Process B
            B1(T1: SELECT key 'xyz') --> B2(T3: Not found)
            B2 --> B3(T4: INSERT key 'xyz')
        end
    
    A3 --> S1(SUCCESS: row inserted)
    B3 --> F1(FAIL: unique constraint violation)
```

    Both processes would read from the database, find no key, and then both would attempt to insert, with one failing on the unique constraint. The correct approach leverages the database's built-in atomicity guarantees.

    Production Pattern: `INSERT ... ON CONFLICT`

    PostgreSQL's INSERT ... ON CONFLICT DO NOTHING is the perfect tool for this. It attempts an insert and, if it violates a unique constraint (our primary key), it simply does nothing and moves on, all within a single atomic database operation. This elegantly solves the race condition.
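
If you would rather not rely on RETURNING, the same atomic check can be expressed with RowsAffected. A minimal sketch, assuming the schema above; claimKey is a hypothetical helper in the same package as the implementation below:

```go
// claimKey atomically claims an idempotency key. It returns true if this
// transaction inserted the key (a fresh operation) and false on conflict.
func claimKey(ctx context.Context, tx *sql.Tx, key, userID string) (bool, error) {
	res, err := tx.ExecContext(ctx, `
        INSERT INTO idempotency_keys (idempotency_key, user_id, status, locked_at)
        VALUES ($1, $2, 'started', NOW())
        ON CONFLICT (idempotency_key) DO NOTHING`, key, userID)
	if err != nil {
		return false, err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return false, err
	}
	return n == 1, nil // 1 row: we own the key; 0 rows: it already exists
}
```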

    The full, robust flow for a consumer processing a message looks like this:

  1. Start a database transaction.
  2. Attempt to INSERT the idempotency key with a status of started using ON CONFLICT DO NOTHING. Also set locked_at to the current time.
  3. Check the result of the INSERT:
     - If rows were affected (the insert succeeded): this is the first time we've seen this key. Proceed with the core business logic.
     - If no rows were affected (a conflict occurred): the key already exists. We must SELECT the existing record (FOR UPDATE, to lock it) and determine its state.
  4. Handle existing key states:
     - If status is COMPLETED: the operation is already done. Do not re-execute the business logic; immediately return the stored response_code and response_body.
     - If status is STARTED: another process is currently handling this request. The locked_at timestamp can be checked against a timeout. The service should respond with a conflict error (e.g., HTTP 409) or, in some use cases, enter a brief poll/wait loop (see the wait-loop sketch after the implementation below).
     - If status is FAILED: the previous attempt failed, so it is safe to retry. UPDATE the status back to started, refresh locked_at, and proceed with the business logic.
  5. Execute the business logic: if this is a new request or a retry of a failed one, perform the main operation (e.g., charge a card, create an order).
  6. Finalize the transaction:
     - On success: UPDATE the idempotency record's status to completed, store the response payload, and clear locked_at.
     - On failure: UPDATE the idempotency record's status to failed.
  7. Commit the database transaction. This atomically saves both the business logic's results and the final state of the idempotency record.
Go Implementation Example

    Here is a production-grade implementation in Go using the database/sql package. This IdempotencyManager would be injected into your service layer.

```go
    package main
    
    import (
    	"context"
    	"database/sql"
    	"encoding/json"
    	"errors"
    	"fmt"
    	"time"
    
    	_ "github.com/lib/pq"
    )
    
    // IdempotencyStatus defines the state of an idempotency key.
    type IdempotencyStatus string
    
    const (
    	StatusStarted   IdempotencyStatus = "started"
    	StatusCompleted IdempotencyStatus = "completed"
    	StatusFailed    IdempotencyStatus = "failed"
    )
    
    // IdempotencyRecord holds the state for a given key.
    type IdempotencyRecord struct {
    	Key          string
    	Status       IdempotencyStatus
    	ResponseCode int
    	ResponseBody []byte
    }
    
    // Pre-defined errors for clear control flow.
    var (
    	ErrConflict      = errors.New("request with this idempotency key is already in progress")
    	ErrAlreadyExists = errors.New("request with this idempotency key has already been completed")
    )
    
    // IdempotencyManager handles the idempotency logic.
    type IdempotencyManager struct {
    	DB *sql.DB
    }
    
    // NewIdempotencyManager creates a new manager.
    func NewIdempotencyManager(db *sql.DB) *IdempotencyManager {
    	return &IdempotencyManager{DB: db}
    }
    
    // BeginOperation starts the idempotency check within a transaction.
    // It returns the existing record if found, or nil if this is a new operation.
    func (m *IdempotencyManager) BeginOperation(ctx context.Context, tx *sql.Tx, key string, userID string) (*IdempotencyRecord, error) {
    	// 1. Atomic INSERT ... ON CONFLICT
    	// We use RETURNING to see if we actually inserted a row.
    	query := `
            INSERT INTO idempotency_keys (idempotency_key, user_id, status, locked_at)
            VALUES ($1, $2, 'started', NOW())
            ON CONFLICT (idempotency_key) DO NOTHING
            RETURNING idempotency_key
        `
    	var insertedKey string
    	err := tx.QueryRowContext(ctx, query, key, userID).Scan(&insertedKey)
    
    	if err == nil {
    		// Success! We inserted the key, this is a new operation.
    		return nil, nil
    	} else if err != sql.ErrNoRows {
    		// Some unexpected database error occurred.
    		return nil, fmt.Errorf("failed to insert idempotency key: %w", err)
    	}
    
    	// 2. If we reach here, it means ON CONFLICT was triggered: the key exists.
    	// Fetch the existing record with FOR UPDATE so the row stays locked for
    	// the rest of this transaction. Without the lock, two workers could both
    	// read a 'failed' status and both proceed to re-run the business logic.
    	query = `
            SELECT status, response_code, response_body, locked_at
            FROM idempotency_keys
            WHERE idempotency_key = $1
            FOR UPDATE
        `
    	var status string
    	var responseCode sql.NullInt64
    	var responseBody sql.NullString
    	var lockedAt sql.NullTime
    
    	if err := tx.QueryRowContext(ctx, query, key).Scan(&status, &responseCode, &responseBody, &lockedAt); err != nil {
    		return nil, fmt.Errorf("failed to select existing idempotency key: %w", err)
    	}
    
    	// 3. Handle existing key states
    	switch IdempotencyStatus(status) {
    	case StatusCompleted:
    		// Operation is already done. Return the cached response.
    		return &IdempotencyRecord{
    			Key:          key,
    			Status:       StatusCompleted,
    			ResponseCode: int(responseCode.Int64),
    			ResponseBody: []byte(responseBody.String),
    		}, ErrAlreadyExists
    
    	case StatusStarted:
    		// Another process is working on this. Check the lock timeout.
    		// For a production system, this timeout should be configurable.
    		if lockedAt.Valid && time.Since(lockedAt.Time) < 5*time.Minute {
    			return nil, ErrConflict
    		}
    		// The lock has expired. We can take over.
    		fallthrough // Fall through to the FAILED case logic to retry.
    
    	case StatusFailed:
    		// Previous attempt failed or lock expired, we can retry.
    		// Update the lock time and set status back to 'started'.
    		updateQuery := `
                UPDATE idempotency_keys
                SET status = 'started', locked_at = NOW(), updated_at = NOW()
                WHERE idempotency_key = $1
            `
    		_, err := tx.ExecContext(ctx, updateQuery, key)
    		if err != nil {
    			return nil, fmt.Errorf("failed to relock idempotency key: %w", err)
    		}
    		return nil, nil // Proceed with business logic
    
    	default:
    		return nil, fmt.Errorf("unknown idempotency status: %s", status)
    	}
    }
    
    // CompleteOperation marks an operation as successful.
    func (m *IdempotencyManager) CompleteOperation(ctx context.Context, tx *sql.Tx, key string, code int, body interface{}) error {
    	bodyJSON, err := json.Marshal(body)
    	if err != nil {
    		return fmt.Errorf("failed to marshal response body: %w", err)
    	}
    
    	query := `
            UPDATE idempotency_keys
            SET status = 'completed', response_code = $2, response_body = $3, updated_at = NOW(), locked_at = NULL
            WHERE idempotency_key = $1
        `
    	_, err = tx.ExecContext(ctx, query, key, code, bodyJSON)
    	return err
    }
    
    // FailOperation marks an operation as failed.
    func (m *IdempotencyManager) FailOperation(ctx context.Context, tx *sql.Tx, key string) error {
    	query := `
            UPDATE idempotency_keys
            SET status = 'failed', updated_at = NOW(), locked_at = NULL
            WHERE idempotency_key = $1
        `
    	_, err := tx.ExecContext(ctx, query, key)
    	return err
    }
```
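
The STARTED branch above returns ErrConflict immediately. Where a brief wait is preferable, as mentioned in the flow earlier, a small helper can poll for completion. This is an illustrative sketch (waitForCompletion is a hypothetical name) reusing the types and errors defined above:

```go
// waitForCompletion re-reads the record a few times in case the concurrent
// worker finishes quickly, instead of surfacing a 409 right away.
func waitForCompletion(ctx context.Context, db *sql.DB, key string, attempts int, pause time.Duration) (*IdempotencyRecord, error) {
	for i := 0; i < attempts; i++ {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(pause):
		}

		var rec IdempotencyRecord
		var status string
		err := db.QueryRowContext(ctx, `
            SELECT status, COALESCE(response_code, 0), COALESCE(response_body, 'null')
            FROM idempotency_keys
            WHERE idempotency_key = $1`, key).Scan(&status, &rec.ResponseCode, &rec.ResponseBody)
		if err != nil {
			return nil, err
		}
		if IdempotencyStatus(status) == StatusCompleted {
			rec.Key, rec.Status = key, StatusCompleted
			return &rec, nil
		}
	}
	return nil, ErrConflict // still in progress; fall back to the 409 path
}
```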

    Tying It All Together in a Service

    Now, let's see how this manager is used within a service that processes payments. Notice how the entire operation, from the idempotency check to the business logic, is wrapped in a single database transaction.

```go
    // PaymentService handles payment processing.
    type PaymentService struct {
    	DB         *sql.DB
    	IdemManager *IdempotencyManager
    }
    
    // ChargeRequest represents the input for a charge.
    type ChargeRequest struct {
    	Amount   int    `json:"amount"`
    	Currency string `json:"currency"`
    }
    
    // ChargeResponse represents the output.
    type ChargeResponse struct {
    	TransactionID string `json:"transactionId"`
    	Status        string `json:"status"`
    }
    
    func (s *PaymentService) CreateCharge(ctx context.Context, idempotencyKey, userID string, req ChargeRequest) (*ChargeResponse, int, error) {
    	tx, err := s.DB.BeginTx(ctx, nil)
    	if err != nil {
    		return nil, 500, fmt.Errorf("could not begin transaction: %w", err)
    	}
    	defer tx.Rollback() // Rollback is a no-op if the transaction is committed.
    
    	// 1. Begin idempotency check
    	existingRecord, err := s.IdemManager.BeginOperation(ctx, tx, idempotencyKey, userID)
    	if err != nil {
    		if errors.Is(err, ErrConflict) {
    			return nil, 409, err // 409 Conflict: another worker holds the key.
    		}
    		if errors.Is(err, ErrAlreadyExists) {
    			// Return the cached response without re-executing the charge.
    			var cachedResp ChargeResponse
    			if unmarshalErr := json.Unmarshal(existingRecord.ResponseBody, &cachedResp); unmarshalErr != nil {
    				return nil, 500, fmt.Errorf("failed to unmarshal cached response: %w", unmarshalErr)
    			}
    			return &cachedResp, existingRecord.ResponseCode, nil // e.g., 201 Created
    		}
    		// Any other error is a server error.
    		return nil, 500, fmt.Errorf("idempotency check failed: %w", err)
    	}
    
    	// 2. Execute business logic (only if it's a new operation)
    	// This is a placeholder for your actual payment gateway integration.
    	transactionID, err := s.processPaymentInGateway(ctx, req.Amount, req.Currency)
    	if err != nil {
    		// Mark the operation as failed and COMMIT that state (not roll back),
    		// so a later retry with the same key is allowed to proceed.
    		if ferr := s.IdemManager.FailOperation(ctx, tx, idempotencyKey); ferr != nil {
    			return nil, 500, fmt.Errorf("payment failed and idempotency record update failed: %w", ferr)
    		}
    		if cerr := tx.Commit(); cerr != nil {
    			return nil, 500, fmt.Errorf("payment failed and 'failed' state not committed: %w", cerr)
    		}
    		return nil, 500, fmt.Errorf("payment processing failed: %w", err)
    	}
    
    	// Also, persist the charge in our own database.
    	// This happens inside the same transaction.
    	_, err = tx.ExecContext(ctx, "INSERT INTO charges (id, user_id, amount) VALUES ($1, $2, $3)", transactionID, userID, req.Amount)
    	if err != nil {
    		// No need to call FailOperation here: the deferred tx.Rollback() will
    		// discard the 'started' record (or revert a re-locked one to 'failed'),
    		// so a subsequent retry can proceed.
    		return nil, 500, fmt.Errorf("failed to save charge record: %w", err)
    	}
    
    	// 3. Complete the operation successfully
    	resp := &ChargeResponse{TransactionID: transactionID, Status: "succeeded"}
    	if err := s.IdemManager.CompleteOperation(ctx, tx, idempotencyKey, 201, resp); err != nil {
    		return nil, 500, fmt.Errorf("failed to complete idempotency record: %w", err)
    	}
    
    	// 4. Atomically commit everything
    	if err := tx.Commit(); err != nil {
    		return nil, 500, fmt.Errorf("failed to commit transaction: %w", err)
    	}
    
    	return resp, 201, nil
    }
    
    func (s *PaymentService) processPaymentInGateway(ctx context.Context, amount int, currency string) (string, error) {
    	// In a real system, this would make an RPC/HTTP call to a payment provider.
    	// For this example, we'll just simulate success.
    	fmt.Printf("Processing payment of %d %s...\n", amount, currency)
    	time.Sleep(100 * time.Millisecond)
    	return fmt.Sprintf("txn_%d", time.Now().UnixNano()), nil
    }
```
    

    This example demonstrates the critical pattern: the IdempotencyManager operates on the same transaction (tx) as the business logic (tx.ExecContext to insert into the charges table). When tx.Commit() is called, the creation of the charge record and the update of the idempotency key to completed happen atomically. If any part fails before the commit, the entire transaction is rolled back, leaving the system in a consistent state, ready for a safe retry.
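
To make the guarantee concrete, here is a hypothetical smoke test reusing the PaymentService above (the key, user ID, and function name are illustrative): redelivering the same logical request must yield one charge and two identical responses.

```go
func demoDuplicateDelivery(ctx context.Context, svc *PaymentService) error {
	key := "order-42-attempt-1" // identical on both deliveries
	userID := "00000000-0000-0000-0000-000000000001"
	req := ChargeRequest{Amount: 1999, Currency: "USD"}

	first, code1, err := svc.CreateCharge(ctx, key, userID, req)
	if err != nil {
		return err
	}

	// Simulated redelivery: same key, same payload. The business logic must
	// NOT run again; we expect the cached response back.
	second, code2, err := svc.CreateCharge(ctx, key, userID, req)
	if err != nil {
		return err
	}

	if first.TransactionID != second.TransactionID || code1 != code2 {
		return fmt.Errorf("redelivery produced a different result: %+v vs %+v", first, second)
	}
	return nil
}
```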


    Advanced Considerations and Performance Tuning

    While the above pattern is robust, several factors must be considered in a high-throughput production environment.

    1. Idempotency Store Performance: DB vs. Redis

    Using the primary application database for idempotency keys provides maximum consistency but can become a performance bottleneck. The idempotency_keys table will experience high write contention.

  • PostgreSQL (as detailed):
    - Pros: Unbeatable consistency. No two-phase commit needed. The idempotency state is perfectly synchronized with the business data.
    - Cons: Can create a hot spot in your primary database. Requires careful indexing and vacuuming/maintenance.
  • Redis:
    - Pros: Extremely fast. Can handle a much higher volume of checks.
    - Cons: Lacks transactional consistency with your primary database. You now have a distributed transaction problem. If you successfully write to your PostgreSQL DB but fail to update the key in Redis, your system is inconsistent. This can be mitigated with patterns like the Transactional Outbox, but it adds significant complexity.

    Recommendation: Start with the PostgreSQL approach. For the vast majority of applications, a well-indexed table in a properly provisioned database is more than sufficient and far safer. Only consider a hybrid Redis/DB approach once you have hard evidence that the idempotency_keys table is your primary system bottleneck.
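
For comparison, here is a minimal sketch of the Redis fast path using SETNX, assuming the go-redis v9 client. Note that it cannot participate in the database transaction, which is exactly the consistency gap described above:

```go
import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// alreadySeen claims the key in Redis. The first caller gets ok == true and
// owns the operation; later callers see a duplicate. The TTL bounds growth,
// but it also silently expires the guarantee after 24 hours.
func alreadySeen(ctx context.Context, rdb *redis.Client, key string) (bool, error) {
	ok, err := rdb.SetNX(ctx, "idem:"+key, "started", 24*time.Hour).Result()
	if err != nil {
		return false, err
	}
	return !ok, nil
}
```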

    2. Record Cleanup and TTL

    An idempotency_keys table cannot grow indefinitely. You need a strategy for purging old records.

  • TTL-based Deletion: Add an expires_at column to the table, set to NOW() + interval (e.g., 24 hours). A simple background job (cron or a dedicated cleanup worker) can periodically run DELETE FROM idempotency_keys WHERE expires_at < NOW().
  • Partitioning: For extremely high-volume systems, PostgreSQL's native table partitioning is a superior solution. You can partition the idempotency_keys table by a time range (e.g., daily or hourly). Instead of a costly DELETE, you can simply detach and drop an old partition, which is a near-instantaneous metadata operation.
    Example of partitioning by day:

```sql
    CREATE TABLE idempotency_keys_partitioned (
        -- columns as before; note that on a partitioned table, the primary key
        -- must include the partition key, so it becomes (idempotency_key, created_at),
        -- and the database no longer enforces global uniqueness of the key alone.
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    ) PARTITION BY RANGE (created_at);
    
    -- Create partitions for today and tomorrow
    CREATE TABLE idempotency_keys_y2023m10d26 PARTITION OF idempotency_keys_partitioned
        FOR VALUES FROM ('2023-10-26 00:00:00+00') TO ('2023-10-27 00:00:00+00');
    
    CREATE TABLE idempotency_keys_y2023m10d27 PARTITION OF idempotency_keys_partitioned
        FOR VALUES FROM ('2023-10-27 00:00:00+00') TO ('2023-10-28 00:00:00+00');
    
    -- A cron job would create new partitions and drop old ones.
    -- DROP TABLE idempotency_keys_y2023m10d25;
```
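
A sketch of that cron job in Go. The helper name and retention policy are illustrative; table identifiers cannot be bound as query parameters, so they are formatted in, which is acceptable here because nothing is user-supplied:

```go
// rotatePartitions creates tomorrow's partition and drops the oldest one.
func rotatePartitions(ctx context.Context, db *sql.DB, retainDays int) error {
	tomorrow := time.Now().UTC().Truncate(24 * time.Hour).AddDate(0, 0, 1)
	name := "idempotency_keys_" + tomorrow.Format("y2006m01d02")
	from := tomorrow.Format("2006-01-02")
	to := tomorrow.AddDate(0, 0, 1).Format("2006-01-02")

	_, err := db.ExecContext(ctx, fmt.Sprintf(
		`CREATE TABLE IF NOT EXISTS %s PARTITION OF idempotency_keys_partitioned
         FOR VALUES FROM ('%s 00:00:00+00') TO ('%s 00:00:00+00')`, name, from, to))
	if err != nil {
		return fmt.Errorf("create partition: %w", err)
	}

	// Dropping a partition is a near-instant metadata operation, unlike DELETE.
	old := tomorrow.AddDate(0, 0, -(retainDays + 1))
	_, err = db.ExecContext(ctx, fmt.Sprintf(
		"DROP TABLE IF EXISTS idempotency_keys_%s", old.Format("y2006m01d02")))
	return err
}
```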

    3. Handling Orphaned Locks

    As mentioned, a process can die after marking a key as started but before completing or failing it. Our locked_at column helps with this. A background worker should periodically scan for stale locks:

```sql
    UPDATE idempotency_keys
    SET status = 'failed', locked_at = NULL, updated_at = NOW()
    WHERE status = 'started' AND locked_at < NOW() - INTERVAL '5 minutes';
```

    This query finds any operations that have been in the started state for more than 5 minutes and moves them to failed, making them eligible for a safe retry by a new worker.
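
A minimal sketch of such a worker, assuming the imports from the earlier listing plus the standard log package:

```go
// sweepStaleLocks periodically fails orphaned 'started' records so they
// become eligible for retry. Run one instance per service, not per request.
func sweepStaleLocks(ctx context.Context, db *sql.DB, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if _, err := db.ExecContext(ctx, `
                UPDATE idempotency_keys
                SET status = 'failed', locked_at = NULL, updated_at = NOW()
                WHERE status = 'started' AND locked_at < NOW() - INTERVAL '5 minutes'`); err != nil {
				log.Printf("stale-lock sweep failed: %v", err)
			}
		}
	}
}
```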

    Conclusion

    Implementing a robust idempotency layer is not an optional feature in a serious event-driven system; it is a core requirement for correctness and reliability. By leveraging the ACID guarantees of a relational database like PostgreSQL, we can build a highly consistent and race-condition-free idempotency processor. The INSERT ... ON CONFLICT pattern provides an elegant and performant mechanism for the critical atomic check-and-set operation.

    While the implementation requires careful attention to detail—especially around transaction management, state transitions, and record lifecycle—the resulting system is resilient to the inevitable message duplication inherent in distributed architectures. This pattern moves the problem of idempotency from a recurring source of bugs and data corruption into a solved, foundational component of your service's architecture.
