Atomic Idempotency Patterns in Event-Driven Systems with Redis & Lua

Goh Ling Yong

The Inescapable Problem: Duplicate Processing in Asynchronous Systems

In any distributed, asynchronous architecture, the guarantee of message delivery is often "at-least-once." Network partitions, service restarts, and client-side retry logic mean your message consumers or API endpoints will inevitably process the same logical request multiple times. For non-mutating operations, this is benign. For critical business logic—payment processing, order creation, inventory updates—it's a catastrophic failure mode leading to data corruption, financial loss, and customer distrust.

The standard solution is idempotency: designing an operation so that receiving it multiple times has the same effect as receiving it once. This is typically achieved with an idempotency key, a unique client-generated identifier for each distinct operation. The server stores the result of the first successful operation against this key and simply returns the cached result for any subsequent retries.
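For example, a client typically generates the key once per logical operation (often a UUID) and reuses it verbatim on every retry. Here is a minimal client-side sketch in Go, assuming an HTTP API that reads the key from an Idempotency-Key header (the endpoint URL and payload are illustrative):

go
// client_sketch.go — hypothetical client-side usage of an idempotency key.
package main

import (
	"bytes"
	"fmt"
	"net/http"

	"github.com/google/uuid"
)

func main() {
	// Generate the key once per logical operation (e.g., one checkout attempt)
	// and reuse the exact same value for every retry of that operation.
	idempotencyKey := uuid.NewString()
	body := []byte(`{"amount": 1999, "currency": "USD"}`)

	req, err := http.NewRequest(http.MethodPost, "https://api.example.com/charge", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Idempotency-Key", idempotencyKey)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// Network failure: it is safe to retry with the same key; the server
		// will either run the operation once or return the cached result.
		fmt.Println("request failed, retry with the same Idempotency-Key:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}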

This sounds simple, but the implementation is fraught with peril. The most critical failure point is the non-atomic nature of the typical "check-then-act" logic in a high-concurrency environment. A naive implementation is a ticking time bomb.

Why Naive Implementations Fail: The Race Condition

Consider a simple flow using a relational database or a basic key-value store:

  • Receive Request: A worker receives a request with Idempotency-Key: key-123.
  • Check: The worker queries the idempotency_keys table/store: SELECT result FROM idempotency_keys WHERE key = 'key-123'.
  • Act: If no result is found, the worker executes the business logic.
  • Store: The worker then saves the result: INSERT INTO idempotency_keys (key, result) VALUES ('key-123', '...').

Now imagine two requests with the same key arriving milliseconds apart, processed by two different workers (or threads):

    mermaid
    sequenceDiagram
        participant C as Client
        participant W1 as Worker 1
        participant W2 as Worker 2
        participant DB as Database
    
        C->>W1: POST /charge (key: key-123)
        C->>W2: POST /charge (key: key-123) [Retry]
    
        W1->>DB: SELECT result WHERE key='key-123'
        DB-->>W1: (Not Found)
    
        W2->>DB: SELECT result WHERE key='key-123'
        DB-->>W2: (Not Found)
    
        note over W1, W2: Both workers believe they are the first!
    
        W1->>DB: Execute Payment Logic...
        W2->>DB: Execute Payment Logic... (!!) 
    
        W1->>DB: INSERT result for 'key-123'
        DB-->>W1: OK
    
        W2->>DB: INSERT result for 'key-123'
        DB-->>W2: Fails (Unique Key Violation)
    
        note over W2: The operation has already run twice.

    Even with a unique constraint on the key, the business logic has already executed twice. The database constraint only prevents storing the duplicate result; it doesn't prevent the duplicate action. This is the fundamental race condition we must eliminate.

    The Atomic Primitive: Redis `SETNX`

    To solve the race condition, the "check-and-set" operation must be atomic. This is where Redis excels. The SET key value NX command (which supersedes the legacy SETNX command and adds TTL support via EX/PX) sets a key only if it does not already exist. This single, atomic operation forms the foundation of a distributed lock.

    We can build a more robust, two-phase idempotency check around this primitive. We need to manage three states for a given key:

  • Started/Locked: The operation has been received and is being processed.
  • Completed: The operation finished successfully, and the result is stored.
  • Failed: The operation failed, and the error information is stored.

    Let's design a flow using two keys in Redis for each idempotency key:

    * idem:started:{key}: A lock key to signal that processing has begun.

    * idem:result:{key}: A key to store the final serialized result (success or failure).

    Here is the refined, SETNX-based logic:

  • A worker receives a request with Idempotency-Key: key-123.
  • First, check if a final result already exists: GET idem:result:key-123. If it exists, return the cached result immediately.
  • If no result exists, attempt to acquire a lock: SET idem:started:key-123 <worker-id> EX <lock-ttl-seconds> NX. The EX sets an expiration to prevent indefinite locks if a worker crashes.
  • If SET returns OK (lock acquired):

    a. Execute the business logic.

    b. Upon completion, store the result: SET idem:result:key-123 <serialized-result> EX <result-ttl-seconds>.

    c. Release the lock: DEL idem:started:key-123.

  • If SET returns nil (lock is held by another worker):

    a. The request is already being processed. The client should wait and retry.

    b. The client can poll the idem:result:key-123 key until a result is available.

    Go Implementation with `go-redis`

    Here's a working implementation of this pattern in Go. This IdempotencyStore encapsulates the logic.

    go
    // main.go
    package main
    
    import (
    	"context"
    	"encoding/json"
    	"errors"
    	"fmt"
    	"time"
    
    	"github.com/go-redis/redis/v8"
    	"github.com/google/uuid"
    )
    
    // Represents the stored result of an idempotent operation.
    // Can store either a success payload or an error.
    type StoredResponse struct {
    	StatusCode int    `json:"statusCode"`
    	Body       string `json:"body"`
    	Error      string `json:"error,omitempty"`
    }
    
    // Custom errors for idempotency logic
    var (
    	ErrRequestInProgress = errors.New("request in progress")
    )
    
    // IdempotencyStore manages the lifecycle of idempotent requests using Redis.
    type IdempotencyStore struct {
    	client   *redis.Client
    	lockTTL  time.Duration // How long the processing lock should be held
    	resultTTL time.Duration // How long the final result should be stored
    }
    
    func NewIdempotencyStore(client *redis.Client) *IdempotencyStore {
    	return &IdempotencyStore{
    		client:   client,
    		lockTTL:  30 * time.Second,   // A sensible default lock time
    		resultTTL: 24 * time.Hour, // Store results for a full day
    	}
    }
    
    func (s *IdempotencyStore) startedKey(key string) string {
    	return fmt.Sprintf("idem:started:%s", key)
    }
    
    func (s *IdempotencyStore) resultKey(key string) string {
    	return fmt.Sprintf("idem:result:%s", key)
    }
    
    // BeginRequest attempts to start processing for an idempotency key.
    // It returns a previously stored response if one exists.
    // It returns ErrRequestInProgress if another worker holds the lock.
    // It returns nil if the lock was successfully acquired.
    func (s *IdempotencyStore) BeginRequest(ctx context.Context, key string) (*StoredResponse, error) {
    	// 1. Check for a final result first.
    	resultData, err := s.client.Get(ctx, s.resultKey(key)).Bytes()
    	if err == nil {
    		var response StoredResponse
    		if jsonErr := json.Unmarshal(resultData, &response); jsonErr == nil {
    			fmt.Printf("[IDEM %s] Found cached result.\n", key)
    			return &response, nil
    		}
    		// Handle case where data is corrupted, proceed as if not found
    	}
    	if err != nil && err != redis.Nil {
    		return nil, fmt.Errorf("redis GET failed: %w", err) // Genuine Redis error
    	}
    
    	// 2. Attempt to acquire the lock.
    	workerID := uuid.NewString()
    	wasSet, err := s.client.SetNX(ctx, s.startedKey(key), workerID, s.lockTTL).Result()
    	if err != nil {
    		return nil, fmt.Errorf("redis SETNX failed: %w", err)
    	}
    
    	if !wasSet {
    		fmt.Printf("[IDEM %s] Lock held by another worker.\n", key)
    		return nil, ErrRequestInProgress
    	}
    
    	fmt.Printf("[IDEM %s] Acquired lock with worker ID %s.\n", key, workerID)
    	return nil, nil // Lock acquired, ready to process
    }
    
    // CompleteRequest stores the final result of an operation and releases the lock.
    func (s *IdempotencyStore) CompleteRequest(ctx context.Context, key string, response StoredResponse) error {
    	resultData, err := json.Marshal(response)
    	if err != nil {
    		return fmt.Errorf("failed to marshal response: %w", err)
    	}
    
    	// Use a pipeline to ensure atomicity of setting result and deleting lock
    	pipe := s.client.TxPipeline()
    	pipe.Set(ctx, s.resultKey(key), resultData, s.resultTTL)
    	pipe.Del(ctx, s.startedKey(key))
    
    	_, err = pipe.Exec(ctx)
    	if err != nil {
    		return fmt.Errorf("redis pipeline failed: %w", err)
    	}
    
    	fmt.Printf("[IDEM %s] Stored result and released lock.\n", key)
    	return nil
    }
    
    // --- Example Usage ---
    
    func main() {
    	ctx := context.Background()
    	rdb := redis.NewClient(&redis.Options{
    		Addr: "localhost:6379",
    	})
    
    	store := NewIdempotencyStore(rdb)
    	idempotencyKey := "order-45678"
    
    	// Simulate a business operation
    	processPayment := func(key string) (*StoredResponse, error) {
    		// This is where your core business logic lives
    		fmt.Printf("[BIZ %s] Processing payment...\n", key)
    		time.Sleep(2 * time.Second) // Simulate work
    		fmt.Printf("[BIZ %s] Payment successful.\n", key)
    		return &StoredResponse{StatusCode: 200, Body: `{"transactionId": "txn_abc123"}`}, nil
    	}
    
    	handleRequest := func(key string) {
    		// Begin the idempotent request
    		cachedResponse, err := store.BeginRequest(ctx, key)
    		if err != nil {
    			if err == ErrRequestInProgress {
    				fmt.Printf("[HANDLER %s] Request is already in progress. Will not re-process.\n", key)
    				return
    			}
    			fmt.Printf("[HANDLER %s] Error starting request: %v\n", key, err)
    			return
    		}
    
    		if cachedResponse != nil {
    			fmt.Printf("[HANDLER %s] Returning cached response: %+v\n", key, *cachedResponse)
    			return
    		}
    
    		// Lock acquired, execute the core logic
    		response, bizErr := processPayment(key)
    		if bizErr != nil {
    			// Store the failure result
    			failedResponse := StoredResponse{StatusCode: 500, Error: bizErr.Error()}
    			store.CompleteRequest(ctx, key, failedResponse)
    			return
    		}
    
    		// Store the success result
    		store.CompleteRequest(ctx, key, *response)
    	}
    
    	// Simulate two concurrent requests
    	fmt.Println("--- Simulating concurrent requests ---")
    	go handleRequest(idempotencyKey)
    	go handleRequest(idempotencyKey)
    
    	time.Sleep(5 * time.Second)
    
    	// Simulate a third request after the first has completed
    	fmt.Println("\n--- Simulating a later request ---")
    	handleRequest(idempotencyKey)
    
    	time.Sleep(2 * time.Second)
    }

    This SETNX approach is a significant improvement. It's atomic at the point of decision, preventing the race condition. However, it's not perfect. There are subtle edge cases and opportunities for optimization that a truly robust system must address.

    The Ultimate Solution: A State Machine in Lua

    The SETNX pattern requires multiple network round-trips to Redis: GET result, SETNX lock, then a SET/DEL pipeline. Each round-trip adds latency. More critically, the logic is spread across the client application and Redis. A more powerful approach is to encapsulate the entire state transition logic within Redis itself using a Lua script.

    Redis guarantees the atomic execution of Lua scripts. No other command can run while a script is executing. This allows us to create a single, atomic operation that handles all the initial state checks and lock acquisition.

    Our Lua script will implement a complete state machine for an idempotency key:

    * Inputs: KEYS[1] (idempotency key), ARGV[1] (lock owner ID), ARGV[2] (lock TTL in seconds).

    * Outputs: A status code and, for cached results, the payload.

    * {1, result}: Result was cached; the second element is the stored result.

    * {2}: Lock acquired successfully.

    * {3}: Lock is held by another worker.

    The Lua Script (`idempotency.lua`)

    lua
    -- idempotency.lua
    -- KEYS[1]: The base idempotency key (e.g., 'order-45678')
    -- ARGV[1]: A unique ID for the worker acquiring the lock (e.g., a UUID)
    -- ARGV[2]: The TTL for the lock in seconds
    
    local result_key = 'idem:result:' .. KEYS[1]
    local started_key = 'idem:started:' .. KEYS[1]
    
    -- 1. Check for a finished result first.
    local result = redis.call('GET', result_key)
    if result then
      -- Operation is already complete, return the cached result.
      return {1, result}
    end
    
    -- 2. Check if the operation is already in progress.
    local owner = redis.call('GET', started_key)
    if owner then
      -- Lock is held. Return status 3 to indicate it's in progress.
      -- (Return a single-element table: a trailing nil would be truncated in the reply.)
      return {3}
    end
    
    -- 3. No result and no lock. Attempt to acquire the lock.
    redis.call('SET', started_key, ARGV[1], 'EX', ARGV[2])
    
    -- Return status 2 to indicate the lock was successfully acquired.
    return {2}

    This script is significantly more efficient. It performs all checks and the lock acquisition in a single, atomic server-side operation.

    Integrating the Lua Script in Go

    We now refactor our IdempotencyStore to use this script. We'll load the script into Redis once using SCRIPT LOAD and then call it via its SHA1 hash using EVALSHA for maximum efficiency.

    go
    // main_lua.go
    package main
    
    import (
    	"context"
    	"encoding/json"
    	"errors"
    	"fmt"
    	"os"
    	"strings"
    	"time"
    
    	"github.com/go-redis/redis/v8"
    	"github.com/google/uuid"
    )
    
    // StoredResponse and errors are the same as before...
    
    // IdempotencyStore with Lua script support
    type IdempotencyStoreLua struct {
    	client    *redis.Client
    	scriptSHA string
    	lockTTL   time.Duration
    	resultTTL time.Duration
    }
    
    func NewIdempotencyStoreLua(ctx context.Context, client *redis.Client) (*IdempotencyStoreLua, error) {
    	luaScript, err := os.ReadFile("idempotency.lua")
    	if err != nil {
    		return nil, fmt.Errorf("failed to read lua script: %w", err)
    	}
    
    	sha, err := client.ScriptLoad(ctx, string(luaScript)).Result()
    	if err != nil {
    		return nil, fmt.Errorf("failed to load lua script: %w", err)
    	}
    
    	fmt.Printf("Loaded idempotency Lua script with SHA: %s\n", sha)
    
    	return &IdempotencyStoreLua{
    		client:    client,
    		scriptSHA: sha,
    		lockTTL:   30 * time.Second,
    		resultTTL: 24 * time.Hour,
    	}, nil
    }
    
    // BeginRequest now uses the atomic Lua script.
    func (s *IdempotencyStoreLua) BeginRequest(ctx context.Context, key string) (*StoredResponse, error) {
    	workerID := uuid.NewString()
    
    	keys := []string{key}
    	argv := []interface{}{workerID, int(s.lockTTL.Seconds())}
    
    	res, err := s.client.EvalSha(ctx, s.scriptSHA, keys, argv...).Result()
    	if err != nil {
    		// If the script is not loaded (e.g., Redis restart), load it again.
    		if strings.Contains(err.Error(), "NOSCRIPT") {
    			// This is a simplified fallback. In production, you'd want a more robust
    			// script manager to handle this automatically.
    			fmt.Println("Script not found, reloading...")
    			newStore, loadErr := NewIdempotencyStoreLua(ctx, s.client)
    			if loadErr != nil {
    				return nil, loadErr
    			}
    			s.scriptSHA = newStore.scriptSHA
    			// Retry the call
    			res, err = s.client.EvalSha(ctx, s.scriptSHA, keys, argv...).Result()
    		}
    		if err != nil {
    			return nil, fmt.Errorf("failed to execute lua script: %w", err)
    		}
    	}
    
    	resultSlice, ok := res.([]interface{})
    	if !ok || len(resultSlice) == 0 {
    		return nil, fmt.Errorf("unexpected lua script result format: %v", res)
    	}
    
    	statusCode, _ := resultSlice[0].(int64)
    
    	switch statusCode {
    	case 1: // Result was cached
    		if len(resultSlice) < 2 {
    			return nil, fmt.Errorf("cached result missing from lua script reply: %v", res)
    		}
    		resultData, _ := resultSlice[1].(string)
    		var response StoredResponse
    		if jsonErr := json.Unmarshal([]byte(resultData), &response); jsonErr != nil {
    			return nil, fmt.Errorf("failed to unmarshal cached response: %w", jsonErr)
    		}
    		fmt.Printf("[IDEM %s] Found cached result via Lua.\n", key)
    		return &response, nil
    	case 2: // Lock acquired
    		fmt.Printf("[IDEM %s] Acquired lock via Lua with worker ID %s.\n", key, workerID)
    		return nil, nil
    	case 3: // Lock held by another worker
    		fmt.Printf("[IDEM %s] Lock held by another worker (detected by Lua).\n", key)
    		return nil, ErrRequestInProgress
    	default:
    		return nil, fmt.Errorf("unknown status code from lua script: %d", statusCode)
    	}
    }
    
    // CompleteRequest remains largely the same, but it's still critical.
    func (s *IdempotencyStoreLua) CompleteRequest(ctx context.Context, key string, response StoredResponse) error {
    	resultData, err := json.Marshal(response)
    	if err != nil {
    		return fmt.Errorf("failed to marshal response: %w", err)
    	}
    
    	// The keys must match the Lua script's logic
    	resultKey := fmt.Sprintf("idem:result:%s", key)
    	startedKey := fmt.Sprintf("idem:started:%s", key)
    
    	pipe := s.client.TxPipeline()
    	pipe.Set(ctx, resultKey, resultData, s.resultTTL)
    	pipe.Del(ctx, startedKey)
    
    	_, err = pipe.Exec(ctx)
    	if err != nil {
    		return fmt.Errorf("redis pipeline failed: %w", err)
    	}
    
    	fmt.Printf("[IDEM %s] Stored result and released lock.\n", key)
    	return nil
    }
    
    // The main function for demonstrating usage would be identical to the previous one,
    // just initializing IdempotencyStoreLua instead.

    Advanced Edge Cases and Production Hardening

    Even with an atomic Lua script, building a truly resilient system requires handling more complex failure scenarios.

    Edge Case 1: The Zombie Worker and Fencing Tokens

    Our lock has a TTL. What happens if a worker acquires a lock, experiences a long GC pause or network partition that exceeds the TTL, and then comes back to life?

  • Worker A acquires the lock for key-123 with a 30s TTL.
  • Worker A is paused for 35s.
  • The lock on key-123 expires in Redis.
  • Worker B acquires a new lock for key-123 and starts processing.
  • Worker A wakes up, finishes its work, and calls CompleteRequest, overwriting the result from Worker B.

    This is a classic distributed systems problem. The solution is a fencing token: a unique, ideally monotonic value associated with each lock acquisition. When we attempt to write the final result, we must provide the fencing token, and the storage system atomically checks that the token is still the current one before allowing the write.

    In our Redis implementation, the unique workerID we generated can serve as a fencing token. We can enhance our CompleteRequest logic to use a Lua script that only sets the result if the lock key still exists and its value matches the worker's ID.

    Enhanced complete.lua script:

    lua
    -- complete.lua
    -- KEYS[1]: The result key (e.g., 'idem:result:order-45678')
    -- KEYS[2]: The started/lock key (e.g., 'idem:started:order-45678')
    -- ARGV[1]: The worker's unique ID (fencing token)
    -- ARGV[2]: The serialized result data
    -- ARGV[3]: The result TTL in seconds
    
    local current_owner = redis.call('GET', KEYS[2])
    
    -- Only complete the request if we are still the owner of the lock.
    if current_owner == ARGV[1] then
      redis.call('SET', KEYS[1], ARGV[2], 'EX', ARGV[3])
      redis.call('DEL', KEYS[2])
      return 1 -- Success
    else
      -- We lost the lock. Do not write the result.
      return 0 -- Failure (fenced off)
    end

    The Go CompleteRequest method would be updated to call this script, and it would now be able to detect if it has been fenced off, preventing it from overwriting a result from a newer, legitimate worker.
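    As a minimal sketch of that fenced completion (not shown in the code above: it assumes BeginRequest is adjusted to return the workerID it generated, and that a hypothetical completeScriptSHA field holds the SHA of complete.lua loaded via ScriptLoad in the same way as the begin script):

    go
    // CompleteRequestFenced stores the result only if this worker still owns the lock.
    // Assumptions: BeginRequest returns its generated workerID to the caller, and
    // s.completeScriptSHA holds the SHA of complete.lua loaded via ScriptLoad.
    func (s *IdempotencyStoreLua) CompleteRequestFenced(ctx context.Context, key, workerID string, response StoredResponse) error {
    	resultData, err := json.Marshal(response)
    	if err != nil {
    		return fmt.Errorf("failed to marshal response: %w", err)
    	}
    
    	keys := []string{
    		fmt.Sprintf("idem:result:%s", key),  // KEYS[1] in complete.lua
    		fmt.Sprintf("idem:started:%s", key), // KEYS[2] in complete.lua
    	}
    	argv := []interface{}{workerID, resultData, int(s.resultTTL.Seconds())}
    
    	res, err := s.client.EvalSha(ctx, s.completeScriptSHA, keys, argv...).Result()
    	if err != nil {
    		return fmt.Errorf("failed to execute complete.lua: %w", err)
    	}
    
    	if status, _ := res.(int64); status != 1 {
    		// Another worker took over after our lock expired; its result must stand.
    		return fmt.Errorf("fenced off: lock for %q is no longer owned by this worker", key)
    	}
    
    	fmt.Printf("[IDEM %s] Stored result and released lock (fenced).\n", key)
    	return nil
    }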

    Edge Case 2: Storing Large Responses

    Storing full API responses in Redis can consume significant memory, especially if the payloads are large and the result TTL is long. For a high-throughput system, this can become a bottleneck.

    Strategies for Mitigation:

  • Response Hashing: Instead of storing the full response, store a hash (e.g., SHA-256) of it. This is only useful if the client can verify the hash, which is uncommon.
  • Pointer to External Storage: Store a pointer (e.g., a URL or an object key) to the full response kept in more cost-effective blob storage such as Amazon S3 or Google Cloud Storage. The idem:result:{key} entry in Redis would then contain something like {"storage": "s3", "bucket": "my-results", "key": "idem-results/key-123.json"} (see the sketch after this list).
  • Selective Caching: Only cache responses for certain status codes (e.g., 200 OK, 201 Created) and not for transient failures (503 Service Unavailable) that a client might legitimately want to retry later.
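    A minimal sketch of the pointer approach (the envelope fields and storage details are illustrative, not part of the implementation above): the stored value either inlines a small body or points at blob storage.

    go
    // envelope_sketch.go — hypothetical result envelope for the pointer-to-external-storage strategy.
    package main
    
    import (
    	"encoding/json"
    	"fmt"
    )
    
    // ResultEnvelope is what would be stored under idem:result:{key}. Small responses
    // are inlined; large ones are uploaded to blob storage and only the pointer is kept in Redis.
    type ResultEnvelope struct {
    	StatusCode int    `json:"statusCode"`
    	Body       string `json:"body,omitempty"`    // inline payload for small responses
    	Storage    string `json:"storage,omitempty"` // e.g. "s3"
    	Bucket     string `json:"bucket,omitempty"`  // e.g. "my-results"
    	Key        string `json:"key,omitempty"`     // e.g. "idem-results/key-123.json"
    }
    
    func main() {
    	// What Redis would hold for a large response whose body lives in S3.
    	large := ResultEnvelope{StatusCode: 200, Storage: "s3", Bucket: "my-results", Key: "idem-results/key-123.json"}
    	data, _ := json.Marshal(large)
    	fmt.Println(string(data))
    }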
    Performance and Memory Benchmarking

    When deploying this pattern, you must calculate the memory overhead.

    * Memory per Request: (size of started key + size of workerID) + (size of result key + size of serialized response)

    * Total Memory: (Avg Requests Per Second × lockTTL) × size_of_started_entry + (Avg Requests Per Second × resultTTL) × size_of_result_entry

    Example Calculation:

    * Avg 1,000 requests/sec.

    * lockTTL = 30s, resultTTL = 86,400s (24h).

    * Avg key size: 40 bytes. Avg worker ID: 36 bytes. Avg result size: 512 bytes.

    Active Locks Memory: 1,000 × 30 × (40 + 36) bytes ≈ 2.28 MB (negligible)

    Result Memory: 1,000 × 86,400 × (40 + 512) bytes ≈ 47.7 GB (significant!)

    This calculation immediately highlights that the resultTTL and the size of the stored response are the dominant factors in your Redis memory planning. The pointer-to-external-storage pattern becomes very attractive at this scale.
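    As a quick back-of-the-envelope check, the same arithmetic in Go (the constants mirror the illustrative numbers above and ignore Redis's per-key overhead, which adds to both totals):

    go
    // capacity_sketch.go — rough sizing helper for the idempotency keyspace.
    package main
    
    import "fmt"
    
    func main() {
    	const (
    		requestsPerSec = 1000.0
    		lockTTLSec     = 30.0
    		resultTTLSec   = 86400.0 // 24h
    		keySize        = 40.0    // bytes, average idempotency key
    		workerIDSize   = 36.0    // bytes, UUID string used as the lock value
    		resultSize     = 512.0   // bytes, average serialized response
    	)
    
    	// Entries only live for their TTL, so the steady-state count is rate × TTL.
    	lockBytes := requestsPerSec * lockTTLSec * (keySize + workerIDSize)
    	resultBytes := requestsPerSec * resultTTLSec * (keySize + resultSize)
    
    	fmt.Printf("active locks:   %.2f MB\n", lockBytes/1e6)   // ~2.28 MB
    	fmt.Printf("stored results: %.2f GB\n", resultBytes/1e9) // ~47.69 GB
    }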

    Comparing the SETNX and Lua approaches, benchmarks typically show lower latency for the Lua version under load because it collapses the GET and SETNX round-trips into a single EVALSHA call. For a single request the difference is only a millisecond or two, but for a system handling thousands of concurrent requests it adds up to meaningful gains in throughput and client-side CPU efficiency.
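    A rough micro-benchmark sketch for comparing the two, assuming a local Redis at localhost:6379 and the two store types defined earlier (the fmt.Printf logging inside the stores should be removed or silenced before taking real measurements):

    go
    // bench_sketch_test.go
    package main
    
    import (
    	"context"
    	"fmt"
    	"testing"
    
    	"github.com/go-redis/redis/v8"
    )
    
    func BenchmarkSetNXStore(b *testing.B) {
    	ctx := context.Background()
    	store := NewIdempotencyStore(redis.NewClient(&redis.Options{Addr: "localhost:6379"}))
    	b.ResetTimer()
    	for i := 0; i < b.N; i++ {
    		// Unique key per iteration: always exercises the "first request" path.
    		key := fmt.Sprintf("bench-setnx-%d", i)
    		if _, err := store.BeginRequest(ctx, key); err != nil {
    			b.Fatal(err)
    		}
    		if err := store.CompleteRequest(ctx, key, StoredResponse{StatusCode: 200, Body: "{}"}); err != nil {
    			b.Fatal(err)
    		}
    	}
    }
    
    func BenchmarkLuaStore(b *testing.B) {
    	ctx := context.Background()
    	store, err := NewIdempotencyStoreLua(ctx, redis.NewClient(&redis.Options{Addr: "localhost:6379"}))
    	if err != nil {
    		b.Fatal(err)
    	}
    	b.ResetTimer()
    	for i := 0; i < b.N; i++ {
    		key := fmt.Sprintf("bench-lua-%d", i)
    		if _, err := store.BeginRequest(ctx, key); err != nil {
    			b.Fatal(err)
    		}
    		if err := store.CompleteRequest(ctx, key, StoredResponse{StatusCode: 200, Body: "{}"}); err != nil {
    			b.Fatal(err)
    		}
    	}
    }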

    Conclusion: A Blueprint for Resilient Systems

    Idempotency is not an optional feature in modern distributed systems; it is a fundamental requirement for correctness. Moving beyond naive implementations that are vulnerable to race conditions is a critical step in engineering mature, reliable services.

    The pattern detailed here—using Redis's atomic capabilities, progressively enhanced with Lua scripting and fencing tokens—provides a robust, high-performance, and production-proven blueprint. It centralizes the complex state management logic, minimizes network latency, and correctly handles the difficult edge cases that plague distributed environments. By investing in a solid idempotency layer, you eliminate a whole class of subtle, dangerous bugs and build systems that are resilient by design.
