Idempotency Middleware for Asynchronous Event Consumers in Go
The Inevitability of Duplicates in Distributed Systems
In any non-trivial event-driven architecture, the contract of message delivery is almost always at-least-once. Guarantees of exactly-once delivery are notoriously difficult, often impossible, to achieve across heterogeneous systems without significant performance trade-offs or specialized infrastructure. This reality stems from a simple truth: the consumer must acknowledge a message after processing it. If a consumer processes a message but crashes before it can send the ack, the message broker will assume it was never processed and redeliver it to another consumer instance. This leads to duplicate processing.
Consider a payment service that consumes an OrderCreated event:
// Simplified business logic
func ProcessPayment(ctx context.Context, order Order) error {
// 1. Charge the customer's credit card via a third-party API
err := paymentGateway.Charge(order.CustomerID, order.Amount)
if err != nil {
return err // The ack will not be sent, message will be redelivered
}
// 2. Update the order status in our database
err = db.UpdateOrderStatus(order.ID, "PAID")
if err != nil {
return err // CRASH! or DB timeout. Ack not sent.
}
// 3. Acknowledge the message to the broker (e.g., Kafka commit offset)
// This step is never reached if the service crashes after the charge.
return nil
}
If the service crashes between the successful charge and the database update, the message is redelivered. The result? The customer is charged twice. This is not a theoretical edge case; it's a guaranteed failure mode in any system operating at scale. The only robust solution is to make the consumer's operation idempotent: processing the same message multiple times must have the same effect as processing it once.
This article details the design and implementation of a generic idempotency middleware in Go, using Redis as a high-performance backend for tracking and locking. We will build a solution that is decoupled from business logic and resilient to production complexities.
The Idempotency Key Pattern
The foundation of our solution is the idempotency key. This is a unique identifier for a specific operation instance. The producer of the event is responsible for generating this key and including it in the message payload or headers.
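A good idempotency key must be unique for each distinct operation but identical for retries of the same operation. A UUIDv4 is a common choice, provided the producer generates it once per operation and reuses it on every retry rather than minting a new one each time.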
// Example event payload from a Kafka topic
{
"event_id": "evt_a1b2c3d4",
"event_type": "order.created",
"idempotency_key": "a7b1c3d8-e1f2-4a5b-8c9d-0e1f2a3b4c5d",
"payload": {
"order_id": "ord_12345",
"customer_id": "cust_67890",
"amount": 9999,
"currency": "USD"
}
}
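When a producer cannot easily persist a random key across its own retries, one alternative is to derive the key from the fields that identify the operation. The following is a minimal sketch of that idea; `DeriveKey` and the choice of fields (event type plus order ID) are assumptions made for illustration, not part of the event schema above.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// DeriveKey is a hypothetical helper that builds a stable idempotency key
// from the fields that identify one logical operation.
func DeriveKey(eventType, orderID string) string {
	h := sha256.New()
	// Length-prefix each field so ("ab", "c") and ("a", "bc") hash differently.
	for _, part := range []string{eventType, orderID} {
		fmt.Fprintf(h, "%d:%s|", len(part), part)
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	first := DeriveKey("order.created", "ord_12345")
	retry := DeriveKey("order.created", "ord_12345")
	other := DeriveKey("order.created", "ord_12346")
	fmt.Println("retry reuses key:", first == retry)
	fmt.Println("other order differs:", first != other)
}
```

A derived key only works if the chosen fields really do identify a single operation. If the same order can legitimately trigger the same event type twice, the fields need to include something that tells those occurrences apart.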
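Our middleware will use this key to track the processing state of an event. The state machine for an idempotency key is simple but critical: a key starts as NOT_FOUND, moves to IN_PROGRESS when a consumer acquires the lock, and ends as COMPLETED once the handler succeeds. If the handler fails or the lock expires, the key falls back to NOT_FOUND so the event can be retried.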
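The transitions can be written down as a small pure function, which makes them easy to test in isolation. This is a sketch only: the `State` type, `OnDelivery`, `AfterHandler`, and the action strings are hypothetical names, and the state names follow the NOT_FOUND, IN_PROGRESS, and COMPLETED statuses used in this article.

```go
package main

import (
	"errors"
	"fmt"
)

// State is the processing state of one idempotency key.
type State string

const (
	NotFound   State = "NOT_FOUND"   // no record: the event has not been seen
	InProgress State = "IN_PROGRESS" // a consumer holds the lock
	Completed  State = "COMPLETED"   // the handler finished successfully
)

// errHandlerFailed is a placeholder error used by the demo below.
var errHandlerFailed = errors.New("handler failed")

// OnDelivery says what a consumer does when a message arrives for a key in
// state s. The action names are illustrative.
func OnDelivery(s State) string {
	switch s {
	case Completed:
		return "ack-and-skip"
	case InProgress:
		return "nack-and-retry"
	default:
		return "lock-and-process"
	}
}

// AfterHandler returns the state a key moves to once the handler returns.
// A failure drops the lock, so the key looks unseen again and can be retried.
func AfterHandler(handlerErr error) State {
	if handlerErr != nil {
		return NotFound
	}
	return Completed
}

func main() {
	for _, s := range []State{NotFound, InProgress, Completed} {
		fmt.Printf("%-11s -> %s\n", s, OnDelivery(s))
	}
	fmt.Println("handler ok     ->", AfterHandler(nil))
	fmt.Println("handler failed ->", AfterHandler(errHandlerFailed))
}
```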
Architecting the Middleware in Go
We will structure our solution as an HTTP-style middleware, a common pattern in Go. This allows us to wrap any Handler function with our idempotency logic without polluting the core business code.
Our IdempotencyMiddleware will orchestrate the entire process:
- Extract the idempotency key from the incoming event.
- Atomically check and set the key's status in Redis.
  - If the status is COMPLETED, we short-circuit and acknowledge the message immediately.
  - If the status is IN_PROGRESS, another consumer is working on it. We'll treat this as a transient error and nack the message for later redelivery.
  - If the status is NOT_FOUND, we mark it as IN_PROGRESS with a timeout (a lock), execute the business logic, and finally update the key's status to COMPLETED upon success.
Here's the Go interface we'll build around:
package idempotency
import "context"
// Handler defines the function signature for the actual business logic.
type Handler func(ctx context.Context, key string, payload []byte) error
// Middleware is a function that wraps a Handler to add idempotency.
type Middleware func(next Handler) Handler
// Store defines the interface for our persistence layer (e.g., Redis).
type Store interface {
// Lock attempts to acquire a lock for the given key.
// It should return ErrLockExists if the key is already locked.
Lock(ctx context.Context, key string) error
// Unlock releases the lock for a given key.
Unlock(ctx context.Context, key string) error
// CheckCompleted checks if a key has already been successfully processed.
CheckCompleted(ctx context.Context, key string) (bool, error)
// MarkCompleted marks a key as successfully processed.
MarkCompleted(ctx context.Context, key string) error
}
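Because the middleware depends only on the Store interface, it can be unit tested without a running Redis. Below is a minimal sketch of an in-memory implementation. `MemoryStore` and `exercise` are hypothetical names, the sketch repeats the interface and defines its own `ErrLockExists` so that it compiles on its own, and it deliberately ignores TTLs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var ErrLockExists = errors.New("lock already exists")

// Store repeats the interface above so this sketch compiles on its own.
type Store interface {
	Lock(ctx context.Context, key string) error
	Unlock(ctx context.Context, key string) error
	CheckCompleted(ctx context.Context, key string) (bool, error)
	MarkCompleted(ctx context.Context, key string) error
}

// MemoryStore is a hypothetical test double. It has no TTLs, so locks and
// completion markers live until the process exits.
type MemoryStore struct {
	mu     sync.Mutex
	locked map[string]bool
	done   map[string]bool
}

func NewMemoryStore() *MemoryStore {
	return &MemoryStore{locked: map[string]bool{}, done: map[string]bool{}}
}

func (s *MemoryStore) Lock(_ context.Context, key string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.locked[key] {
		return ErrLockExists
	}
	s.locked[key] = true
	return nil
}

func (s *MemoryStore) Unlock(_ context.Context, key string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.locked, key)
	return nil
}

func (s *MemoryStore) CheckCompleted(_ context.Context, key string) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.done[key], nil
}

func (s *MemoryStore) MarkCompleted(_ context.Context, key string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.done[key] = true
	return nil
}

// Compile-time check that MemoryStore satisfies Store.
var _ Store = (*MemoryStore)(nil)

// exercise walks one key through lock, contention, and completion.
func exercise() (secondLock error, completedBefore, completedAfter bool) {
	ctx := context.Background()
	s := NewMemoryStore()
	_ = s.Lock(ctx, "key-1")
	secondLock = s.Lock(ctx, "key-1") // still held by the first caller
	completedBefore, _ = s.CheckCompleted(ctx, "key-1")
	_ = s.MarkCompleted(ctx, "key-1")
	_ = s.Unlock(ctx, "key-1")
	completedAfter, _ = s.CheckCompleted(ctx, "key-1")
	return secondLock, completedBefore, completedAfter
}

func main() {
	secondLock, before, after := exercise()
	fmt.Println(secondLock, before, after)
}
```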
Implementation Deep Dive: A Redis-backed Store
Redis is an excellent choice for an idempotency store due to its high performance, atomic operations, and support for key expiry (TTL).
Let's implement the Store interface using the go-redis library. We'll use two distinct key prefixes: one for the lock (idem:lock:) and one for the completion marker (idem:done:).
package idempotency
import (
"context"
"errors"
"time"
"github.com/go-redis/redis/v8"
)
var (
ErrLockExists = errors.New("lock already exists")
)
// RedisStore implements the Store interface using Redis.
type RedisStore struct {
client *redis.Client
lockTTL time.Duration // How long the IN_PROGRESS lock should last
completedTTL time.Duration // How long the COMPLETED marker should last
}
// NewRedisStore creates a new RedisStore instance.
func NewRedisStore(client *redis.Client, lockTTL, completedTTL time.Duration) *RedisStore {
return &RedisStore{
client: client,
lockTTL: lockTTL,
completedTTL: completedTTL,
}
}
func (s *RedisStore) lockKey(key string) string {
return "idem:lock:" + key
}
func (s *RedisStore) completedKey(key string) string {
return "idem:done:" + key
}
// Lock uses `SET NX` for an atomic lock acquisition.
func (s *RedisStore) Lock(ctx context.Context, key string) error {
// The 'true' value is arbitrary, we only care about the key's existence.
ok, err := s.client.SetNX(ctx, s.lockKey(key), true, s.lockTTL).Result()
if err != nil {
return err
}
if !ok {
return ErrLockExists
}
return nil
}
// Unlock simply deletes the lock key.
func (s *RedisStore) Unlock(ctx context.Context, key string) error {
return s.client.Del(ctx, s.lockKey(key)).Err()
}
// CheckCompleted checks for the existence of the completion marker.
func (s *RedisStore) CheckCompleted(ctx context.Context, key string) (bool, error) {
val, err := s.client.Exists(ctx, s.completedKey(key)).Result()
if err != nil {
return false, err
}
return val > 0, nil
}
// MarkCompleted sets the completion marker with a TTL.
func (s *RedisStore) MarkCompleted(ctx context.Context, key string) error {
return s.client.Set(ctx, s.completedKey(key), true, s.completedTTL).Err()
}
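One caveat with an Unlock that simply deletes the key: if a consumer stalls past lockTTL, another consumer may acquire the lock, and the first consumer's late Unlock would then delete a lock it no longer owns. A common mitigation is to store an owner token as the lock value and delete only when the token still matches. The sketch below simulates that behaviour in memory with a hand-advanced clock. `TokenLocks`, `HeldBy`, and `slowConsumer` are hypothetical names, and in Redis the compare-and-delete step would itself need to be atomic (for example inside a Lua script), which this sketch does not show.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var ErrLockExists = errors.New("lock already exists")

type lockEntry struct {
	owner   string
	expires time.Time
}

// TokenLocks is a hypothetical single-goroutine stand-in for Redis keys with
// a TTL. The clock is advanced by hand so expiry can be simulated.
type TokenLocks struct {
	now   time.Time
	locks map[string]lockEntry
}

// Lock succeeds if the key is free or its previous lock has expired.
func (l *TokenLocks) Lock(key, owner string, ttl time.Duration) error {
	if e, ok := l.locks[key]; ok && l.now.Before(e.expires) {
		return ErrLockExists
	}
	l.locks[key] = lockEntry{owner: owner, expires: l.now.Add(ttl)}
	return nil
}

// Unlock deletes the lock only when owner still holds it, and reports
// whether anything was deleted.
func (l *TokenLocks) Unlock(key, owner string) bool {
	e, ok := l.locks[key]
	if !ok || e.owner != owner {
		return false
	}
	delete(l.locks, key)
	return true
}

// HeldBy returns the current owner, or "" if the key is free or expired.
func (l *TokenLocks) HeldBy(key string) string {
	if e, ok := l.locks[key]; ok && l.now.Before(e.expires) {
		return e.owner
	}
	return ""
}

// slowConsumer: A takes the lock and stalls past the TTL, B takes over, and
// then A's late Unlock arrives.
func slowConsumer() (aReleased bool, holder string) {
	l := &TokenLocks{now: time.Unix(0, 0), locks: map[string]lockEntry{}}
	_ = l.Lock("key-1", "consumer-A", time.Minute)
	l.now = l.now.Add(2 * time.Minute) // A's lock has expired
	_ = l.Lock("key-1", "consumer-B", time.Minute)
	aReleased = l.Unlock("key-1", "consumer-A")
	return aReleased, l.HeldBy("key-1")
}

func main() {
	released, holder := slowConsumer()
	fmt.Println("A released a lock:", released)
	fmt.Println("current holder:", holder)
}
```

With a plain delete, consumer A's late Unlock would have freed the key while B was still working, letting a third consumer start on the same event.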
The Middleware Implementation
Now we can wire this RedisStore into our middleware factory. This factory will return a Middleware function that can be applied to any handler.
package idempotency
import (
"context"
"log"
)
// NewIdempotencyMiddleware creates the middleware function.
func NewIdempotencyMiddleware(store Store) Middleware {
return func(next Handler) Handler {
return func(ctx context.Context, key string, payload []byte) error {
// 1. Check if already completed
completed, err := store.CheckCompleted(ctx, key)
if err != nil {
log.Printf("ERROR: Failed to check completion for key %s: %v", key, err)
// Treat this as a transient failure, nack the message to retry later.
return err
}
if completed {
log.Printf("INFO: Idempotency key %s already processed, skipping.", key)
// Acknowledge the message, as it's a successful duplicate.
return nil
}
// 2. Attempt to acquire a lock
if err := store.Lock(ctx, key); err != nil {
if err == ErrLockExists {
log.Printf("WARN: Idempotency key %s is already being processed, nacking.", key)
// Nack the message to retry after the lock TTL expires.
return err
}
log.Printf("ERROR: Failed to acquire lock for key %s: %v", key, err)
return err
}
// Ensure the lock is released if something goes wrong before completion.
defer store.Unlock(ctx, key)
// 3. Execute the business logic
log.Printf("INFO: Acquired lock for key %s, processing...", key)
handlerErr := next(ctx, key, payload)
// 4. Handle the result
if handlerErr != nil {
log.Printf("ERROR: Business logic failed for key %s: %v", key, handlerErr)
// The lock will be released by the defer. We do NOT mark as completed.
// The message will be redelivered for a clean retry.
return handlerErr
}
// 5. Mark as completed
if err := store.MarkCompleted(ctx, key); err != nil {
log.Printf("CRITICAL: Business logic succeeded for key %s but failed to mark as completed: %v", key, err)
// This is a critical failure state. The lock will be released,
// and the message will be reprocessed, but we've lost idempotency.
// We'll address this in the 'Advanced Edge Cases' section.
return err
}
log.Printf("INFO: Successfully processed key %s, marked as completed.", key)
// The lock can be released now that the completed flag is set.
// The defer will handle this.
return nil
}
}
}
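To see the control flow without Redis, the middleware can be condensed and run against in-memory maps. The sketch below is a simplified stand-in rather than the implementation above: `memState`, `wrap`, and `deliverThrice` are hypothetical names, logging is removed, and the store is not safe for concurrent use. It delivers the same message three times to a handler that fails on its first call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Handler func(ctx context.Context, key string, payload []byte) error

var ErrLockExists = errors.New("lock already exists")

// memState is a hypothetical single-goroutine stand-in for the Redis store.
type memState struct{ locked, done map[string]bool }

// wrap condenses the middleware above: same ordering of checks, no logging,
// and map operations in place of Redis calls.
func wrap(s *memState, next Handler) Handler {
	return func(ctx context.Context, key string, payload []byte) error {
		if s.done[key] {
			return nil // duplicate of a completed event
		}
		if s.locked[key] {
			return ErrLockExists
		}
		s.locked[key] = true
		defer delete(s.locked, key)
		if err := next(ctx, key, payload); err != nil {
			return err // not marked done, so a redelivery retries
		}
		s.done[key] = true
		return nil
	}
}

// deliverThrice delivers one message three times. The handler fails on its
// first call and succeeds afterwards.
func deliverThrice() (calls int, outcomes []string) {
	s := &memState{locked: map[string]bool{}, done: map[string]bool{}}
	h := wrap(s, func(ctx context.Context, key string, payload []byte) error {
		calls++
		if calls == 1 {
			return errors.New("gateway timeout")
		}
		return nil
	})
	for i := 0; i < 3; i++ {
		err := h(context.Background(), "key-1", nil)
		outcomes = append(outcomes, fmt.Sprint(err))
	}
	return calls, outcomes
}

func main() {
	calls, outcomes := deliverThrice()
	fmt.Println("handler calls:", calls)
	fmt.Println("outcomes:", outcomes)
}
```

The handler runs twice, not three times: the failed first attempt is retried, and the third delivery is recognised as a duplicate of the successful second one.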
Putting It All Together: A Consumer Example
Here's how a Kafka consumer would use this middleware.
// main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
"your-project/idempotency"
)
// Represents the message from Kafka
type KafkaMessage struct {
IdempotencyKey string `json:"idempotency_key"`
Payload json.RawMessage `json:"payload"`
}
// The actual business logic
func paymentHandler(ctx context.Context, key string, payload []byte) error {
fmt.Printf("--- Executing payment logic for key: %s ---\n", key)
// Simulate processing time
time.Sleep(200 * time.Millisecond)
// In a real app, this would charge a card, update a DB, etc.
fmt.Printf("--- Payment logic complete for key: %s ---\n", key)
return nil
}
func main() {
// --- Setup ---
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// Lock should be long enough for the operation to complete.
// Completed marker should be long enough to outlast any message redelivery delay.
store := idempotency.NewRedisStore(redisClient, 1*time.Minute, 24*time.Hour)
idempotencyMiddleware := idempotency.NewIdempotencyMiddleware(store)
// Wrap the business logic with the middleware
protectedHandler := idempotencyMiddleware(paymentHandler)
// --- Simulate receiving messages from Kafka ---
ctx := context.Background()
messages := []KafkaMessage{
{IdempotencyKey: "key-1", Payload: []byte(`{"order_id": "123"}`)}, // First attempt
{IdempotencyKey: "key-2", Payload: []byte(`{"order_id": "456"}`)}, // Another message
{IdempotencyKey: "key-1", Payload: []byte(`{"order_id": "123"}`)}, // Duplicate of the first
}
for _, msg := range messages {
log.Printf("\nReceived message with key: %s", msg.IdempotencyKey)
err := protectedHandler(ctx, msg.IdempotencyKey, msg.Payload)
if err != nil {
log.Printf("Handler returned error for key %s: %v. NACKING message.", msg.IdempotencyKey, err)
// In a real consumer, you would nack() the message here.
} else {
log.Printf("Handler finished successfully for key %s. ACKING message.", msg.IdempotencyKey)
// In a real consumer, you would ack() the message here.
}
}
}
/*
EXPECTED OUTPUT:
Received message with key: key-1
INFO: Acquired lock for key key-1, processing...
--- Executing payment logic for key: key-1 ---
--- Payment logic complete for key: key-1 ---
INFO: Successfully processed key key-1, marked as completed.
Handler finished successfully for key key-1. ACKING message.
Received message with key: key-2
INFO: Acquired lock for key key-2, processing...
--- Executing payment logic for key: key-2 ---
--- Payment logic complete for key: key-2 ---
INFO: Successfully processed key key-2, marked as completed.
Handler finished successfully for key key-2. ACKING message.
Received message with key: key-1
INFO: Idempotency key key-1 already processed, skipping.
Handler finished successfully for key key-1. ACKING message.
*/
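In a real consumer, the error returned by the protected handler has to be translated into a broker action. One way to keep that decision testable is a small classification function, sketched below. `Outcome`, `Classify`, and `ErrStoreDown` are hypothetical, the retry delays are placeholders rather than recommendations, and how a delay is actually applied depends on the broker and client library in use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	ErrLockExists = errors.New("lock already exists")
	// ErrStoreDown is a placeholder for any other failure.
	ErrStoreDown = errors.New("store unavailable")
)

// Outcome is a hypothetical description of what to tell the broker.
type Outcome struct {
	Ack        bool
	RetryAfter time.Duration // only meaningful when Ack is false
}

// Classify maps the middleware's error to an Outcome.
func Classify(err error, lockTTL time.Duration) Outcome {
	switch {
	case err == nil:
		// Covers both a fresh success and a skipped duplicate.
		return Outcome{Ack: true}
	case errors.Is(err, ErrLockExists):
		// Another consumer is working on this key. Check back once its
		// lock could have expired.
		return Outcome{RetryAfter: lockTTL}
	default:
		// Handler or store failure: retry sooner (placeholder delay).
		return Outcome{RetryAfter: time.Second}
	}
}

func main() {
	wrapped := fmt.Errorf("idempotency: %w", ErrLockExists)
	fmt.Println(Classify(nil, time.Minute))
	fmt.Println(Classify(wrapped, time.Minute))
	fmt.Println(Classify(ErrStoreDown, time.Minute))
}
```

Using `errors.Is` rather than `==` means the lock case is still recognised if some layer wraps the sentinel error with extra context.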
Advanced Edge Cases and Production Hardening
The implementation above is a solid foundation, but production systems present more complex challenges.
1. The Partial Failure Catastrophe
Our biggest vulnerability is a failure after the business logic succeeds but before MarkCompleted succeeds.
// ... inside the middleware
handlerErr := next(ctx, key, payload) // This succeeds
if handlerErr != nil { ... }
// CRASH or Redis goes down HERE!
if err := store.MarkCompleted(ctx, key); err != nil { ... }
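In this scenario, no completion marker is ever written. If MarkCompleted returns an error, the deferred store.Unlock() runs and releases the lock; if the process crashes, the deferred call never runs and the lock simply expires after lockTTL. Either way, the message will be redelivered and the operation will run again, violating idempotency.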
Solution: Atomic Commit with Lua Scripting
We can narrow this window by making the MarkCompleted and Unlock operations atomic. Instead of two separate Redis calls, we use a single Lua script that Redis executes atomically.
-- LUA script: mark_completed_and_unlock.lua
-- KEYS[1]: The completed key (e.g., 'idem:done:some-key')
-- KEYS[2]: The lock key (e.g., 'idem:lock:some-key')
-- ARGV[1]: The value to set for the completed key (e.g., 'true')
-- ARGV[2]: The TTL for the completed key in seconds
redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
redis.call('DEL', KEYS[2])
return 1
We would update our RedisStore to load and execute this script.
// In RedisStore
var markAndUnlockScript = redis.NewScript(`
redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
redis.call('DEL', KEYS[2])
return 1
`)
// New MarkCompletedAndUnlock method
func (s *RedisStore) MarkCompletedAndUnlock(ctx context.Context, key string) error {
completedKey := s.completedKey(key)
lockKey := s.lockKey(key)
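// SET ... EX expects a whole number of seconds, so convert the TTL to an integer.
ttlSeconds := int64(s.completedTTL / time.Second)
return markAndUnlockScript.Run(ctx, s.client, []string{completedKey, lockKey}, true, ttlSeconds).Err()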
}
// Middleware updated to use the new atomic function
// ...
if handlerErr == nil {
// This is now an atomic operation
if err := store.MarkCompletedAndUnlock(ctx, key); err != nil {
// ... handle critical failure
}
// No need for a deferred Unlock anymore when successful
}
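This guarantees that the completion marker and the lock release take effect together, so the key is never left unlocked without a marker once the script has run. It does not close the window entirely: the Go process can still crash after the handler succeeds but before Redis applies the script, or after the script is sent but before the response is received. The remaining exposure is a single Redis round trip.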
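For the window that remains, one complementary measure is to pass the idempotency key through to the downstream system, where that system supports it, so that a repeated call is recognised there as well. Whether a given payment provider offers this is something to confirm in its documentation. The sketch below uses a hypothetical in-memory `Gateway` to show the effect; it is not modelled on any particular provider's API.

```go
package main

import "fmt"

// Gateway is a hypothetical payment provider that remembers the idempotency
// key of every charge it has accepted.
type Gateway struct {
	seen  map[string]bool
	Total int64 // total amount actually charged, in cents
}

// Charge applies the charge once per key and reports whether this call was
// a replay of an earlier one.
func (g *Gateway) Charge(key string, amount int64) (replayed bool) {
	if g.seen[key] {
		return true
	}
	g.seen[key] = true
	g.Total += amount
	return false
}

// redeliver simulates a consumer that charges, crashes before recording
// completion, and then receives the same message again.
func redeliver() (total int64, secondWasReplay bool) {
	g := &Gateway{seen: map[string]bool{}}
	g.Charge("a7b1c3d8", 9999)                   // first delivery, then crash
	secondWasReplay = g.Charge("a7b1c3d8", 9999) // redelivery after restart
	return g.Total, secondWasReplay
}

func main() {
	total, replay := redeliver()
	fmt.Println("total charged (cents):", total)
	fmt.Println("second call was a replay:", replay)
}
```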
2. Handling Operation Results
Sometimes, an idempotent operation needs to return a result. For example, a CreateUser event handler should return the same user_id on every duplicate call. The idempotency store can be used to cache the result of the first successful execution.
We can modify the idem:done: key to store the marshaled result instead of a simple boolean.
// store.go
type Store interface {
// ... (Lock, Unlock)
GetResult(ctx context.Context, key string) (result []byte, found bool, err error)
MarkCompletedWithResult(ctx context.Context, key string, result []byte) error
}
// middleware.go
// ...
result, found, err := store.GetResult(ctx, key)
if err != nil { /* handle error */ }
if found {
log.Printf("Returning cached result for key %s", key)
// How you return the result depends on your consumer framework.
// You might write it to a response channel or simply ack.
return nil
}
// ... after business logic succeeds
// handler now needs to return a result []byte
result, handlerErr := next(ctx, key, payload)
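if handlerErr == nil {
// Surface a store failure so the message is retried instead of silently losing the result.
if err := store.MarkCompletedWithResult(ctx, key, result); err != nil {
return err
}
}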
This complicates the Store implementation but is essential for operations that must return the same value on every delivery.
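The result-caching flow can be exercised end to end with an in-memory map standing in for the idem:done: keys. This is a sketch under simplifying assumptions: `ResultHandler`, `resultCache`, `withResults`, and `createTwice` are hypothetical names, locking is omitted, and the user ID is generated from a call counter purely so that a second execution would be visible.

```go
package main

import (
	"context"
	"fmt"
)

// ResultHandler is a Handler variant that also returns a result.
type ResultHandler func(ctx context.Context, key string, payload []byte) ([]byte, error)

// resultCache is a hypothetical single-goroutine stand-in for the
// idem:done: keys, mapping idempotency key to the stored result.
type resultCache map[string][]byte

// withResults returns the cached result for a key that has already
// completed and stores the result of the first successful run.
func withResults(cache resultCache, next ResultHandler) ResultHandler {
	return func(ctx context.Context, key string, payload []byte) ([]byte, error) {
		if cached, ok := cache[key]; ok {
			return cached, nil
		}
		result, err := next(ctx, key, payload)
		if err != nil {
			return nil, err // nothing cached, so a redelivery retries
		}
		cache[key] = result
		return result, nil
	}
}

// createTwice delivers the same CreateUser event twice.
func createTwice() (first, second string, calls int) {
	createUser := func(ctx context.Context, key string, payload []byte) ([]byte, error) {
		calls++
		return []byte(fmt.Sprintf("user-%d", calls)), nil
	}
	h := withResults(resultCache{}, createUser)
	a, _ := h(context.Background(), "key-1", nil)
	b, _ := h(context.Background(), "key-1", nil)
	return string(a), string(b), calls
}

func main() {
	first, second, calls := createTwice()
	fmt.Println(first, second, calls)
}
```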
3. Choosing TTLs: Lock vs. Completion
- Lock TTL (lockTTL): This is a safety mechanism. It should be set to a value slightly longer than the maximum expected processing time for your handler (e.g., P99.9 latency + a buffer). If a consumer acquires a lock and dies, this TTL ensures the lock is eventually released, preventing a permanent deadlock for that idempotency key. If set too short, a long-running but valid operation might have its lock expire, allowing another consumer to start processing, leading to a race condition.
- Completion TTL (completedTTL): This determines how long you remember a completed operation. It should be longer than the maximum possible time a message can exist and be redelivered in your broker system. For Kafka, this might be related to your log retention. For RabbitMQ, it might be related to message TTLs and queue policies. A value of 24-72 hours is often a safe, pragmatic choice, balancing memory usage in Redis against the risk of forgetting a processed event.
Performance and Scalability Considerations
- Redis round trips: Each message costs several round trips to Redis (CheckCompleted and Lock, then MarkCompleted). In high-performance systems, this overhead is non-negligible. Ensure your consumers and Redis are in the same VPC/region/availability zone to minimize latency.
- Connection pooling: The go-redis library handles this for you, but ensure PoolSize is tuned for your consumer's concurrency level.
Conclusion: Idempotency as a First-Class Citizen
In event-driven architectures, assuming messages will be delivered exactly once is a recipe for disaster. Building idempotency into your consumers is not an optional enhancement; it's a fundamental requirement for correctness.
By encapsulating this complex, stateful logic within a generic middleware, we achieve a powerful separation of concerns. The core business logic remains pure and testable, unaware of the complexities of distributed message delivery. The idempotency layer, built on a robust and atomic backend like Redis, provides a reusable, production-hardened guarantee against duplicate processing. While the implementation requires careful handling of edge cases, particularly around atomic state transitions and failure modes, the resulting system is vastly more resilient and reliable at scale.