Idempotency Middleware for Asynchronous Event Consumers in Go

Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

The Inevitability of Duplicates in Distributed Systems

In any non-trivial event-driven architecture, the contract of message delivery is almost always at-least-once. Exactly-once delivery is notoriously difficult, and often impossible, to achieve across heterogeneous systems without significant performance trade-offs or specialized infrastructure. This reality stems from a simple truth: the consumer must acknowledge a message after processing it. If a consumer processes a message but crashes before it can send the ack, the message broker assumes it was never processed and redelivers it, possibly to another consumer instance. This leads to duplicate processing.

Consider a payment service that consumes an OrderCreated event:

```go
// Simplified business logic; paymentGateway and db are placeholders.
func ProcessPayment(ctx context.Context, order Order) error {
	// 1. Charge the customer's credit card via a third-party API
	err := paymentGateway.Charge(order.CustomerID, order.Amount)
	if err != nil {
		return err // The ack will not be sent, message will be redelivered
	}

	// 2. Update the order status in our database
	err = db.UpdateOrderStatus(order.ID, "PAID")
	if err != nil {
		return err // CRASH! or DB timeout. Ack not sent.
	}

	// 3. Returning nil lets the consumer loop acknowledge the message
	// (e.g., commit the Kafka offset). This step is never reached if the
	// service crashes after the charge.
	return nil
}
```

If the service crashes between the successful charge and the database update, the message is redelivered. The result? The customer is charged twice. This is not a theoretical edge case; it's a guaranteed failure mode in any system operating at scale. The only robust solution is to make the consumer's operation idempotent: processing the same message multiple times must have the same effect as processing it once.
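To make the failure mode concrete, here is a toy model in Go. It uses a hypothetical broker loop that keeps redelivering until it sees an ack. On the first attempt, the ack is lost after the handler has already charged. The naive handler charges twice, while a handler that remembers processed keys charges once. The in-memory map stands in for the durable store built in the rest of this article. Message, Ledger, and the handler constructors are illustrative names, not part of any library.

```go
package main

import "errors"

// Message is a hypothetical broker message carrying an idempotency key.
type Message struct {
	Key    string
	Amount int
}

// Ledger records every charge so the number of side effects can be counted.
type Ledger struct{ charges []int }

func (l *Ledger) Charge(amount int) { l.charges = append(l.charges, amount) }

var errCrash = errors.New("simulated crash before ack")

// deliverAtLeastOnce redelivers msg until the handler's result is acked.
// With crashFirst set, the first ack is lost *after* the handler has run,
// which is the crash-before-ack scenario described above.
func deliverAtLeastOnce(msg Message, handle func(Message) error, crashFirst bool) {
	for attempt := 1; ; attempt++ {
		err := handle(msg)
		if crashFirst && attempt == 1 {
			err = errCrash // side effect happened, but the broker never sees an ack
		}
		if err == nil {
			return // ack received; the broker stops redelivering
		}
	}
}

// naiveHandler charges on every delivery.
func naiveHandler(l *Ledger) func(Message) error {
	return func(m Message) error {
		l.Charge(m.Amount)
		return nil
	}
}

// idempotentHandler skips keys it has already processed, so a redelivery
// has the same effect as the first delivery.
func idempotentHandler(l *Ledger, seen map[string]bool) func(Message) error {
	return func(m Message) error {
		if seen[m.Key] {
			return nil
		}
		l.Charge(m.Amount)
		seen[m.Key] = true
		return nil
	}
}
```

The model assumes the remembered keys survive the crash. In a real consumer that is exactly what the external store is for.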

This article details the design and implementation of a generic idempotency middleware in Go, using Redis as a high-performance backend for tracking and locking. We will build a solution that is decoupled from business logic and resilient to production complexities.

The Idempotency Key Pattern

The foundation of our solution is the idempotency key. This is a unique identifier for a specific operation instance. The producer of the event is responsible for generating this key and including it in the message payload or headers.

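A good idempotency key must be unique for each distinct operation but identical for retries of the same operation. A UUIDv4 is a common choice, provided the producer generates it once when the operation is created and reuses it on every retry of the publish. Generating a fresh UUID per send attempt would defeat deduplication.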
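If a producer cannot persist a random key alongside the operation, another option is to derive the key deterministically from the operation's business identity, so that every retry recomputes the same value. The sketch below assumes a hypothetical DeriveIdempotencyKey helper that hashes an operation name and entity ID with SHA-256. If you need a UUID-shaped key, name-based UUIDv5 follows the same idea. Only use this scheme when one business entity maps to exactly one operation of that type; otherwise, two legitimate charges for the same order would be collapsed into one.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
)

// DeriveIdempotencyKey (hypothetical helper) returns the same key for the same
// operation and entity every time, so producer retries naturally reuse it.
func DeriveIdempotencyKey(operation, entityID string) string {
	// The NUL separator keeps ("ab", "c") and ("a", "bc") from colliding.
	sum := sha256.Sum256([]byte(operation + "\x00" + entityID))
	return hex.EncodeToString(sum[:])
}
```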

Example event payload from a Kafka topic:

```json
{
  "event_id": "evt_a1b2c3d4",
  "event_type": "order.created",
  "idempotency_key": "a7b1c3d8-e1f2-4a5b-8c9d-0e1f2a3b4c5d",
  "payload": {
    "order_id": "ord_12345",
    "customer_id": "cust_67890",
    "amount": 9999,
    "currency": "USD"
  }
}
```

Our middleware will use this key to track the processing state of an event. The state machine for an idempotency key is simple but critical:

  • NOT_FOUND: The operation has never been seen.
  • IN_PROGRESS: The operation is currently being processed by a consumer.
  • COMPLETED: The operation finished successfully.
Architecting the Middleware in Go

We will structure our solution as an HTTP-style middleware, a common pattern in Go. This allows us to wrap any Handler function with our idempotency logic without polluting the core business code.

Our IdempotencyMiddleware will orchestrate the entire process:

  • Extract the idempotency key from the incoming event.
  • Check whether the key is already COMPLETED, then try to acquire an IN_PROGRESS lock on it with an atomic Redis SET NX.
  • If the key is already COMPLETED, we short-circuit and acknowledge the message immediately.
  • If the key is IN_PROGRESS, another consumer is working on it. We treat this as a transient error and nack the message for later redelivery.
  • If the key is NOT_FOUND, we mark it as IN_PROGRESS with a timeout (a lock), execute the business logic, and finally update the key's status to COMPLETED upon success.

Here's the Go interface we'll build around:

```go
package idempotency

import "context"

// Handler defines the function signature for the actual business logic.
type Handler func(ctx context.Context, key string, payload []byte) error

// Middleware is a function that wraps a Handler to add idempotency.
type Middleware func(next Handler) Handler

// Store defines the interface for our persistence layer (e.g., Redis).
type Store interface {
	// Lock attempts to acquire a lock for the given key.
	// It should return ErrLockExists if the key is already locked.
	Lock(ctx context.Context, key string) error

	// Unlock releases the lock for a given key.
	Unlock(ctx context.Context, key string) error

	// CheckCompleted checks if a key has already been successfully processed.
	CheckCompleted(ctx context.Context, key string) (bool, error)

	// MarkCompleted marks a key as successfully processed.
	MarkCompleted(ctx context.Context, key string) error
}
```

Implementation Deep Dive: A Redis-backed Store

Redis is an excellent choice for an idempotency store due to its high performance, atomic operations, and support for key expiry (TTL).

Let's implement the Store interface using the go-redis library (v8). We'll use two distinct key prefixes: one for the lock (idem:lock:) and one for the completion marker (idem:done:).

```go
package idempotency

import (
	"context"
	"errors"
	"time"

	"github.com/go-redis/redis/v8"
)

var (
	ErrLockExists = errors.New("lock already exists")
)

// RedisStore implements the Store interface using Redis.
type RedisStore struct {
	client       *redis.Client
	lockTTL      time.Duration // How long the IN_PROGRESS lock should last
	completedTTL time.Duration // How long the COMPLETED marker should last
}

// NewRedisStore creates a new RedisStore instance.
func NewRedisStore(client *redis.Client, lockTTL, completedTTL time.Duration) *RedisStore {
	return &RedisStore{
		client:       client,
		lockTTL:      lockTTL,
		completedTTL: completedTTL,
	}
}

func (s *RedisStore) lockKey(key string) string {
	return "idem:lock:" + key
}

func (s *RedisStore) completedKey(key string) string {
	return "idem:done:" + key
}

// Lock uses `SET NX` for an atomic lock acquisition.
func (s *RedisStore) Lock(ctx context.Context, key string) error {
	// The 'true' value is arbitrary, we only care about the key's existence.
	ok, err := s.client.SetNX(ctx, s.lockKey(key), true, s.lockTTL).Result()
	if err != nil {
		return err
	}
	if !ok {
		return ErrLockExists
	}
	return nil
}

// Unlock simply deletes the lock key. Caveat: if this lock already expired and
// another consumer re-acquired it, this deletes the other consumer's lock.
func (s *RedisStore) Unlock(ctx context.Context, key string) error {
	return s.client.Del(ctx, s.lockKey(key)).Err()
}

// CheckCompleted checks for the existence of the completion marker.
func (s *RedisStore) CheckCompleted(ctx context.Context, key string) (bool, error) {
	val, err := s.client.Exists(ctx, s.completedKey(key)).Result()
	if err != nil {
		return false, err
	}
	return val > 0, nil
}

// MarkCompleted sets the completion marker with a TTL.
func (s *RedisStore) MarkCompleted(ctx context.Context, key string) error {
	return s.client.Set(ctx, s.completedKey(key), true, s.completedTTL).Err()
}
```

The Middleware Implementation

Now we can wire this RedisStore into our middleware factory. This factory will return a Middleware function that can be applied to any handler.

```go
package idempotency

import (
	"context"
	"errors"
	"log"
)

// NewIdempotencyMiddleware creates the middleware function.
func NewIdempotencyMiddleware(store Store) Middleware {
	return func(next Handler) Handler {
		return func(ctx context.Context, key string, payload []byte) error {
			// 1. Check if already completed (fast path, no lock needed)
			completed, err := store.CheckCompleted(ctx, key)
			if err != nil {
				log.Printf("ERROR: Failed to check completion for key %s: %v", key, err)
				// Treat this as a transient failure, nack the message to retry later.
				return err
			}
			if completed {
				log.Printf("INFO: Idempotency key %s already processed, skipping.", key)
				// Acknowledge the message, as it's a successful duplicate.
				return nil
			}

			// 2. Attempt to acquire a lock
			if err := store.Lock(ctx, key); err != nil {
				if errors.Is(err, ErrLockExists) {
					log.Printf("WARN: Idempotency key %s is already being processed, nacking.", key)
					// Nack so the message is retried after the holder finishes
					// or its lock TTL expires.
					return err
				}
				log.Printf("ERROR: Failed to acquire lock for key %s: %v", key, err)
				return err
			}
			// Release the lock on every return path below. The Unlock error is
			// ignored; the lock TTL is the fallback if Redis is unreachable.
			defer store.Unlock(ctx, key)

			// Re-check now that we hold the lock: another consumer may have
			// completed this key and released its lock between steps 1 and 2.
			completed, err = store.CheckCompleted(ctx, key)
			if err != nil {
				log.Printf("ERROR: Failed to re-check completion for key %s: %v", key, err)
				return err
			}
			if completed {
				log.Printf("INFO: Idempotency key %s completed by another consumer, skipping.", key)
				return nil
			}

			// 3. Execute the business logic
			log.Printf("INFO: Acquired lock for key %s, processing...", key)
			handlerErr := next(ctx, key, payload)

			// 4. Handle the result
			if handlerErr != nil {
				log.Printf("ERROR: Business logic failed for key %s: %v", key, handlerErr)
				// The lock will be released by the defer. We do NOT mark as completed.
				// The message will be redelivered for a clean retry.
				return handlerErr
			}

			// 5. Mark as completed
			if err := store.MarkCompleted(ctx, key); err != nil {
				log.Printf("CRITICAL: Business logic succeeded for key %s but failed to mark as completed: %v", key, err)
				// This is a critical failure state. The lock will be released,
				// and the message will be reprocessed, but we've lost idempotency.
				// We'll address this in the 'Advanced Edge Cases' section.
				return err
			}

			log.Printf("INFO: Successfully processed key %s, marked as completed.", key)
			// The deferred Unlock releases the lock now that the completed flag is set.
			return nil
		}
	}
}
```
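CheckCompleted and Lock are separate round trips. A consumer can therefore pass the completion check, stall, and then acquire the lock after another consumer has finished and released it. The deterministic replay below is a toy model with a hypothetical in-memory toyStore and no TTLs. In that schedule, the handler runs twice. Re-checking the completion marker after acquiring the lock restores a single execution.

```go
package main

// toyStore (hypothetical) is a single-goroutine stand-in for the Store:
// a set of completed keys and a set of held locks, with no TTLs.
type toyStore struct {
	done   map[string]bool
	locked map[string]bool
}

func newToyStore() *toyStore {
	return &toyStore{done: map[string]bool{}, locked: map[string]bool{}}
}

// lock mimics SET NX: it succeeds only if nobody holds the lock.
func (s *toyStore) lock(k string) bool {
	if s.locked[k] {
		return false
	}
	s.locked[k] = true
	return true
}

func (s *toyStore) unlock(k string) { delete(s.locked, k) }

// replayRace runs one specific schedule for a duplicated message: consumer A
// checks the completion marker, consumer B then processes the same key end to
// end, and finally A resumes. It returns how often the business handler ran.
func replayRace(recheckAfterLock bool) int {
	s := newToyStore()
	const key = "key-1"
	runs := 0

	// A, step 1: the completion check finds nothing yet.
	aSawCompleted := s.done[key]

	// B: check, lock, run handler, mark completed, unlock.
	if !s.done[key] && s.lock(key) {
		runs++
		s.done[key] = true
		s.unlock(key)
	}

	// A, step 2: acquires the now-free lock based on its stale check.
	if !aSawCompleted && s.lock(key) {
		if recheckAfterLock && s.done[key] {
			s.unlock(key) // B already finished: ack without rerunning
		} else {
			runs++
			s.done[key] = true
			s.unlock(key)
		}
	}
	return runs
}
```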

Putting It All Together: A Consumer Example

Here's how a Kafka consumer would use this middleware.

```go
// main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
	"your-project/idempotency" // replace with your module path
)

// Represents the message from Kafka
type KafkaMessage struct {
	IdempotencyKey string          `json:"idempotency_key"`
	Payload        json.RawMessage `json:"payload"`
}

// The actual business logic
func paymentHandler(ctx context.Context, key string, payload []byte) error {
	fmt.Printf("--- Executing payment logic for key: %s ---\n", key)
	// Simulate processing time
	time.Sleep(200 * time.Millisecond)
	// In a real app, this would charge a card, update a DB, etc.
	fmt.Printf("--- Payment logic complete for key: %s ---\n", key)
	return nil
}

func main() {
	// --- Setup ---
	redisClient := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	// Lock should be long enough for the operation to complete.
	// Completed marker should be long enough to outlast any message redelivery delay.
	store := idempotency.NewRedisStore(redisClient, 1*time.Minute, 24*time.Hour)
	idempotencyMiddleware := idempotency.NewIdempotencyMiddleware(store)

	// Wrap the business logic with the middleware
	protectedHandler := idempotencyMiddleware(paymentHandler)

	// --- Simulate receiving messages from Kafka ---
	ctx := context.Background()
	messages := []KafkaMessage{
		{IdempotencyKey: "key-1", Payload: []byte(`{"order_id": "123"}`)}, // First attempt
		{IdempotencyKey: "key-2", Payload: []byte(`{"order_id": "456"}`)}, // Another message
		{IdempotencyKey: "key-1", Payload: []byte(`{"order_id": "123"}`)}, // Duplicate of the first
	}

	for _, msg := range messages {
		log.Printf("\nReceived message with key: %s", msg.IdempotencyKey)
		err := protectedHandler(ctx, msg.IdempotencyKey, msg.Payload)
		if err != nil {
			log.Printf("Handler returned error for key %s: %v. NACKING message.", msg.IdempotencyKey, err)
			// In a real consumer, you would nack() the message here.
		} else {
			log.Printf("Handler finished successfully for key %s. ACKING message.", msg.IdempotencyKey)
			// In a real consumer, you would ack() the message here.
		}
	}
}

/*
EXPECTED OUTPUT on a first run against an empty Redis (log timestamps omitted;
log writes to stderr and fmt to stdout). Re-running within 24 hours skips
key-1 and key-2 as well, because their completion markers are still present.

Received message with key: key-1
INFO: Acquired lock for key key-1, processing...
--- Executing payment logic for key: key-1 ---
--- Payment logic complete for key: key-1 ---
INFO: Successfully processed key key-1, marked as completed.
Handler finished successfully for key key-1. ACKING message.

Received message with key: key-2
INFO: Acquired lock for key key-2, processing...
--- Executing payment logic for key: key-2 ---
--- Payment logic complete for key: key-2 ---
INFO: Successfully processed key key-2, marked as completed.
Handler finished successfully for key key-2. ACKING message.

Received message with key: key-1
INFO: Idempotency key key-1 already processed, skipping.
Handler finished successfully for key key-1. ACKING message.
*/
```

Advanced Edge Cases and Production Hardening

The implementation above is a solid foundation, but production systems present more complex challenges.

1. The Partial Failure Catastrophe

Our biggest vulnerability is a failure after the business logic succeeds but before MarkCompleted succeeds.

```go
// ... inside the middleware
handlerErr := next(ctx, key, payload) // This succeeds
if handlerErr != nil { ... }

// CRASH or Redis goes down HERE!
if err := store.MarkCompleted(ctx, key); err != nil { ... }
```

In this scenario, no completion marker is ever written. If MarkCompleted returned an error, the deferred store.Unlock() releases the lock. If the process crashed, the lock simply expires after lockTTL. Either way, the message will be redelivered and the operation will run again, violating idempotency.

Solution: Atomic Commit with Lua Scripting

We can tighten this by making the MarkCompleted and Unlock operations atomic. Instead of two separate Redis calls, we can use a single Lua script, which Redis executes atomically, to write the completion marker and delete the lock together.

```lua
-- Lua script: mark_completed_and_unlock.lua
-- KEYS[1]: The completed key (e.g., 'idem:done:some-key')
-- KEYS[2]: The lock key (e.g., 'idem:lock:some-key')
-- ARGV[1]: The value to set for the completed key (e.g., 'true')
-- ARGV[2]: The TTL for the completed key in seconds

redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
redis.call('DEL', KEYS[2])
return 1
```

We would update our RedisStore to load and execute this script.

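```go
// In RedisStore
var markAndUnlockScript = redis.NewScript(`
	redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
	redis.call('DEL', KEYS[2])
	return 1
`)

// MarkCompletedAndUnlock writes the completion marker and deletes the lock
// in one atomic script execution. It must also be added to the Store interface.
func (s *RedisStore) MarkCompletedAndUnlock(ctx context.Context, key string) error {
	completedKey := s.completedKey(key)
	lockKey := s.lockKey(key)

	// SET ... EX expects a whole number of seconds, but Duration.Seconds()
	// returns a float64, so convert explicitly (assumes completedTTL >= 1s).
	ttlSeconds := int64(s.completedTTL / time.Second)
	return markAndUnlockScript.Run(ctx, s.client, []string{completedKey, lockKey}, true, ttlSeconds).Err()
}

// Middleware updated to use the new atomic function
// ...
if handlerErr == nil {
	// This is now an atomic operation
	if err := store.MarkCompletedAndUnlock(ctx, key); err != nil {
		// ... handle critical failure
	}
	// The deferred Unlock is still needed for the failure path; after a
	// successful MarkCompletedAndUnlock it just deletes a key that is gone.
}
```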

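This removes the intermediate state in which the completion marker and the lock disagree, and it saves a round trip. It does not close the window described above. If the process crashes after the handler's side effects but before the script reaches Redis, nothing records the completion, and the message will be processed again. (If only the script's reply is lost, the marker was written, and the redelivery is correctly skipped.) Closing the gap fully requires making the side effect itself idempotent. One way is to pass the idempotency key to a downstream API that deduplicates on it. Another is to write the completion record in the same database transaction as the business change.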
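A crash between the handler's side effects and the completion write cannot be covered by Redis alone. The strongest protection is to forward the same idempotency key to the downstream system so that the side effect deduplicates on its own. The Gateway below is a hypothetical stand-in for a payment API that deduplicates by key; check your provider's documentation for whether and how it supports this. With such a gateway, even a redelivery after a lost completion marker results in a single charge.

```go
package main

import (
	"strconv"
	"sync"
)

// Gateway (hypothetical) performs at most one real charge per idempotency key.
type Gateway struct {
	mu      sync.Mutex
	byKey   map[string]string // idempotency key -> charge ID
	charges int               // number of real charges performed
	total   int               // total amount actually charged
}

func NewGateway() *Gateway { return &Gateway{byKey: map[string]string{}} }

// Charge replays the original charge ID for any repeat of the same key.
func (g *Gateway) Charge(key string, amount int) string {
	g.mu.Lock()
	defer g.mu.Unlock()
	if id, ok := g.byKey[key]; ok {
		return id // duplicate request: no second charge
	}
	g.charges++
	g.total += amount
	id := "ch_" + strconv.Itoa(g.charges)
	g.byKey[key] = id
	return id
}
```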

2. Handling Operation Results

Sometimes, an idempotent operation needs to return a result. For example, a CreateUser event handler should return the same user_id on every duplicate call. The idempotency store can be used to cache the result of the first successful execution.

We can modify the idem:done: key to store the marshaled result instead of a simple boolean.

```go
// store.go
type Store interface {
	// ... (Lock, Unlock)
	GetResult(ctx context.Context, key string) (result []byte, found bool, err error)
	MarkCompletedWithResult(ctx context.Context, key string, result []byte) error
}

// middleware.go
// ...
result, found, err := store.GetResult(ctx, key)
if err != nil { /* handle error */ }
if found {
	log.Printf("Returning cached result for key %s", key)
	// How you return the result depends on your consumer framework.
	// You might write it to a response channel or simply ack.
	return nil
}

// ... after business logic succeeds
// The handler now needs to return a result []byte alongside its error.
result, handlerErr := next(ctx, key, payload)
if handlerErr == nil {
	if err := store.MarkCompletedWithResult(ctx, key, result); err != nil {
		// Same critical failure mode as a failed MarkCompleted.
		return err
	}
}
```

This complicates the Store implementation, but it is essential whenever a caller needs the value produced by the first execution, such as the user_id returned by CreateUser.

3. Choosing TTLs: Lock vs. Completion

  • Lock TTL (lockTTL): This is a safety mechanism. It should be set to a value slightly longer than the maximum expected processing time for your handler (e.g., P99.9 latency plus a buffer). If a consumer acquires a lock and dies, this TTL ensures the lock is eventually released, preventing a permanent deadlock for that idempotency key. If it is set too short, a long-running but valid operation can have its lock expire while it is still running. A second consumer can then start processing the same event concurrently, which is exactly the duplicate execution the lock is meant to prevent.
  • Completion TTL (completedTTL): This determines how long you remember a completed operation. It should be longer than the maximum possible time a message can exist and be redelivered in your broker system. For Kafka, this might be related to your log retention. For RabbitMQ, it might be related to message TTLs and queue policies. A value of 24-72 hours is often a safe, pragmatic choice, balancing memory usage in Redis against the risk of forgetting a processed event.
Performance and Scalability Considerations

  • Network Latency: Every message now incurs several network round trips to Redis: CheckCompleted, Lock, and MarkCompleted (or the combined Lua script), plus Unlock in the basic version. In high-throughput systems this overhead is non-negligible. Keep your consumers and Redis in the same VPC, region, and availability zone to minimize latency.
  • Redis as a Bottleneck: While Redis is very fast, a single instance can become a bottleneck if you're processing hundreds of thousands of events per second. Consider a sharded deployment (Redis Cluster, or a managed offering such as ElastiCache in cluster mode) to distribute the load across multiple nodes. Idempotency keys are typically random and will distribute well across shards. One caveat: the Lua script touches both the lock key and the completion key, and Redis Cluster only lets a script access keys in the same hash slot. Both keys should therefore share a hash tag, for example idem:{<key>}:lock and idem:{<key>}:done.
  • Connection Pooling: Ensure your Go application uses a properly configured Redis connection pool. Establishing a new TCP connection for every check is prohibitively expensive. The go-redis library handles this for you, but ensure PoolSize is tuned for your consumer's concurrency level.
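To see why hash tags matter for the Lua script on Redis Cluster, the sketch below reimplements the cluster's key-to-slot mapping. The mapping is CRC16 in its XMODEM variant, modulo 16384. When a key contains a non-empty {...} section, only the text between the first { and the next } is hashed. The sketch then checks that tagged lock and completion keys land in the same slot. The key layout idem:{<key>}:lock and idem:{<key>}:done is an assumption that differs from the prefixes used earlier, and it presumes the idempotency keys themselves contain no braces.

```go
package main

import "strings"

// crc16 implements CRC-16/XMODEM (poly 0x1021, init 0), the variant Redis
// Cluster uses for key hashing.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot maps a key to one of 16384 slots, honoring a non-empty hash tag.
func hashSlot(key string) uint16 {
	if s := strings.IndexByte(key, '{'); s >= 0 {
		if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
			key = key[s+1 : s+1+e] // hash only the tag contents
		}
	}
	return crc16([]byte(key)) % 16384
}

// Hypothetical key builders that put both keys for one event in one slot.
func lockKey(key string) string { return "idem:{" + key + "}:lock" }
func doneKey(key string) string { return "idem:{" + key + "}:done" }
```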
Conclusion: Idempotency as a First-Class Citizen

In event-driven architectures, assuming messages will be delivered exactly once is a recipe for disaster. Building idempotency into your consumers is not an optional enhancement; it's a fundamental requirement for correctness.

By encapsulating this complex, stateful logic within a generic middleware, we achieve a powerful separation of concerns. The core business logic remains pure and testable, unaware of the complexities of distributed message delivery. The idempotency layer, built on a robust and atomic backend like Redis, provides reusable protection against duplicate processing. The implementation requires careful handling of edge cases, particularly around atomic state transitions and failure modes, but the resulting system is vastly more resilient and reliable at scale.
