Production Idempotency Middleware in Go for Kafka Consumers using Redis
The Inescapable Problem: At-Least-Once Delivery
In distributed systems, particularly those built on message brokers like Kafka, RabbitMQ, or AWS SQS, the contract between the broker and the consumer is almost always at-least-once delivery. The dream of exactly-once is notoriously difficult and often impractical to achieve at the infrastructure level. This means your application logic must be prepared to receive the same message multiple times. A consumer might execute its business logic for a message but crash before it commits the offset to the broker. Upon restart, the broker, unaware of the successful processing, will redeliver the exact same message.
For many operations, this is catastrophic. Imagine a CreateOrder event resulting in two orders, a ProcessPayment event charging a customer twice, or a DecrementStock event depleting inventory incorrectly. The business impact is severe. The solution is not to re-architect the message broker but to build idempotency directly into the consumer application logic.
This article details a robust, state-aware middleware pattern in Go for achieving idempotency in a Kafka consumer. We'll move past simplistic implementations and build a production-ready solution using Redis that handles critical edge cases, ensuring correctness even in the face of process crashes and network partitions.
Core Strategy: The Idempotency Key and a Distributed Lock
The fundamental principle is to uniquely identify each logical operation and track its processing state. We achieve this with two components:
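* Idempotency Key: A unique identifier for each logical operation, typically taken from the message (for example, a payment_intent_id). The key must be deterministic and unique for each discrete operation.
* Distributed Lock and State Store: A shared store that records the processing state of every key. Redis is a natural fit because of its atomic operations (SETNX, SET with options) and high performance.

The naive approach many engineers first attempt looks something like this: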
- Extract the idempotency key from the message.
- Use SETNX (SET if Not Exists) in Redis to try and set the key.
- If SETNX returns 1 (success), the message is new. Process it.
- If SETNX returns 0 (key already exists), the message is a duplicate. Skip it.

This is a good start, but it fails under a common failure mode: the worker crashes after SETNX succeeds but before the business logic finishes. The key now exists, so when the message is redelivered, SETNX returns 0 and the message is acknowledged as a duplicate even though it was never processed. The operation is silently lost. (Setting the key only after processing just moves the problem: a crash between the business logic and the SETNX means the redelivered message is processed a second time.) We need a more sophisticated, state-aware approach.
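To make the lost-message failure concrete, here is a minimal in-memory simulation of the naive flow. The `kv` map stands in for Redis and `setNX` mimics SETNX semantics; `kv`, `naiveConsume`, and `errCrash` are hypothetical names for illustration, not part of any library.

```go
package main

import (
	"errors"
	"sync"
)

// kv is an in-memory stand-in for Redis; setNX mimics SETNX semantics.
type kv struct {
	mu   sync.Mutex
	keys map[string]bool
}

// setNX sets key only if it is absent and reports whether it did.
func (s *kv) setNX(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.keys[key] {
		return false
	}
	s.keys[key] = true
	return true
}

// errCrash simulates the worker dying before the business logic completes.
var errCrash = errors.New("simulated crash before the business logic finished")

// naiveConsume implements the naive flow: SETNX first, then process.
// skipped=true means the message was treated as a duplicate and acknowledged.
func naiveConsume(store *kv, key string, process func() error) (skipped bool, err error) {
	if !store.setNX(key) {
		return true, nil
	}
	return false, process()
}
```

If the first delivery's `process` returns `errCrash` and the redelivery's `process` would perform the charge, the second call returns `skipped=true` without running it: the payment never happens, and because the duplicate was acknowledged, nothing will retry it.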
Architectural Pattern: State-Aware Idempotency Middleware
To keep our business logic clean and decoupled from idempotency concerns, we'll implement this as a middleware or a decorator pattern. The consumer's primary role is to pass the message to the middleware, which then wraps the actual business logic handler.
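Conceptually, each idempotency key moves through three states. The implementation below persists only PROCESSING and COMPLETED; a key that is absent from Redis plays the role of STARTED: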
* STARTED: The key has been seen, but processing has not begun or completed.
* PROCESSING: A worker is actively executing the business logic for this key. This acts as a lock to prevent concurrent processing of the same message by different consumers in the same group.
* COMPLETED: The business logic has successfully finished. Any subsequent messages with this key should be acknowledged and skipped immediately.
Here is the refined processing flow:
1. Check the current state:
   * GET the value for the key.
   * If the state is COMPLETED, the operation is done. Acknowledge the message (commit the Kafka offset) and stop.
   * If the state is PROCESSING, another worker is likely busy with it. Check the lock's timestamp/TTL. If it has not expired, skip this message for now (it will be retried). If it has expired, assume the previous worker crashed and proceed.
   * If the key does not exist or the state is STARTED (from a previous failed attempt), proceed to acquire a lock.
2. Acquire the lock: Atomically SET the key's state to PROCESSING in Redis, including the worker ID and a short TTL (e.g., 30 seconds). This TTL acts as a dead-man's switch if the worker crashes.
3. Execute the business logic: Invoke the wrapped handler.
4. On success:
   * Atomically SET the key's state to COMPLETED in Redis. This time, use a much longer TTL (e.g., 24 hours), long enough to catch late duplicates while still ensuring Redis doesn't grow indefinitely.
   * Acknowledge the message (commit the Kafka offset).
5. On failure:
   * Do not update the key's state. The PROCESSING lock will simply expire.
   * Do not acknowledge the message. Let it be retried (or sent to a Dead Letter Queue after several failures).
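This architecture recovers from a crash at almost any point. A crash before COMPLETED is written leaves only a PROCESSING lock that expires, so the message is retried; a crash after COMPLETED is written means the redelivered duplicate is skipped. The one remaining window, a crash after the business logic succeeds but before COMPLETED is recorded, still leads to reprocessing, which is why the middleware below logs that case as critical.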
Code Example 1: A Complete Go Implementation
Let's translate this architecture into production-grade Go code. We'll use the segmentio/kafka-go library for Kafka consumption and go-redis/redis for interacting with Redis.
Project Structure:
/idempotency-kafka
|-- /idempotency
| |-- store.go
| |-- middleware.go
|-- main.go
|-- go.mod
|-- go.sum
`idempotency/store.go`: The Storage Abstraction
First, we define an interface for our state store. This allows us to swap Redis for another backend like PostgreSQL or DynamoDB in the future without changing the middleware logic.
```go
package idempotency

import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"github.com/go-redis/redis/v8"
)

// Status represents the processing state of a message.
type Status string

const (
	StatusProcessing Status = "PROCESSING"
	StatusCompleted  Status = "COMPLETED"
)

// Record is the data structure we store in Redis.
type Record struct {
	Status    Status    `json:"status"`
	Timestamp time.Time `json:"timestamp"`
}

// Store defines the interface for an idempotency storage backend.
type Store interface {
	// CheckAndSet attempts to mark a key as processing or returns the existing status.
	// It returns the current status and a boolean indicating if the caller can proceed.
	CheckAndSet(ctx context.Context, key string, processingTTL time.Duration) (Status, bool, error)
	// SetCompleted marks a key as completed.
	SetCompleted(ctx context.Context, key string, retentionTTL time.Duration) error
}

// RedisStore implements the Store interface using Redis.
type RedisStore struct {
	client *redis.Client
}

func NewRedisStore(client *redis.Client) *RedisStore {
	return &RedisStore{client: client}
}

// CheckAndSet is the core logic for acquiring a processing lock.
func (s *RedisStore) CheckAndSet(ctx context.Context, key string, processingTTL time.Duration) (Status, bool, error) {
	val, err := s.client.Get(ctx, key).Result()
	if err != nil && !errors.Is(err, redis.Nil) {
		return "", false, err // Redis error
	}

	if err == nil {
		var record Record
		if jsonErr := json.Unmarshal([]byte(val), &record); jsonErr != nil {
			// Data corruption: return the error so the message is retried.
			return "", false, jsonErr
		}
		switch record.Status {
		case StatusCompleted:
			return StatusCompleted, false, nil // Already done, don't proceed
		case StatusProcessing:
			// Another worker is busy, or it crashed. If it crashed, the key's
			// TTL expires and a later attempt can acquire the lock.
			return StatusProcessing, false, nil // Locked, don't proceed
		}
	}

	// The key does not exist, so try to acquire it. (A key holding an unknown
	// status also falls through here; SETNX then fails and we report "locked".)
	recordBytes, err := json.Marshal(Record{Status: StatusProcessing, Timestamp: time.Now()})
	if err != nil {
		return "", false, err
	}

	// SET with the NX flag is atomic, so it acts as our distributed lock:
	// if another process set the key between our GET and this call, it fails.
	ok, err := s.client.SetNX(ctx, key, recordBytes, processingTTL).Result()
	if err != nil {
		return "", false, err
	}
	if !ok {
		// Lost the race: another consumer just set the key.
		return StatusProcessing, false, nil
	}

	// We successfully acquired the lock.
	return StatusProcessing, true, nil
}

// SetCompleted marks the operation as successfully finished.
func (s *RedisStore) SetCompleted(ctx context.Context, key string, retentionTTL time.Duration) error {
	recordBytes, err := json.Marshal(Record{Status: StatusCompleted, Timestamp: time.Now()})
	if err != nil {
		return err
	}
	return s.client.Set(ctx, key, recordBytes, retentionTTL).Err()
}
```
Note on the CheckAndSet logic: This implementation uses a GET followed by a SETNX. While not a single atomic operation, the SETNX ensures that only one consumer can win the race to set the PROCESSING state. A more advanced implementation could use a Lua script to perform the get-check-set logic atomically on the Redis server, saving a round-trip and removing the window between the GET and SETNX calls.
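The property this note relies on is that SETNX lets exactly one caller win, however many workers slip through the GET at the same time. Redis executes commands one at a time, so SETNX behaves like a mutex-guarded check-and-insert. The sketch below models that with an in-memory `lockTable` (a hypothetical stand-in for Redis, not a real client) and races goroutines through the same GET-then-SETNX sequence.

```go
package main

import (
	"strconv"
	"sync"
	"sync/atomic"
)

// lockTable mimics Redis: get is the separate GET, setNX the atomic SET ... NX.
type lockTable struct {
	mu   sync.Mutex
	keys map[string]string
}

func (t *lockTable) get(key string) (string, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	v, ok := t.keys[key]
	return v, ok
}

func (t *lockTable) setNX(key, value string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, exists := t.keys[key]; exists {
		return false
	}
	t.keys[key] = value
	return true
}

// raceForLock runs n workers that each GET, then SETNX, the same key,
// and returns how many of them acquired the lock.
func raceForLock(n int) int64 {
	t := &lockTable{keys: make(map[string]string)}
	var winners int64
	var wg sync.WaitGroup
	start := make(chan struct{})
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			<-start // release all workers together to maximize contention
			if _, seen := t.get("payment-42"); seen {
				return // the GET already saw a lock; back off
			}
			// Several workers can reach this point, but only one SETNX succeeds.
			if t.setNX("payment-42", "worker-"+strconv.Itoa(worker)) {
				atomic.AddInt64(&winners, 1)
			}
		}(i)
	}
	close(start)
	wg.Wait()
	return atomic.LoadInt64(&winners)
}
```

`raceForLock(100)` always reports a single winner. The GET only saves a round-trip in the common case; correctness comes entirely from the atomic SETNX.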
`idempotency/middleware.go`: The Middleware Logic
This is the orchestrator. It uses the Store to wrap the business logic handler.
```go
package idempotency

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

// HandlerFunc is the type for our actual business logic.
type HandlerFunc func(ctx context.Context, msg kafka.Message) error

// MiddlewareConfig holds configuration for the idempotency middleware.
type MiddlewareConfig struct {
	Store         Store
	ProcessingTTL time.Duration
	RetentionTTL  time.Duration
	KeyExtractor  func(kafka.Message) string
}

// Middleware creates a new Kafka message handler that wraps the provided
// handler with an idempotency check.
func Middleware(config MiddlewareConfig, next HandlerFunc) HandlerFunc {
	return func(ctx context.Context, msg kafka.Message) error {
		idempotencyKey := config.KeyExtractor(msg)
		if idempotencyKey == "" {
			log.Println("WARN: No idempotency key found, processing message without check.")
			return next(ctx, msg)
		}

		log.Printf("INFO: Checking idempotency for key: %s", idempotencyKey)
		status, canProcess, err := config.Store.CheckAndSet(ctx, idempotencyKey, config.ProcessingTTL)
		if err != nil {
			log.Printf("ERROR: Idempotency store check failed for key '%s': %v. Will retry.", idempotencyKey, err)
			// Return the error so the Kafka offset is not committed and the message is retried.
			return err
		}

		if !canProcess {
			if status == StatusCompleted {
				log.Printf("INFO: Message with key '%s' already processed. Skipping.", idempotencyKey)
				// Return nil to commit the offset and acknowledge the duplicate message.
				return nil
			}
			log.Printf("INFO: Message with key '%s' is currently being processed by another worker. Skipping for now.", idempotencyKey)
			// Return an error to prevent the commit. On retry, the key is either
			// COMPLETED (skip) or its lock has expired (process).
			return errors.New("concurrent processing detected")
		}

		// We have the lock, execute the business logic.
		log.Printf("INFO: Acquired lock for key '%s'. Processing message.", idempotencyKey)
		if err := next(ctx, msg); err != nil {
			log.Printf("ERROR: Business logic failed for key '%s': %v. Lock will expire.", idempotencyKey, err)
			// Don't mark as complete. The lock will expire, allowing for a retry.
			return err
		}

		// Business logic succeeded, mark as completed.
		if err := config.Store.SetCompleted(ctx, idempotencyKey, config.RetentionTTL); err != nil {
			log.Printf("CRITICAL: Business logic succeeded but failed to mark key '%s' as completed: %v", idempotencyKey, err)
			// This is a critical failure: the message will be reprocessed.
			// Requires monitoring and manual intervention.
			return err
		}

		log.Printf("INFO: Successfully processed and marked key '%s' as completed.", idempotencyKey)
		return nil
	}
}
```
`main.go`: Tying It All Together
Here we set up our Kafka consumer, Redis client, and the actual business logic handler. We then wrap the handler with our middleware.
```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"idempotency-kafka/idempotency"

	"github.com/go-redis/redis/v8"
	"github.com/segmentio/kafka-go"
)

// A simple business logic handler.
func paymentHandler(ctx context.Context, msg kafka.Message) error {
	log.Printf("--- Executing business logic for payment: key=%s, value=%s ---", string(msg.Key), string(msg.Value))
	// Simulate work
	time.Sleep(2 * time.Second)
	log.Printf("--- Finished business logic for payment: key=%s ---", string(msg.Key))
	return nil
}

// keyExtractor defines how we get the idempotency key from a message.
// For this example, we'll use the Kafka message key.
func keyExtractor(msg kafka.Message) string {
	return string(msg.Key)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Set up the Redis client.
	redisClient := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	if err := redisClient.Ping(ctx).Err(); err != nil {
		log.Fatalf("Could not connect to Redis: %v", err)
	}

	// Set up the idempotency store and middleware.
	idempotencyStore := idempotency.NewRedisStore(redisClient)
	middlewareConfig := idempotency.MiddlewareConfig{
		Store:         idempotencyStore,
		KeyExtractor:  keyExtractor,
		ProcessingTTL: 30 * time.Second, // Lock expires after 30s
		RetentionTTL:  24 * time.Hour,   // Record kept for 24h
	}
	idempotentPaymentHandler := idempotency.Middleware(middlewareConfig, paymentHandler)

	// Set up the Kafka consumer.
	kafkaReader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"localhost:9092"},
		GroupID:        "payment-processor-group",
		Topic:          "payments",
		MinBytes:       10e3,        // 10KB
		MaxBytes:       10e6,        // 10MB
		CommitInterval: time.Second, // Flush explicit CommitMessages calls every second
	})
	defer kafkaReader.Close()

	log.Println("Consumer is running...")

	// Graceful shutdown
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		for {
			m, err := kafkaReader.FetchMessage(ctx)
			if err != nil {
				if ctx.Err() != nil {
					return // Shutting down.
				}
				log.Printf("ERROR: could not fetch message: %v", err)
				continue
			}
			// kafka-go does not redeliver an uncommitted message to a running reader:
			// FetchMessage moves on, and committing a later offset also covers this
			// one. So retry the handler in place until it succeeds. A production
			// consumer would cap attempts and route the message to a DLQ.
			for {
				err := idempotentPaymentHandler(ctx, m)
				if err == nil {
					break
				}
				log.Printf("ERROR: handler failed for message offset %d: %v. Retrying.", m.Offset, err)
				select {
				case <-ctx.Done():
					return // Uncommitted, so it is redelivered after a restart.
				case <-time.After(time.Second):
				}
			}
			if err := kafkaReader.CommitMessages(ctx, m); err != nil {
				log.Printf("ERROR: failed to commit messages: %v", err)
			}
		}
	}()

	<-sigchan
	log.Println("Shutdown signal received, closing consumer...")
	cancel()
}
```
To test this, you would run a Kafka and Redis instance (e.g., via Docker), create the payments topic, and then produce a message to it multiple times with the same key. You will observe from the logs that the business logic is executed only once.
Advanced Topic: Handling Long-Running Processes
The ProcessingTTL is a double-edged sword. It must be short enough to quickly recover from a worker crash but long enough to accommodate the 99th percentile of your business logic's execution time. What if a process legitimately takes longer than the TTL?
If a job takes 60 seconds but the ProcessingTTL is 30 seconds, the lock will expire mid-process. Another consumer (for example, one that takes over the partition after a rebalance) might pick up the same message, find that the lock key has expired, and start processing it concurrently, violating our idempotency guarantee.
Solution: Lock Heartbeating
For long-running jobs, the worker that holds the lock must periodically extend its lifetime. This can be done with a background goroutine that wakes up every ProcessingTTL / 2 and resets the key's TTL in Redis (for example, with a SET that rewrites the record with a fresh TTL).
Here's a sketch of how you could modify the middleware:
```go
// Inside the Middleware function, after acquiring the lock:

// We have the lock, execute the business logic.
log.Printf("INFO: Acquired lock for key '%s'. Processing message.", idempotencyKey)

// Create a context for the heartbeat.
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
defer cancelHeartbeat() // Safety net if we return early.
heartbeatDone := make(chan struct{})

// Start the heartbeat goroutine.
go func() {
	defer close(heartbeatDone)
	ticker := time.NewTicker(config.ProcessingTTL / 2)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Extend the lock TTL. Extend is a new method the Store interface needs.
			if err := config.Store.Extend(heartbeatCtx, idempotencyKey, config.ProcessingTTL); err != nil {
				log.Printf("WARN: Failed to extend lock for key '%s': %v", idempotencyKey, err)
			}
		case <-heartbeatCtx.Done():
			return
		}
	}
}()

// Execute the business logic.
err = next(ctx, msg)

// Stop the heartbeat and wait for it to exit before SetCompleted runs.
// The deferred cancel alone fires only when the handler returns, so a late
// Extend could otherwise land after COMPLETED is written and overwrite it.
cancelHeartbeat()
<-heartbeatDone

// ... rest of the logic ...
```
The store.Extend method would simply perform a SET command on the existing key to update its value and reset its TTL, but only if the key still exists and is held by the current worker (if we add a worker ID to the record).
Performance and Scalability Considerations
* Latency: Each message now requires extra Redis round-trips (GET/SETNX and SET). In a typical cloud environment, this adds 1-5ms of latency per message. This is usually acceptable but must be factored into your SLOs.
* Redis availability: If Redis is unavailable, the CheckAndSet method will return an error, preventing the Kafka offset from being committed. This is often the desired behavior: it's better to halt processing than to process messages incorrectly. Ensure your Redis deployment is highly available (e.g., using Redis Sentinel or Redis Cluster).
* Memory usage: The RetentionTTL is critical for managing Redis memory. A 24-hour TTL is a safe default, but if you process billions of unique messages per day, this could lead to significant memory pressure. You must size your Redis instance to hold the peak number of unique keys expected within your retention window.

Conclusion
Implementing consumer-side idempotency is not an optional enhancement; it is a fundamental requirement for building reliable event-driven systems. A simple SETNX is a fragile solution that fails under common crash scenarios. By adopting a state-aware middleware pattern (PROCESSING, COMPLETED), we create a resilient system that guarantees correctness.
This Go implementation provides a robust, extensible, and production-ready foundation. It decouples the idempotency logic from the business logic, handles critical failure modes, and includes provisions for advanced scenarios like long-running jobs. While it introduces a dependency on a distributed store like Redis and adds a small amount of processing latency, the correctness and reliability it provides are non-negotiable for any system where duplicate operations have a real-world cost.