Atomic Idempotency Patterns in Event-Driven Systems with Redis & Lua
The Inescapable Problem: Duplicate Processing in Asynchronous Systems
In any distributed, asynchronous architecture, the guarantee of message delivery is often "at-least-once." Network partitions, service restarts, and client-side retry logic mean your message consumers or API endpoints will inevitably process the same logical request multiple times. For non-mutating operations, this is benign. For critical business logic—payment processing, order creation, inventory updates—it's a catastrophic failure mode leading to data corruption, financial loss, and customer distrust.
The standard solution is idempotency: designing an operation so that receiving it multiple times has the same effect as receiving it once. This is typically achieved with an idempotency key, a unique client-generated identifier for each distinct operation. The server stores the result of the first successful operation against this key and simply returns the cached result for any subsequent retries.
This sounds simple, but the implementation is fraught with peril. The most critical failure point is the non-atomic nature of the typical "check-then-act" logic in a high-concurrency environment. A naive implementation is a ticking time bomb.
Why Naive Implementations Fail: The Race Condition
Consider a simple flow using a relational database or a basic key-value store:
1. The client sends the request with the header Idempotency-Key: key-123.
2. The worker checks the idempotency_keys table/store: SELECT result FROM idempotency_keys WHERE key = 'key-123'.
3. If no row is found, the worker executes the business logic.
4. The worker records the outcome: INSERT INTO idempotency_keys (key, result) VALUES ('key-123', '...').
Now, imagine two requests with the same key arriving milliseconds apart, processed by two different workers (or threads):
sequenceDiagram
participant C as Client
participant W1 as Worker 1
participant W2 as Worker 2
participant DB as Database
C->>W1: POST /charge (key: key-123)
C->>W2: POST /charge (key: key-123) [Retry]
W1->>DB: SELECT result WHERE key='key-123'
DB-->>W1: (Not Found)
W2->>DB: SELECT result WHERE key='key-123'
DB-->>W2: (Not Found)
note over W1, W2: Both workers believe they are the first!
W1->>DB: Execute Payment Logic...
W2->>DB: Execute Payment Logic... (!!)
W1->>DB: INSERT result for 'key-123'
DB-->>W1: OK
W2->>DB: INSERT result for 'key-123'
DB-->>W2: Fails (Unique Key Violation)
note over W2: The operation has already run twice.
Even with a unique constraint on the key, the business logic has already executed twice. The database constraint only prevents storing the duplicate result; it doesn't prevent the duplicate action. This is the fundamental race condition we must eliminate.
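To make the race concrete, here is a minimal sketch of that check-then-act flow written against Redis with go-redis (the client library used throughout this article). The function name and hard-coded payload are illustrative; the flaw is identical for a SQL store.
// naive.go: a deliberately flawed check-then-act handler.
// Any store accessed with separate read and write calls has the same race.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

func naiveCharge(ctx context.Context, rdb *redis.Client, key string) (string, error) {
    // 1. Check: has this idempotency key already been processed?
    cached, err := rdb.Get(ctx, "idem:result:"+key).Result()
    if err == nil {
        return cached, nil // A previous attempt finished; return its result.
    }
    if err != redis.Nil {
        return "", err // Genuine Redis error.
    }
    // RACE WINDOW: a concurrent worker can pass the check above right now.
    fmt.Printf("[NAIVE %s] charging card...\n", key) // Business logic may run twice.
    time.Sleep(500 * time.Millisecond)               // Simulate work.
    result := `{"transactionId": "txn_naive"}`
    // 2. Act: record the result. By now a duplicate charge may already have happened.
    if err := rdb.Set(ctx, "idem:result:"+key, result, 24*time.Hour).Err(); err != nil {
        return "", err
    }
    return result, nil
}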
The Atomic Primitive: Redis `SETNX`
To solve the race condition, the "check-then-act" sequence must be collapsed into a single atomic operation. This is where Redis excels. The SET key value NX command sets a key only if it does not already exist (the older standalone SETNX command does the same, but cannot set a TTL in the same call). This single, atomic operation forms the foundation of a distributed lock.
We can build a more robust, two-phase idempotency check around this primitive. We need to manage three states for a given key: not started (no keys exist yet), in progress (a worker holds the processing lock), and completed (a final result is stored).
Let's design a flow using two keys in Redis for each idempotency key:
* idem:started:{key}: A lock key to signal that processing has begun.
* idem:result:{key}: A key to store the final serialized result (success or failure).
Here is the refined, SETNX-based logic:
1. The request arrives with Idempotency-Key: key-123.
2. GET idem:result:key-123. If it exists, return the cached result immediately.
3. SET idem:started:key-123 <worker-id> NX EX <lock-ttl>. The EX option sets an expiration to prevent indefinite locks if a worker crashes.
4. If SET returns OK (lock acquired):
a. Execute the business logic.
b. Upon completion, store the result: SET idem:result:key-123 <serialized-result> EX <result-ttl>.
c. Release the lock: DEL idem:started:key-123.
5. If SET returns nil (the lock is held by another worker):
a. The request is already being processed. The client should wait and retry.
b. The client can poll the idem:result:key-123 key until a result is available.
Go Implementation with `go-redis`
Here's a production-grade implementation of this pattern in Go. This IdempotencyStore encapsulates the logic.
// main.go
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)
// Represents the stored result of an idempotent operation.
// Can store either a success payload or an error.
type StoredResponse struct {
StatusCode int `json:"statusCode"`
Body string `json:"body"`
Error string `json:"error,omitempty"`
}
// Custom errors for idempotency logic
var (
ErrRequestInProgress = errors.New("request in progress")
)
// IdempotencyStore manages the lifecycle of idempotent requests using Redis.
type IdempotencyStore struct {
client *redis.Client
lockTTL time.Duration // How long the processing lock should be held
resultTTL time.Duration // How long the final result should be stored
}
func NewIdempotencyStore(client *redis.Client) *IdempotencyStore {
return &IdempotencyStore{
client: client,
lockTTL: 30 * time.Second, // A sensible default lock time
resultTTL: 24 * time.Hour, // Store results for a full day
}
}
func (s *IdempotencyStore) startedKey(key string) string {
return fmt.Sprintf("idem:started:%s", key)
}
func (s *IdempotencyStore) resultKey(key string) string {
return fmt.Sprintf("idem:result:%s", key)
}
// BeginRequest attempts to start processing for an idempotency key.
// It returns a previously stored response if one exists.
// It returns ErrRequestInProgress if another worker holds the lock.
// It returns nil if the lock was successfully acquired.
func (s *IdempotencyStore) BeginRequest(ctx context.Context, key string) (*StoredResponse, error) {
// 1. Check for a final result first.
resultData, err := s.client.Get(ctx, s.resultKey(key)).Bytes()
switch {
case err == nil:
var response StoredResponse
if jsonErr := json.Unmarshal(resultData, &response); jsonErr == nil {
fmt.Printf("[IDEM %s] Found cached result.\n", key)
return &response, nil
}
// Cached data is corrupted: fall through and re-process as if no result exists.
case err != redis.Nil:
return nil, fmt.Errorf("redis GET failed: %w", err) // Genuine Redis error
}
// 2. Attempt to acquire the lock.
workerID := uuid.NewString()
wasSet, err := s.client.SetNX(ctx, s.startedKey(key), workerID, s.lockTTL).Result()
if err != nil {
return nil, fmt.Errorf("redis SETNX failed: %w", err)
}
if !wasSet {
fmt.Printf("[IDEM %s] Lock held by another worker.\n", key)
return nil, ErrRequestInProgress
}
fmt.Printf("[IDEM %s] Acquired lock with worker ID %s.\n", key, workerID)
return nil, nil // Lock acquired, ready to process
}
// CompleteRequest stores the final result of an operation and releases the lock.
func (s *IdempotencyStore) CompleteRequest(ctx context.Context, key string, response StoredResponse) error {
resultData, err := json.Marshal(response)
if err != nil {
return fmt.Errorf("failed to marshal response: %w", err)
}
// Use a pipeline to ensure atomicity of setting result and deleting lock
pipe := s.client.TxPipeline()
pipe.Set(ctx, s.resultKey(key), resultData, s.resultTTL)
pipe.Del(ctx, s.startedKey(key))
_, err = pipe.Exec(ctx)
if err != nil {
return fmt.Errorf("redis pipeline failed: %w", err)
}
fmt.Printf("[IDEM %s] Stored result and released lock.\n", key)
return nil
}
// --- Example Usage ---
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
store := NewIdempotencyStore(rdb)
idempotencyKey := "order-45678"
// Simulate a business operation
processPayment := func(key string) (*StoredResponse, error) {
// This is where your core business logic lives
fmt.Printf("[BIZ %s] Processing payment...\n", key)
time.Sleep(2 * time.Second) // Simulate work
fmt.Printf("[BIZ %s] Payment successful.\n", key)
return &StoredResponse{StatusCode: 200, Body: `{"transactionId": "txn_abc123"}`}, nil
}
handleRequest := func(key string) {
// Begin the idempotent request
cachedResponse, err := store.BeginRequest(ctx, key)
if err != nil {
if err == ErrRequestInProgress {
fmt.Printf("[HANDLER %s] Request is already in progress. Will not re-process.\n", key)
return
}
fmt.Printf("[HANDLER %s] Error starting request: %v\n", key, err)
return
}
if cachedResponse != nil {
fmt.Printf("[HANDLER %s] Returning cached response: %+v\n", key, *cachedResponse)
return
}
// Lock acquired, execute the core logic
response, bizErr := processPayment(key)
if bizErr != nil {
// Store the failure result
failedResponse := StoredResponse{StatusCode: 500, Error: bizErr.Error()}
store.CompleteRequest(ctx, key, failedResponse)
return
}
// Store the success result
store.CompleteRequest(ctx, key, *response)
}
// Simulate two concurrent requests
fmt.Println("--- Simulating concurrent requests ---")
go handleRequest(idempotencyKey)
go handleRequest(idempotencyKey)
time.Sleep(5 * time.Second)
// Simulate a third request after the first has completed
fmt.Println("\n--- Simulating a later request ---")
handleRequest(idempotencyKey)
time.Sleep(2 * time.Second)
}
This SETNX approach is a significant improvement. It's atomic at the point of decision, preventing the race condition. However, it's not perfect. There are subtle edge cases and opportunities for optimization that a truly robust system must address.
The Ultimate Solution: A State Machine in Lua
The SETNX pattern requires multiple network round-trips to Redis: GET result, SETNX lock, then a SET/DEL pipeline. Each round-trip adds latency. More critically, the logic is spread across the client application and Redis. A more powerful approach is to encapsulate the entire state transition logic within Redis itself using a Lua script.
Redis guarantees the atomic execution of Lua scripts. No other command can run while a script is executing. This allows us to create a single, atomic operation that handles all the initial state checks and lock acquisition.
Our Lua script will implement a complete state machine for an idempotency key:
* Inputs: KEYS[1] (idempotency key), ARGV[1] (lock owner ID), ARGV[2] (lock TTL), ARGV[3] (result TTL).
* Outputs: A status code and an optional payload.
* {1, result}: Result was cached.
* {2}: Lock acquired successfully.
* {3}: Lock is held by another worker.
(Redis truncates a Lua table reply at the first nil element, so the non-cached statuses come back as single-element arrays; the Go caller below accounts for this.)
The Lua Script (`idempotency.lua`)
-- idempotency.lua
-- KEYS[1]: The base idempotency key (e.g., 'order-45678')
-- ARGV[1]: A unique ID for the worker acquiring the lock (e.g., a UUID)
-- ARGV[2]: The TTL for the lock in seconds
-- ARGV[3]: The TTL for the final result in seconds (not used by this script; the result TTL is applied in CompleteRequest)
local result_key = 'idem:result:' .. KEYS[1]
local started_key = 'idem:started:' .. KEYS[1]
-- 1. Check for a finished result first.
local result = redis.call('GET', result_key)
if result then
-- Operation is already complete, return the cached result.
return {1, result}
end
-- 2. Check if the operation is already in progress.
local owner = redis.call('GET', started_key)
if owner then
-- Lock is held. Return status 3 to indicate it's in progress.
return {3}
end
-- 3. No result and no lock. Attempt to acquire the lock.
redis.call('SET', started_key, ARGV[1], 'EX', ARGV[2])
-- Return status 2 to indicate lock was successfully acquired.
return {2}
This script is significantly more efficient. It performs all checks and the lock acquisition in a single, atomic server-side operation.
Integrating the Lua Script in Go
We now refactor our IdempotencyStore to use this script. We'll load the script into Redis once using SCRIPT LOAD and then call it via its SHA1 hash using EVALSHA for maximum efficiency.
// main_lua.go
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)
// StoredResponse and errors are the same as before...
// IdempotencyStore with Lua script support
type IdempotencyStoreLua struct {
client *redis.Client
scriptSHA string
lockTTL time.Duration
resultTTL time.Duration
}
func NewIdempotencyStoreLua(ctx context.Context, client *redis.Client) (*IdempotencyStoreLua, error) {
luaScript, err := os.ReadFile("idempotency.lua")
if err != nil {
return nil, fmt.Errorf("failed to read lua script: %w", err)
}
sha, err := client.ScriptLoad(ctx, string(luaScript)).Result()
if err != nil {
return nil, fmt.Errorf("failed to load lua script: %w", err)
}
fmt.Printf("Loaded idempotency Lua script with SHA: %s\n", sha)
return &IdempotencyStoreLua{
client: client,
scriptSHA: sha,
lockTTL: 30 * time.Second,
resultTTL: 24 * time.Hour,
}, nil
}
// BeginRequest now uses the atomic Lua script.
func (s *IdempotencyStoreLua) BeginRequest(ctx context.Context, key string) (*StoredResponse, error) {
workerID := uuid.NewString()
// EvalSha takes the key names as a []string and the ARGV values as variadic arguments.
keys := []string{key}
argv := []interface{}{workerID, int(s.lockTTL.Seconds()), int(s.resultTTL.Seconds())}
res, err := s.client.EvalSha(ctx, s.scriptSHA, keys, argv...).Result()
if err != nil {
// If the script is not loaded (e.g., Redis restart), load it again.
if strings.HasPrefix(err.Error(), "NOSCRIPT") {
// This is a simplified fallback. In production, you'd want a more robust
// script manager to handle this automatically.
fmt.Println("Script not found, reloading...")
newStore, loadErr := NewIdempotencyStoreLua(ctx, s.client)
if loadErr != nil {
return nil, loadErr
}
s.scriptSHA = newStore.scriptSHA
// Retry the call
res, err = s.client.EvalSha(ctx, s.scriptSHA, keys, argv...).Result()
}
if err != nil {
return nil, fmt.Errorf("failed to execute lua script: %w", err)
}
}
resultSlice, ok := res.([]interface{})
if !ok || len(resultSlice) == 0 {
return nil, fmt.Errorf("unexpected lua script result format: %v", res)
}
statusCode, _ := resultSlice[0].(int64)
switch statusCode {
case 1: // Result was cached (status plus payload)
if len(resultSlice) < 2 {
return nil, fmt.Errorf("lua script returned status 1 without a payload: %v", res)
}
resultData, _ := resultSlice[1].(string)
var response StoredResponse
if jsonErr := json.Unmarshal([]byte(resultData), &response); jsonErr != nil {
return nil, fmt.Errorf("failed to unmarshal cached response: %w", jsonErr)
}
fmt.Printf("[IDEM %s] Found cached result via Lua.\n", key)
return &response, nil
case 2: // Lock acquired
fmt.Printf("[IDEM %s] Acquired lock via Lua with worker ID %s.\n", key, workerID)
return nil, nil
case 3: // Lock held by another worker
fmt.Printf("[IDEM %s] Lock held by another worker (detected by Lua).\n", key)
return nil, ErrRequestInProgress
default:
return nil, fmt.Errorf("unknown status code from lua script: %d", statusCode)
}
}
// CompleteRequest remains largely the same, but it's still critical.
func (s *IdempotencyStoreLua) CompleteRequest(ctx context.Context, key string, response StoredResponse) error {
resultData, err := json.Marshal(response)
if err != nil {
return fmt.Errorf("failed to marshal response: %w", err)
}
// The keys must match the Lua script's logic
resultKey := fmt.Sprintf("idem:result:%s", key)
startedKey := fmt.Sprintf("idem:started:%s", key)
pipe := s.client.TxPipeline()
pipe.Set(ctx, resultKey, resultData, s.resultTTL)
pipe.Del(ctx, startedKey)
_, err = pipe.Exec(ctx)
if err != nil {
return fmt.Errorf("redis pipeline failed: %w", err)
}
fmt.Printf("[IDEM %s] Stored result and released lock.\n", key)
return nil
}
// The main function for demonstrating usage would be identical to the previous one,
// just initializing IdempotencyStoreLua instead.
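For completeness, here is a minimal sketch of that wiring. It reuses the StoredResponse type and ErrRequestInProgress sentinel from the earlier listing and collapses the handler into a single switch for brevity:
func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    store, err := NewIdempotencyStoreLua(ctx, rdb)
    if err != nil {
        panic(err) // Cannot proceed without the Lua script loaded.
    }

    key := "order-45678"
    cached, err := store.BeginRequest(ctx, key)
    switch {
    case errors.Is(err, ErrRequestInProgress):
        fmt.Printf("[HANDLER %s] already in progress, not re-processing\n", key)
    case err != nil:
        fmt.Printf("[HANDLER %s] error starting request: %v\n", key, err)
    case cached != nil:
        fmt.Printf("[HANDLER %s] returning cached response: %+v\n", key, *cached)
    default:
        // Lock acquired: run the business logic, then persist the outcome.
        store.CompleteRequest(ctx, key, StoredResponse{StatusCode: 200, Body: `{"transactionId": "txn_abc123"}`})
    }
}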
Advanced Edge Cases and Production Hardening
Even with an atomic Lua script, building a truly resilient system requires handling more complex failure scenarios.
Edge Case 1: The Zombie Worker and Fencing Tokens
Our lock has a TTL. What happens if a worker acquires a lock, experiences a long GC pause or network partition that exceeds the TTL, and then comes back to life?
1. Worker A acquires the lock for key-123 with a 30s TTL.
2. Worker A is paused for 35s.
3. The lock for key-123 expires in Redis.
4. Worker B acquires a fresh lock for key-123 and starts processing.
5. Worker A wakes up, finishes its now-stale work, and calls CompleteRequest, overwriting the result from Worker B.
This is a classic distributed systems problem. The solution is a fencing token: a unique, ideally monotonic value associated with each lock acquisition. When we attempt to write the final result, we must provide the fencing token. The storage system then atomically checks that the token is still the current one before allowing the write.
In our Redis implementation, the unique workerID we generated can serve as a fencing token. We can enhance our CompleteRequest logic to use a Lua script that only sets the result if the lock key still exists and its value matches the worker's ID.
Enhanced complete.lua script:
-- complete.lua
-- KEYS[1]: The base idempotency key
-- KEYS[2]: The result key
-- KEYS[3]: The started/lock key
-- ARGV[1]: The worker's unique ID (fencing token)
-- ARGV[2]: The serialized result data
-- ARGV[3]: The result TTL
local current_owner = redis.call('GET', KEYS[3])
-- Only complete the request if we are still the owner of the lock.
if current_owner == ARGV[1] then
redis.call('SET', KEYS[2], ARGV[2], 'EX', ARGV[3])
redis.call('DEL', KEYS[3])
return 1 -- Success
else
-- We lost the lock. Do not write the result.
return 0 -- Failure (fenced off)
end
The Go CompleteRequest method would be updated to call this script, and it would now be able to detect if it has been fenced off, preventing it from overwriting a result from a newer, legitimate worker.
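Here is a minimal sketch of that update. It assumes BeginRequest is extended to return the workerID it generated (so the caller can pass it back), that complete.lua has been loaded with SCRIPT LOAD just like idempotency.lua (completeSHA below), and that the method name CompleteRequestFenced is ours for illustration, not part of any library:
// CompleteRequestFenced stores the result only if this worker still owns the lock.
// completeSHA is the SHA returned by SCRIPT LOAD for complete.lua.
func (s *IdempotencyStoreLua) CompleteRequestFenced(ctx context.Context, key, workerID, completeSHA string, response StoredResponse) error {
    resultData, err := json.Marshal(response)
    if err != nil {
        return fmt.Errorf("failed to marshal response: %w", err)
    }
    keys := []string{
        key,                                 // KEYS[1]: base idempotency key (as documented by the script)
        fmt.Sprintf("idem:result:%s", key),  // KEYS[2]: result key
        fmt.Sprintf("idem:started:%s", key), // KEYS[3]: lock key
    }
    res, err := s.client.EvalSha(ctx, completeSHA, keys,
        workerID, resultData, int(s.resultTTL.Seconds())).Result()
    if err != nil {
        return fmt.Errorf("failed to execute complete.lua: %w", err)
    }
    if status, _ := res.(int64); status != 1 {
        // Our lock expired and another worker took over; do not overwrite its result.
        return fmt.Errorf("fenced off: lock for %q is no longer owned by worker %s", key, workerID)
    }
    fmt.Printf("[IDEM %s] Stored result and released lock (fenced).\n", key)
    return nil
}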
Edge Case 2: Storing Large Responses
Storing full API responses in Redis can consume significant memory, especially if the payloads are large and the result TTL is long. For a high-throughput system, this can become a bottleneck.
Strategies for Mitigation:
* Store a pointer, not the payload. Upload large responses to object storage and keep only a small reference in Redis: the idem:result:{key} value in Redis would contain {"storage": "s3", "bucket": "my-results", "key": "idem-results/key-123.json"}. (See the sketch after this list.)
* Cache only terminal outcomes. Store results only for final responses (200 OK, 201 Created) and not for transient failures (503 Service Unavailable) that a client might want to legitimately retry later.
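A minimal sketch of the pointer approach, assuming the payload has already been uploaded to your object store (that step is elided) and that ResultPointer and CompleteRequestWithPointer are names invented here for illustration:
// ResultPointer is the small document stored in Redis in place of the full payload.
type ResultPointer struct {
    Storage string `json:"storage"` // e.g. "s3"
    Bucket  string `json:"bucket"`
    Key     string `json:"key"`
}

// CompleteRequestWithPointer records only the payload's location in Redis and releases the lock.
func (s *IdempotencyStoreLua) CompleteRequestWithPointer(ctx context.Context, key, bucket, objectKey string) error {
    pointer, err := json.Marshal(ResultPointer{Storage: "s3", Bucket: bucket, Key: objectKey})
    if err != nil {
        return fmt.Errorf("failed to marshal result pointer: %w", err)
    }
    pipe := s.client.TxPipeline()
    pipe.Set(ctx, fmt.Sprintf("idem:result:%s", key), pointer, s.resultTTL)
    pipe.Del(ctx, fmt.Sprintf("idem:started:%s", key)) // release the lock, as in CompleteRequest
    if _, err := pipe.Exec(ctx); err != nil {
        return fmt.Errorf("redis pipeline failed: %w", err)
    }
    return nil
}

// Usage: store.CompleteRequestWithPointer(ctx, "key-123", "my-results", "idem-results/key-123.json")
Readers that find a pointer instead of a payload resolve it against the object store before returning the response to the client.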
Performance and Memory Benchmarking
When deploying this pattern, you must calculate the memory overhead.
* Memory per request: (size of started key + size of worker ID) for the lock entry, plus (size of result key + size of serialized response) for the result entry.
* Total memory ≈ (avg requests per second × lockTTL) × size of each started entry + (avg requests per second × resultTTL) × size of each result entry.
Example Calculation:
* Avg 1,000 requests/sec.
* lockTTL = 30s, resultTTL = 86400s (24h).
* Avg key size: 40 bytes. Avg worker ID: 36 bytes. Avg result size: 512 bytes.
Active Locks Memory: 1,000 × 30 × (40 + 36) bytes ≈ 2.28 MB (Negligible)
Result Memory: 1,000 × 86,400 × (40 + 512) bytes ≈ 47.7 GB, roughly 44.4 GiB (Significant!)
This calculation immediately highlights that the resultTTL and the size of the stored response are the dominant factors in your Redis memory planning. The pointer-to-external-storage pattern becomes very attractive at this scale.
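For quick capacity planning, the same arithmetic fits in a few lines of Go; this throwaway sketch simply reuses the assumed sizes from the example above:
package main

import (
    "fmt"
    "time"
)

// estimateBytes: entries created per second * how long each entry lives * bytes per entry.
func estimateBytes(reqPerSec float64, ttl time.Duration, bytesPerEntry float64) float64 {
    return reqPerSec * ttl.Seconds() * bytesPerEntry
}

func main() {
    lockBytes := estimateBytes(1000, 30*time.Second, 40+36)  // started key + worker ID
    resultBytes := estimateBytes(1000, 24*time.Hour, 40+512) // result key + serialized response
    fmt.Printf("active locks:   %.2f MB\n", lockBytes/1e6)        // ≈ 2.28 MB
    fmt.Printf("stored results: %.1f GiB\n", resultBytes/(1<<30)) // ≈ 44.4 GiB
}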
Comparing the SETNX and Lua approaches, benchmarking will consistently show that the Lua version has lower latency under load, because it removes a network round-trip. For a single request the difference is a millisecond or two, but for a system handling thousands of concurrent requests it adds up to significant gains in throughput and client-side CPU efficiency.
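If you want to verify this on your own infrastructure, a rough timing harness is enough. The sketch below is illustrative (the helper name and bench-key format are made up): it accepts any BeginRequest-shaped function, so it can time both the SETNX-based and the Lua-based stores, and each call uses a fresh key so it always exercises the lock-acquisition path.
// timeBeginRequests reports the average latency of n sequential BeginRequest calls.
// The bench lock keys it leaves behind simply expire via their TTL.
func timeBeginRequests(ctx context.Context, name string, n int,
    begin func(context.Context, string) (*StoredResponse, error)) {
    start := time.Now()
    for i := 0; i < n; i++ {
        key := fmt.Sprintf("bench-%s-%d", name, i) // fresh key: always the "acquire lock" path
        if _, err := begin(ctx, key); err != nil {
            fmt.Printf("[%s] error on %s: %v\n", name, key, err)
        }
    }
    elapsed := time.Since(start)
    fmt.Printf("[%s] %d calls in %v (avg %v per call)\n", name, n, elapsed, elapsed/time.Duration(n))
}

// Usage (assuming both stores are constructed as shown earlier):
//   timeBeginRequests(ctx, "setnx", 1000, setnxStore.BeginRequest)
//   timeBeginRequests(ctx, "lua", 1000, luaStore.BeginRequest)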
Conclusion: A Blueprint for Resilient Systems
Idempotency is not an optional feature in modern distributed systems; it is a fundamental requirement for correctness. Moving beyond naive implementations that are vulnerable to race conditions is a critical step in engineering mature, reliable services.
The pattern detailed here—using Redis's atomic capabilities, progressively enhanced with Lua scripting and fencing tokens—provides a robust, high-performance, and production-proven blueprint. It centralizes the complex state management logic, minimizes network latency, and correctly handles the difficult edge cases that plague distributed environments. By investing in a solid idempotency layer, you eliminate a whole class of subtle, dangerous bugs and build systems that are resilient by design.