Building a Resilient Idempotency Layer with Redis and Lua Scripts
The Inevitable Problem: Duplicate Processing in Asynchronous Systems
In any mature, event-driven architecture, the contract of message delivery is a critical design choice. Systems like Kafka, RabbitMQ, or AWS SQS typically offer at-least-once delivery guarantees. This is a pragmatic choice for resilience; it's preferable to process a message twice than to lose it entirely. However, this shifts the burden of responsibility to the consumer. The consumer application must be designed to handle duplicate messages without causing incorrect side effects. This property is idempotency.
Consider a payment processing service that consumes charge.create events. If a consumer instance processes an event, successfully charges a credit card, but fails before it can acknowledge the message, the broker will redeliver it to another instance. Without an idempotency check, the customer is charged twice.
A junior engineer's first attempt might look like this:
# WARNING: This implementation is flawed and contains a race condition.
import redis

client = redis.Redis()

def process_payment_naive(event):
    idempotency_key = event['idempotency_key']

    # 1. CHECK if key exists
    if client.get(idempotency_key):
        print(f"Request {idempotency_key} already processed.")
        return {"status": "duplicate"}

    # 2. ACT by setting the key
    client.set(idempotency_key, "processed", ex=3600)

    # ... perform the actual payment processing ...
    result = charge_credit_card(event['details'])
    return result
This GET followed by a SET is a classic Check-Then-Act anti-pattern in distributed systems. Imagine two consumers, C1 and C2, receive the same message due to a redelivery. Their execution could be interleaved like this:
1. C1: client.get(key) -> returns None (key doesn't exist).
2. C2: client.get(key) -> returns None (key still doesn't exist).
3. C1: client.set(key, ...) -> sets the key.
4. C1: charge_credit_card() -> charges the customer.
5. C2: client.set(key, ...) -> overwrites the key.
6. C2: charge_credit_card() -> charges the customer a second time.

This race condition is not a theoretical edge case; it is a certainty in a high-throughput, distributed environment. To solve this, we need atomicity.
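The interleaving can be reproduced deterministically with a plain dict standing in for Redis. This is a sketch: `charge_credit_card` is replaced by a list append so the duplicate side effect is observable.

```python
# Deterministic illustration of the Check-Then-Act race. Both "consumers"
# pass the existence check before either one writes, so the side effect
# (the charge) runs twice.
store = {}
charges = []

def check(key):
    return store.get(key)

def act(key):
    store[key] = "processed"
    charges.append(key)  # stands in for charge_credit_card()

key = "idem:evt_abc123"

# Interleaving: C1 checks, C2 checks, then both act.
c1_sees = check(key)   # None
c2_sees = check(key)   # None
if c1_sees is None:
    act(key)
if c2_sees is None:
    act(key)

print(len(charges))  # 2: the customer is charged twice
```

The bug is not in either consumer's code in isolation; it is in the gap between the check and the act, which only an atomic operation can close.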
Pattern 1: The Atomic Lock with `SET NX`
The simplest way to achieve atomicity in Redis is with the SET command's built-in options. The NX option means "set only if the key does not exist." This single, atomic command replaces the flawed GET-then-SET logic.
# A better, but still incomplete, implementation
import json
import time

import redis

client = redis.Redis(decode_responses=True)

# A placeholder for the actual business logic
def charge_credit_card(details):
    print(f"Processing charge for {details['amount']}")
    time.sleep(2)  # Simulate work
    return {"status": "success", "transaction_id": "txn_123"}

def process_payment_atomic_lock(event):
    idempotency_key = event['idempotency_key']
    lock_ttl_seconds = 60  # Should be > max processing time

    # Atomically acquire a lock:
    # SET key value NX EX seconds
    is_lock_acquired = client.set(idempotency_key, "processing", nx=True, ex=lock_ttl_seconds)

    if not is_lock_acquired:
        print(f"Request {idempotency_key} is already being processed or is complete.")
        # This is a critical flaw: we don't know the actual status.
        # Is it in-flight? Did it succeed? Did it fail?
        # We can't return the original result.
        return {"status": "conflict"}

    try:
        print(f"Lock acquired for {idempotency_key}. Processing...")
        result = charge_credit_card(event['details'])

        # IMPORTANT: After success, update the key to store the result
        # and set a longer TTL for retaining the idempotent response.
        client.set(idempotency_key, json.dumps(result), ex=86400)  # 24-hour retention
        return result
    except Exception:
        # On failure, it's often best to release the lock so a retry can occur.
        client.delete(idempotency_key)
        raise
Shortcomings of the `SET NX` Pattern
This is a significant improvement, but it's still not robust enough for a production system:
- Lost responses: when the lock is already held, the caller only receives {"status": "conflict"}. The client has no way to retrieve the original successful response. This is a violation of idempotency; a retry should yield the same result as the original call.
- Ambiguous state: the conflict status is ambiguous. It could mean the original request is still in-flight, or that it has completed. The caller doesn't know whether to wait and retry or to move on.
- Stuck locks: if a worker crashes after acquiring the lock but before completing, the key lingers for the full lock_ttl_seconds. This stalls any retries for the duration of the TTL, introducing significant latency.

To build a truly resilient system, we need to model the entire lifecycle of the request as a state machine directly within Redis, and we must perform all state transitions atomically. This is the perfect use case for Redis Lua scripts.
Pattern 2: A Stateful Idempotency Machine with Lua
We will model the request state with a simple state machine: (non-existent) -> PENDING -> COMPLETED | FAILED.
The logic for handling this state machine can be complex: check for the key, evaluate its state, check timestamps, and then update it. To prevent race conditions, this entire sequence must be atomic. A Redis Lua script allows us to execute this logic on the Redis server itself, as a single, uninterrupted operation.
The Idempotency Control Lua Script
We'll create two scripts: one to start processing (start_processing) and one to finalize it (complete_processing).
Here is the start_processing.lua script. It's the heart of our idempotency layer.
-- start_processing.lua
-- KEYS[1]: idempotency_key
-- ARGV[1]: pending_ttl (in milliseconds)
-- ARGV[2]: current_timestamp (in milliseconds)
-- ARGV[3]: recovery_timeout (in milliseconds)
local key = KEYS[1]
local pending_ttl = tonumber(ARGV[1])
local current_timestamp = tonumber(ARGV[2])
local recovery_timeout = tonumber(ARGV[3])
local existing_data = redis.call('HGETALL', key)

-- Case 1: Key does not exist. This is a new request.
if #existing_data == 0 then
    redis.call('HSET', key, 'status', 'PENDING', 'startTime', current_timestamp)
    redis.call('PEXPIRE', key, pending_ttl)
    return {'PROCEED'}
end

local status = ''
local start_time = 0
local response = ''

-- Unpack the HGETALL result
for i = 1, #existing_data, 2 do
    if existing_data[i] == 'status' then
        status = existing_data[i+1]
    elseif existing_data[i] == 'startTime' then
        start_time = tonumber(existing_data[i+1])
    elseif existing_data[i] == 'response' then
        response = existing_data[i+1]
    end
end

-- Case 2: Key exists and is COMPLETED. Return the stored response.
if status == 'COMPLETED' then
    return {'COMPLETED', response}
end

-- Case 3: Key exists and is PENDING. This is a concurrent request or a retry on a slow/stuck job.
if status == 'PENDING' then
    -- Check if the existing PENDING lock has timed out (i.e., worker crashed).
    if (current_timestamp - start_time) > recovery_timeout then
        -- The previous worker is considered dead. Take over.
        redis.call('HSET', key, 'status', 'PENDING', 'startTime', current_timestamp)
        redis.call('PEXPIRE', key, pending_ttl)
        return {'PROCEED_RECOVERY'}
    else
        -- The original request is still being processed within its valid time window.
        return {'CONFLICT'}
    end
end

-- Default case for any other status (e.g., FAILED): treat as a conflict for this simple model.
return {'CONFLICT'}
And the corresponding complete_processing.lua script:
-- complete_processing.lua
-- KEYS[1]: idempotency_key
-- ARGV[1]: response_data (JSON string)
-- ARGV[2]: completed_ttl (in milliseconds)
local key = KEYS[1]
local response_data = ARGV[1]
local completed_ttl = tonumber(ARGV[2])
-- Only update if the key exists (i.e., we hold the lock)
if redis.call('EXISTS', key) == 1 then
    redis.call('HSET', key, 'status', 'COMPLETED', 'response', response_data)
    redis.call('HDEL', key, 'startTime') -- Clean up start time
    redis.call('PEXPIRE', key, completed_ttl)
    return 1
else
    -- This could happen if our PENDING lock expired before we could complete.
    return 0
end
The Python Implementation
Now, let's integrate these scripts into a Python service. The service will load the scripts into Redis once on startup and then call them using EVALSHA for maximum performance.
import redis
import json
import time
import hashlib

class IdempotencyService:
    def __init__(self, redis_client: redis.Redis):
        self.client = redis_client
        self.scripts = {}
        self._load_scripts()

    def _load_scripts(self):
        script_files = {
            'start': 'start_processing.lua',
            'complete': 'complete_processing.lua'
        }
        for name, filename in script_files.items():
            with open(filename, 'r') as f:
                script_text = f.read()
            self.scripts[name] = self.client.script_load(script_text)

    def start_processing(self, key, pending_ttl_ms, recovery_timeout_ms):
        current_timestamp_ms = int(time.time() * 1000)
        sha = self.scripts['start']
        # EVALSHA sha numkeys key [key ...] arg [arg ...]
        result = self.client.evalsha(
            sha, 1, key,
            pending_ttl_ms, current_timestamp_ms, recovery_timeout_ms
        )
        return result

    def complete_processing(self, key, response_data, completed_ttl_ms):
        sha = self.scripts['complete']
        response_json = json.dumps(response_data)
        return self.client.evalsha(
            sha, 1, key,
            response_json, completed_ttl_ms
        )

    def delete_key(self, key):
        self.client.delete(key)
# --- Application Logic ---
REDIS_CLIENT = redis.Redis(decode_responses=True)
IDEMPOTENCY_SERVICE = IdempotencyService(REDIS_CLIENT)

# Configuration
PENDING_TTL_MS = 10_000        # 10 seconds: max expected processing time
RECOVERY_TIMEOUT_MS = 5_000    # 5 seconds: if PENDING for this long, assume worker crash
COMPLETED_TTL_MS = 86_400_000  # 24 hours: how long to store final results

# A placeholder for the actual business logic
def charge_credit_card(details):
    print(f"[WORKER] Processing charge for {details['amount']}")
    # Simulate work that might fail or succeed
    if details.get("force_fail", False):
        time.sleep(1)
        raise ValueError("Credit card declined")
    time.sleep(2)
    return {"status": "success", "transaction_id": f"txn_{int(time.time())}"}
def process_payment_request(event):
    payload_str = json.dumps(event, sort_keys=True)
    idempotency_key = f"idem:{hashlib.sha256(payload_str.encode()).hexdigest()}"
    print(f"\n[HANDLER] Received request with key: {idempotency_key[:20]}...")

    # 1. Start Idempotent Processing
    result = IDEMPOTENCY_SERVICE.start_processing(idempotency_key, PENDING_TTL_MS, RECOVERY_TIMEOUT_MS)
    status = result[0]
    print(f"[IDEMPOTENCY] Start result: {status}")

    # 2. Handle the outcome
    if status == 'COMPLETED':
        print("[HANDLER] Request already completed. Returning stored response.")
        return json.loads(result[1]), 200

    if status == 'CONFLICT':
        print("[HANDLER] Request is already in progress. Rejecting.")
        return {"status": "error", "message": "Request already in progress"}, 409

    if status in ('PROCEED', 'PROCEED_RECOVERY'):
        if status == 'PROCEED_RECOVERY':
            print("[HANDLER] Recovering a stale PENDING lock. Proceeding.")
        try:
            # 3. Execute Business Logic
            business_result = charge_credit_card(event['details'])

            # 4. Mark as Completed
            IDEMPOTENCY_SERVICE.complete_processing(idempotency_key, business_result, COMPLETED_TTL_MS)
            print(f"[IDEMPOTENCY] Marked {idempotency_key[:20]}... as COMPLETED.")
            return business_result, 200
        except Exception as e:
            print(f"[HANDLER] Business logic failed: {e}")
            # On failure, we delete the key to allow a full retry.
            # A more advanced pattern might set a FAILED status with an expiry.
            IDEMPOTENCY_SERVICE.delete_key(idempotency_key)
            print(f"[IDEMPOTENCY] Deleted key {idempotency_key[:20]}... to allow retry.")
            return {"status": "error", "message": str(e)}, 500
# --- Simulation ---
if __name__ == '__main__':
    payment_event = {
        "event_id": "evt_abc123",
        "details": {
            "amount": 100.00,
            "currency": "USD",
            "user_id": "usr_xyz789"
        }
    }

    print("--- First Call ---")
    res1, code1 = process_payment_request(payment_event)
    print(f"Response 1: {res1} (Code: {code1})")

    print("\n--- Second Call (Retry) ---")
    res2, code2 = process_payment_request(payment_event)
    print(f"Response 2: {res2} (Code: {code2})")

    # Verify the responses are identical
    assert res1['transaction_id'] == res2['transaction_id']
    print("\nIdempotency check passed: Transaction IDs are identical.")
To run this, you need two files: start_processing.lua and complete_processing.lua in the same directory.
Analysis of the Lua-based Solution
This pattern elegantly solves the shortcomings of the simple SET NX approach:
- Retries receive the original response: the start_processing script immediately identifies the COMPLETED state and returns the stored response. The client receives the exact same successful result, fulfilling the promise of idempotency.
- Unambiguous status: each returned state (PROCEED, COMPLETED, CONFLICT) is unambiguous. The application logic knows exactly how to act.
- Crash recovery: the recovery_timeout is a crucial mechanism. If a worker grabs a lock and dies, its PENDING key will eventually be considered stale (current_timestamp - start_time > recovery_timeout). The next worker to attempt the operation will be granted a PROCEED_RECOVERY status and can safely take over, preventing stuck jobs.

Advanced Considerations and Production Hardening
Deploying this pattern in a large-scale system requires attention to several details.
Idempotency Key Generation Strategy
The robustness of the entire system hinges on the quality of the idempotency key. A poor key can lead to unintentional collisions or missed duplicates.
- Client-generated keys: public APIs often accept a client-supplied header such as Idempotency-Key. This is excellent because it puts the onus of uniqueness on the initiator of the action.
- Payload-derived keys: when the client cannot supply a key, hash the business payload itself, excluding volatile metadata such as timestamp or message_id headers added by the broker.

Our example uses hashlib.sha256(json.dumps(event, sort_keys=True).encode()). This is a good starting point: semantically identical payloads produce the same key regardless of JSON key order, while any real change in the payload results in a different key.
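A minimal sketch of payload-derived key generation. The volatile field names (timestamp, message_id) are hypothetical examples of broker metadata you would exclude; adapt the set to your own envelope format.

```python
import hashlib
import json

# Hypothetical fields that vary between redeliveries of the same logical
# event; they must not influence the idempotency key.
VOLATILE_FIELDS = {"timestamp", "message_id"}

def make_idempotency_key(event: dict) -> str:
    stable = {k: v for k, v in event.items() if k not in VOLATILE_FIELDS}
    # sort_keys + compact separators give a canonical serialization,
    # so JSON key order and whitespace cannot change the hash.
    canonical = json.dumps(stable, sort_keys=True, separators=(",", ":"))
    return "idem:" + hashlib.sha256(canonical.encode()).hexdigest()
```

With this helper, a redelivered message with a fresh broker timestamp maps to the same key, while a genuinely different payload maps to a new one.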
TTL Management and Memory Pressure
Choosing the right TTLs is a balancing act between safety, correctness, and resource consumption.
- pending_ttl: This is a safety net. It should be set to a value slightly longer than the absolute maximum time your business logic should ever take (your p99.9 latency plus a buffer). If a worker holds a lock for longer, it's assumed to be dead, and the lock is released. Too short, and you risk other workers prematurely taking over a slow but valid operation. Too long, and a genuine crash causes a long processing delay.
- completed_ttl: This defines your idempotency window. How long should a client be able to retry and get the same response? For payment APIs, 24 hours is common. For less critical operations, a few minutes might suffice. A longer TTL provides more safety but consumes more memory in Redis, as you are storing the full response payload for every unique request.

Memory Calculation:
Estimate your memory footprint. If you process 1,000 unique requests per second, have an average response size of 2 KB, and a completed_ttl of 1 hour (3600s):
1,000 req/s × 3,600 s = 3,600,000 live keys
3,600,000 keys × (64 bytes key + 2,048 bytes response) ≈ 7.6 GB
This is a non-trivial amount of memory. For extremely high-throughput systems, you might need to shorten the TTL, sample which requests get idempotency, or use a different storage solution for the response payloads.
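The arithmetic above generalizes to a small helper. This is a sketch; the 64-byte key and 2 KB response defaults are the same illustrative assumptions as in the calculation.

```python
def estimate_idempotency_memory_bytes(req_per_sec: int,
                                      completed_ttl_s: int,
                                      avg_key_bytes: int = 64,
                                      avg_response_bytes: int = 2048) -> int:
    # Steady state: keys created per second times the seconds each key lives.
    live_keys = req_per_sec * completed_ttl_s
    return live_keys * (avg_key_bytes + avg_response_bytes)

# 1,000 unique req/s with a 1-hour completed_ttl:
estimate_idempotency_memory_bytes(1000, 3600)  # 7,603,200,000 bytes ≈ 7.6 GB
```

Note this ignores Redis's per-key overhead (hash field headers, expiry metadata), so treat the result as a lower bound when capacity planning.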
Performance Implications of Lua Scripts
Redis is single-threaded. A long-running Lua script will block all other commands on that Redis instance. This is why the logic inside our scripts is intentionally simple: it involves a few hash lookups, comparisons, and writes. There are no loops or computationally expensive operations.
Always benchmark your scripts under load. Use the SCRIPT DEBUG command or monitor slowlog to ensure they execute in microseconds. For this pattern, the network round-trip time will almost always be the dominant factor, not the script execution time.
Using EVALSHA is critical. It avoids sending the full script text over the network for every call. The client sends the SHA1 hash of the script, and Redis executes the cached version. The redis-py library's register_script helper wraps this pattern, transparently falling back to EVAL when the script cache misses; our example uses script_load plus explicit evalsha calls for clarity.
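For clients that only expose eval/evalsha, the fallback can be sketched by hand. This assumes, per the Redis protocol, that a cache miss raises an error whose message begins with "NOSCRIPT" (e.g., after a server restart or SCRIPT FLUSH).

```python
import hashlib

# Sketch of the EVALSHA-with-fallback pattern. `client` is assumed to expose
# evalsha()/eval() with redis-py-style signatures.
def eval_cached(client, script_text, keys=(), args=()):
    # Redis identifies cached scripts by the SHA1 of their source.
    sha = hashlib.sha1(script_text.encode()).hexdigest()
    try:
        return client.evalsha(sha, len(keys), *keys, *args)
    except Exception as exc:
        if not str(exc).startswith("NOSCRIPT"):
            raise
        # Script cache was flushed: re-send the full source once.
        return client.eval(script_text, len(keys), *keys, *args)
```

In the common case this costs one round-trip with a 40-byte identifier instead of the full script body; the expensive EVAL path only runs when the cache has actually been emptied.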
Behavior in a Redis Cluster Environment
This pattern works seamlessly with Redis Cluster because all operations for a given idempotency check revolve around a single key. Redis Cluster hashes the key name to determine which slot (and therefore which shard) it belongs to. As long as our Lua script only accesses the single key passed in KEYS[1], Redis Cluster can route the command to the correct node and execute it without issue. Cross-key operations in Lua scripts are more complex in a clustered environment, but this pattern does not require them.
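For reference, slot selection can be illustrated with the hash-tag rule Redis Cluster applies before hashing a key, as described in the cluster specification. This is a sketch of the extraction rule only, not the CRC16 slot computation itself; the key names are hypothetical.

```python
def hash_tag(key: str) -> str:
    # Redis Cluster hashes only the substring between the first '{' and the
    # first '}' that follows it, provided that substring is non-empty;
    # otherwise the whole key is hashed.
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:
            return key[start + 1:end]
    return key

# Keys sharing a tag always land on the same slot:
hash_tag("idem:{evt_abc123}:state")     # "evt_abc123"
hash_tag("idem:{evt_abc123}:response")  # "evt_abc123"
hash_tag("idem:plain")                  # "idem:plain" (no tag: whole key hashed)
```

If a future variant of this pattern ever split the state and response into two keys, wrapping the shared identifier in {...} like this would keep both on one shard, so a multi-key Lua script could still run against them atomically.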