Idempotent Kafka Consumers for Event Sourcing with Redis Locking

15 min read
Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

The Idempotency Imperative in Event-Sourced Systems

In distributed systems, the promise of "exactly-once" processing is the holy grail—a guarantee that every message is processed once and only once. However, in practice, most messaging systems, including Apache Kafka, provide an "at-least-once" delivery guarantee by default under typical configurations. This guarantee is a pragmatic compromise. It ensures no data is lost during network partitions, consumer crashes, or group rebalances, but it comes at the cost of potential message duplication.

For an event-sourcing architecture, this is a critical problem. Consider these scenarios:

* OrderCreatedEvent: Re-processing this event could create a duplicate order, leading to fulfillment and billing nightmares.

* PaymentProcessedEvent: A duplicate event could result in double-charging a customer, causing direct financial impact and loss of trust.

* UserDeactivatedEvent: If the user is already deactivated, a re-processed event might trigger unnecessary downstream workflows or fail with an error, creating noise and potential for infinite retries.

Attempting to achieve exactly-once semantics (EOS) across heterogeneous systems (Kafka -> Your Service -> Database -> External API) is fraught with complexity and often impossible without two-phase commit protocols, which are notoriously slow and brittle. The more robust and widely adopted solution is to embrace at-least-once delivery and make the consumer idempotent. An idempotent operation is one that can be applied multiple times without changing the result beyond the initial application.
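
To make the distinction concrete, here is a minimal, hypothetical Go sketch (the Order type and field names are illustrative): setting an absolute state is idempotent, while applying a relative change is not.

go
// Order is an illustrative aggregate for this example.
type Order struct {
	Status     string
	AmountOwed float64
}

// MarkPaid is idempotent: applying it once or ten times leaves the order
// in exactly the same state.
func MarkPaid(o *Order) {
	o.Status = "PAID"
}

// ApplyPayment is NOT idempotent: each duplicate delivery subtracts the
// amount again, corrupting the balance.
func ApplyPayment(o *Order, amount float64) {
	o.AmountOwed -= amount
}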

This article details a battle-tested, high-performance pattern for implementing idempotent Kafka consumers using Redis for distributed locking and state management. We'll move beyond naive approaches and build a system that is resilient to consumer crashes, race conditions, and poison pill messages.

Designing a Resilient Idempotency Key

The foundation of any idempotency system is a unique key that identifies each message processing attempt. The choice of this key is critical.

  * Naive Approach: Kafka Coordinates (topic-partition-offset)

    A common first thought is to use the message's physical coordinates in the Kafka log. The key would be my-topic-3-12345. While simple, this is dangerously flawed. If the topic is ever deleted and recreated (a common practice in some disaster recovery or testing scenarios), the offsets reset to zero. An old message from a previous incarnation of the topic could be masked by a new one, or vice versa.

  * Better Approach: Business-Level Identifiers

    Using a natural key from the event payload, such as an orderId or transactionId, is a significant improvement. This ties idempotency to the business operation itself. However, it can be problematic if a single business operation can emit multiple, distinct events that must all be processed. For example, an Order might emit OrderValidated, OrderInventoryReserved, and OrderPaymentAuthorized events, all sharing the same orderId. If you use orderId as the idempotency key, only the first event processed will succeed.

  * Production-Grade Approach: Producer-Generated Event ID

    The most robust strategy is for the event producer to generate a unique identifier for every single event it creates and place it in the message headers or payload. A UUID is a perfect candidate for this.

    json
    // Example Kafka Message Payload with an Event ID
    {
      "eventId": "a8c7b6e5-2f41-4a2b-8b1e-7c9d0f8a6b3c",
      "eventType": "PaymentProcessed",
      "timestamp": "2023-10-27T10:00:00Z",
      "payload": {
        "transactionId": "txn_12345",
        "amount": 99.99,
        "currency": "USD"
      }
    }

    This eventId is guaranteed to be unique for each message, regardless of its business content or its position in the Kafka log. This is the key we will use for our distributed lock in Redis.
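
    On the producer side, stamping each event with its eventId is a small change. Below is a hedged sketch using segmentio/kafka-go; the broker address, topic name, and payload are placeholders, not values from this article.

    go
    package main
    
    import (
    	"context"
    
    	"github.com/google/uuid"
    	"github.com/segmentio/kafka-go"
    )
    
    // publishEvent attaches a freshly generated eventId header to every outgoing
    // message so that consumers can use it as their idempotency key.
    func publishEvent(ctx context.Context, w *kafka.Writer, key, payload []byte) error {
    	return w.WriteMessages(ctx, kafka.Message{
    		Key:   key,
    		Value: payload,
    		Headers: []kafka.Header{
    			{Key: "eventId", Value: []byte(uuid.New().String())},
    		},
    	})
    }
    
    func main() {
    	w := &kafka.Writer{
    		Addr:  kafka.TCP("localhost:9092"), // placeholder broker
    		Topic: "events",                    // placeholder topic
    	}
    	defer w.Close()
    
    	payload := []byte(`{"eventType":"PaymentProcessed","payload":{"transactionId":"txn_12345"}}`)
    	_ = publishEvent(context.Background(), w, []byte("txn_12345"), payload)
    }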

    Core Implementation: Beyond Naive Redis Locking

    A simple SETNX (SET if Not eXists) in Redis seems like an obvious solution. Let's examine why it's insufficient.

    The Naive SETNX Pattern (and its fatal flaw):

    go
    // DO NOT USE THIS IN PRODUCTION
    func naiveHandleMessage(ctx context.Context, msg kafka.Message, rdb *redis.Client) {
        eventId := getEventId(msg)
        idempotencyKey := "idempotency:" + eventId
    
        // 1. Acquire lock
        wasSet, err := rdb.SetNX(ctx, idempotencyKey, "processed", 24*time.Hour).Result()
        if err != nil {
            // Handle Redis error, do not commit offset
            return
        }
    
        if !wasSet {
            // Message already processed, skip and commit offset
            commitOffset(msg)
            return
        }
    
        // 2. Process business logic
        err = processBusinessLogic(ctx, msg)
        if err != nil {
            // CRITICAL FLAW HERE!
            // We should release the lock, but what if the service crashes now?
            rdb.Del(ctx, idempotencyKey) // Attempt to release lock
            return // Do not commit offset
        }
    
        // 3. Commit Kafka offset
        commitOffset(msg)
    }

    The critical failure scenario: the consumer acquires the lock (SETNX returns true), begins processing, and then crashes before committing the Kafka offset. The lock key (idempotency:a8c7...) now sits in Redis until its 24-hour TTL expires. When Kafka re-delivers the message to another consumer, that consumer sees the lock, assumes the message was successfully processed, and incorrectly skips it. The message is effectively lost.

    This demonstrates that we need to distinguish between a message that is being processed and one that is successfully completed.

    Production-Grade Pattern: Stateful Idempotency Tracking

    To solve this, we introduce a state machine for our idempotency key in Redis. Instead of a simple boolean flag, the key will store a status: PROCESSING, COMPLETED, or FAILED.

    Here is the robust algorithm:

  1. On Message Receipt: Extract the eventId.

  2. Check Key Status in Redis: Atomically look up the key idempotency:<eventId>.

     * If the status is COMPLETED: The message has been successfully processed. Acknowledge/commit the Kafka offset and stop.

     * If the status is PROCESSING: Another consumer is likely working on this message, or it crashed. We must not proceed. Wait a short period and retry, or, after several checks, assume the previous worker is a zombie and flag the message for manual intervention or move it to a Dead Letter Queue (DLQ).

     * If the key does not exist: We are the first to see this message. Proceed to acquire the lock.

  3. Acquire Lock: Atomically set the key's status to PROCESSING with a short TTL (e.g., 5 minutes). This must be an atomic operation (e.g., SET ... NX). If we fail to acquire the lock because another consumer beat us to it, go back to step 2.

  4. Execute Business Logic: Perform the database updates, API calls, etc.

  5. Handle Logic Outcome:

     * On Success: Atomically update the key's status to COMPLETED and set a longer TTL (e.g., 24-72 hours). Then commit the Kafka offset. The order is critical: update Redis *before* committing to Kafka.

     * On Failure: Delete the idempotency key from Redis. This allows the message to be fully re-processed upon redelivery. Do not commit the Kafka offset.
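
    The three statuses are plain strings stored inside the Redis value. Here is a small sketch of that vocabulary; the constant names are illustrative, and the implementation below uses the string literals directly.

    go
    // Lifecycle of an idempotency key in Redis.
    const (
    	StatusProcessing = "PROCESSING" // lock held, work in flight, short TTL
    	StatusCompleted  = "COMPLETED"  // work finished, long TTL to absorb redeliveries
    	StatusFailed     = "FAILED"     // gave up after repeated failures (see poison pills below)
    )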

    Implementation with Go and Redis Lua Script

    To ensure atomicity for our Redis operations, a Lua script executed via EVAL is the most performant and reliable method. It guarantees that no other command can run between the steps of our script.

    Here is a complete, runnable example in Go using kafka-go and go-redis.

    The Redis Lua Scripts:

    We need two scripts: one to acquire the lock and one to mark it as complete.

    * acquire_lock.lua

    lua
    -- KEYS[1] = idempotency_key
    -- ARGV[1] = status_processing_json ({ "status": "PROCESSING", ... })
    -- ARGV[2] = lock_ttl_seconds
    
    local existing = redis.call('GET', KEYS[1])
    
    if existing then
        return existing
    end
    
    redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
    return nil

    This script checks if a key exists. If it does, it returns the existing value. If not, it sets the key to the PROCESSING state with a TTL and returns nil to signal success.

    * complete_lock.lua

    lua
    -- KEYS[1] = idempotency_key
    -- ARGV[1] = current_owner_id (to prevent a different worker from completing the lock)
    -- ARGV[2] = status_completed_json ({ "status": "COMPLETED", ... })
    -- ARGV[3] = completed_ttl_seconds
    
    local current_val_str = redis.call('GET', KEYS[1])
    if not current_val_str then
        return 'NOT_FOUND'
    end
    
    local current_val = cjson.decode(current_val_str)
    if current_val.owner ~= ARGV[1] then
        return 'WRONG_OWNER'
    end
    
    redis.call('SET', KEYS[1], ARGV[2], 'EX', ARGV[3])
    return 'OK'

    This script ensures that only the process that acquired the lock can mark it as complete by checking an owner field (e.g., a unique consumer instance ID).

    The Go Consumer Implementation:

    go
    package main
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"os"
    	"time"
    
    	"github.com/go-redis/redis/v8"
    	"github.com/google/uuid"
    	"github.com/segmentio/kafka-go"
    )
    
    const (
    	lockTTL       = 5 * time.Minute
    	completedTTL  = 24 * time.Hour
    )
    
    // IdempotencyRecord defines the structure we store in Redis.
    type IdempotencyRecord struct {
    	Status    string    `json:"status"`
    	Owner     string    `json:"owner"`
    	Timestamp time.Time `json:"timestamp"`
    }
    
    // Consumer holds our application state.
    type Consumer struct {
    	kReader   *kafka.Reader
    	rdb       *redis.Client
    	consumerID string
    
    	acquireScript *redis.Script
    	completeScript *redis.Script
    }
    
    func NewConsumer() *Consumer {
    	// In a real app, these would come from config
    	kReader := kafka.NewReader(kafka.ReaderConfig{
    		Brokers: []string{"localhost:9092"},
    		GroupID: "my-idempotent-group",
    		Topic:   "events",
    	})
    	rdb := redis.NewClient(&redis.Options{
    		Addr: "localhost:6379",
    	})
    
    	// Load Lua scripts
    	acquireLua, _ := os.ReadFile("acquire_lock.lua")
    	completeLua, _ := os.ReadFile("complete_lock.lua")
    
    	return &Consumer{
    		kReader:   kReader,
    		rdb:       rdb,
    		consumerID: uuid.New().String(), // Unique ID for this consumer instance
    		acquireScript: redis.NewScript(string(acquireLua)),
    		completeScript: redis.NewScript(string(completeLua)),
    	}
    }
    
    func (c *Consumer) Run(ctx context.Context) {
    	for {
    		msg, err := c.kReader.FetchMessage(ctx)
    		if err != nil {
    			fmt.Printf("could not fetch message: %v\n", err)
    			break
    		}
    
    		if err := c.handleMessage(ctx, msg); err != nil {
    			fmt.Printf("error handling message %s: %v. Not committing offset.\n", string(msg.Key), err)
    			// Do not commit, message will be redelivered.
    		} else {
    			if err := c.kReader.CommitMessages(ctx, msg); err != nil {
    				fmt.Printf("failed to commit message: %v\n", err)
    			}
    		}
    	}
    }
    
    func getEventIdFromMessage(msg kafka.Message) string {
    	for _, h := range msg.Headers {
    		if h.Key == "eventId" {
    			return string(h.Value)
    		}
    	}
    	return ""
    }
    
    func (c *Consumer) handleMessage(ctx context.Context, msg kafka.Message) error {
    	eventId := getEventIdFromMessage(msg)
    	if eventId == "" {
    		return fmt.Errorf("message missing eventId header")
    	}
    	idempotencyKey := fmt.Sprintf("idempotency:%s", eventId)
    
    	// 1. & 2. Check status and try to acquire lock
    	processingRecord := IdempotencyRecord{
    		Status:    "PROCESSING",
    		Owner:     c.consumerID,
    		Timestamp: time.Now(),
    	}
    	processingJSON, _ := json.Marshal(processingRecord)
    
    	// Using Lua script for atomic check-and-set
    	// Note: when the script returns nil (i.e. we acquired the lock), go-redis
    	// surfaces that as the redis.Nil error rather than a nil result.
    	res, err := c.acquireScript.Run(ctx, c.rdb, []string{idempotencyKey}, string(processingJSON), int(lockTTL.Seconds())).Result()
    	if err == redis.Nil {
    		res = nil // key did not exist; the lock is now ours
    	} else if err != nil {
    		return fmt.Errorf("redis acquire lock script failed: %w", err)
    	}
    
    	if res != nil {
    		// Key already existed, inspect its state
    		existingVal, ok := res.(string)
    		if !ok {
    			return fmt.Errorf("unexpected type from redis: %T", res)
    		}
    		var existingRecord IdempotencyRecord
    		if err := json.Unmarshal([]byte(existingVal), &existingRecord); err != nil {
    			return fmt.Errorf("failed to unmarshal existing record: %w", err)
    		}
    
    		if existingRecord.Status == "COMPLETED" {
    			fmt.Printf("Event %s already completed. Skipping.\n", eventId)
    			return nil // Success, commit offset
    		}
    
    		if existingRecord.Status == "PROCESSING" {
    			// Another worker is processing or has died.
    			// In a real app, you might have more complex logic here (e.g., check timestamp)
    			return fmt.Errorf("event %s is already being processed by %s. Backing off", eventId, existingRecord.Owner)
    		}
    	}
    
    	fmt.Printf("Acquired lock for event %s\n", eventId)
    
    	// 3. Execute business logic
    	err = c.processBusinessLogic(ctx, msg)
    	if err != nil {
    		// On failure, release the lock so it can be retried.
    		fmt.Printf("Business logic failed for event %s. Releasing lock. Error: %v\n", eventId, err)
    		c.rdb.Del(ctx, idempotencyKey)
    		return fmt.Errorf("business logic failed: %w", err)
    	}
    
    	// 4. On success, mark as completed
    	completedRecord := IdempotencyRecord{
    		Status:    "COMPLETED",
    		Owner:     c.consumerID,
    		Timestamp: time.Now(),
    	}
    	completedJSON, _ := json.Marshal(completedRecord)
    
    	// Using Lua script for atomic, owner-checked conditional update
    	res, err = c.completeScript.Run(ctx, c.rdb, []string{idempotencyKey}, c.consumerID, string(completedJSON), int(completedTTL.Seconds())).Result()
    	if err != nil {
    		// This is a tricky state. Logic is done, but we can't mark it as complete.
    		// This could lead to reprocessing. Logging is critical here.
    		return fmt.Errorf("CRITICAL: failed to mark as completed after processing: %w", err)
    	}
    	if res != "OK" {
    		// The script reported NOT_FOUND or WRONG_OWNER: the lock expired or changed
    		// hands while we were processing, so the event may be reprocessed elsewhere.
    		return fmt.Errorf("CRITICAL: completion rejected by Redis with status %v", res)
    	}
    
    	fmt.Printf("Successfully processed and marked event %s as completed.\n", eventId)
    	return nil
    }
    
    func (c *Consumer) processBusinessLogic(ctx context.Context, msg kafka.Message) error {
    	fmt.Printf("  -> Processing business logic for key %s...\n", string(msg.Key))
    	// Simulate work
    	time.Sleep(500 * time.Millisecond)
    	// In a real scenario, this would be a database transaction.
    	// For example:
    	// tx, err := db.BeginTx(ctx, nil)
    	// ... write to multiple tables ...
    	// tx.Commit()
    	fmt.Printf("  -> Business logic complete for key %s.\n", string(msg.Key))
    	return nil
    }
    
    func main() {
    	consumer := NewConsumer()
    	fmt.Printf("Starting consumer instance %s\n", consumer.consumerID)
    	consumer.Run(context.Background())
    }

    Advanced Edge Cases and Performance Considerations

    A production system must handle more than just the happy path.

    1. Poison Pill Messages

    What if a message is malformed or triggers a bug that causes processBusinessLogic to fail every time? Our current logic will create an infinite loop: acquire lock, fail, release lock, redeliver, repeat. This can overwhelm your system.

    Solution: Implement a retry counter within the Redis record.

    Modify the IdempotencyRecord:

    go
    type IdempotencyRecord struct {
    	Status     string    `json:"status"`
    	Owner      string    `json:"owner"`
    	Timestamp  time.Time `json:"timestamp"`
    	RetryCount int       `json:"retryCount"`
    }

    When acquiring the lock, if the key already exists with status PROCESSING, you can now check its retryCount. When business logic fails, instead of just deleting the key, you perform an atomic GET and SET (or another Lua script) to increment the retry count. If retryCount exceeds a threshold (e.g., 5), you change the status to FAILED, move the message to a DLQ, and commit the offset to stop the loop.
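
    A hedged sketch of that failure path is below. The maxRetries threshold and the publishToDLQ helper are hypothetical, and for brevity the update is shown as a plain SET on the already-held lock; in production it should be another Lua script, as noted above.

    go
    const maxRetries = 5 // hypothetical threshold

    // onBusinessLogicFailure replaces the simple "delete the key" failure handling.
    // It bumps the retry counter; past the threshold it marks the key FAILED,
    // routes the message to a dead-letter topic, and tells the caller to commit
    // the offset so the loop stops.
    func (c *Consumer) onBusinessLogicFailure(ctx context.Context, key string, rec IdempotencyRecord, msg kafka.Message) (giveUp bool, err error) {
    	rec.RetryCount++
    	if rec.RetryCount >= maxRetries {
    		rec.Status = "FAILED"
    		data, _ := json.Marshal(rec)
    		if err := c.rdb.Set(ctx, key, data, completedTTL).Err(); err != nil {
    			return false, err
    		}
    		// publishToDLQ is a hypothetical helper that writes the raw message
    		// to a dead-letter topic for manual inspection.
    		return true, c.publishToDLQ(ctx, msg)
    	}
    	// Below the threshold: keep the key (with the bumped counter) under the short
    	// lock TTL and report failure, so the offset is not committed and the message
    	// is retried on redelivery. If the TTL lapses before redelivery the counter
    	// resets; a production acquire path would also inspect the counter and
    	// timestamp, as mentioned above.
    	data, _ := json.Marshal(rec)
    	return false, c.rdb.Set(ctx, key, data, lockTTL).Err()
    }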

    2. Redis as a Performance Bottleneck

    In a high-throughput topic (e.g., >10,000 messages/sec), every message triggers multiple Redis commands. This can saturate a single Redis instance.

    * Benchmarking: A single r6g.large EC2 instance running Redis can handle roughly 100k-150k simple ops/sec. Our pattern uses at least 2 complex (Lua) operations per message, which gives a theoretical ceiling of roughly 50k messages/sec per Redis node, not accounting for network latency or command complexity. This is a real-world limit to consider.

    * Scaling: Use Redis Cluster to shard the idempotency keys across multiple nodes. The go-redis client supports this out of the box (see the sketch after this list). Since each key is independent, this pattern scales horizontally very well.

    * Pipelining: While our per-message logic is transactional, you can use pipelining at a higher level if you are batch-processing messages, though this complicates the idempotency logic significantly.
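
    A minimal sketch of the Scaling point, assuming go-redis's cluster client (the addresses are placeholders). The only structural change to the earlier Consumer is that its rdb field would need the redis.UniversalClient (or redis.Scripter) interface type rather than *redis.Client, since Script.Run accepts either.

    go
    // newClusterRedis returns a cluster-aware client; each idempotency key hashes
    // to a slot independently, so keys spread evenly across the shards.
    func newClusterRedis() redis.UniversalClient {
    	return redis.NewClusterClient(&redis.ClusterOptions{
    		Addrs: []string{ // placeholder cluster endpoints
    			"redis-node-1:6379",
    			"redis-node-2:6379",
    			"redis-node-3:6379",
    		},
    	})
    }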

    3. Lock Expiration and Zombie Processes

    We set a TTL on the PROCESSING lock (lockTTL). What if the business logic legitimately takes longer than this TTL? The lock will expire, and another consumer may pick up the same message, leading to concurrent processing.

    * Solution 1 (Simple): Tune the TTL. The easiest fix is to set a generous TTL that is well above your 99th percentile processing time (e.g., if most messages take 1 second, set the TTL to 5 minutes). This is a trade-off: a long TTL means a crashed consumer will hold a lock for longer, increasing message latency.

    * Solution 2 (Complex): Heartbeating. The consumer processing the message can run a background goroutine that periodically updates the TTL of its PROCESSING lock (e.g., EXPIRE key 300). This adds significant complexity to your consumer logic (managing goroutines, handling context cancellation) but creates the most robust system for long-running jobs.
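
    Here is a minimal sketch of such a heartbeat, reusing the Consumer from earlier; the lockTTL/3 refresh interval is an arbitrary choice.

    go
    // heartbeatLock keeps extending the PROCESSING key's TTL from a background
    // goroutine so it cannot expire under a slow but healthy worker. The returned
    // stop function must be called once processing finishes (e.g. via defer).
    func (c *Consumer) heartbeatLock(ctx context.Context, key string) (stop func()) {
    	hbCtx, cancel := context.WithCancel(ctx)
    	go func() {
    		ticker := time.NewTicker(lockTTL / 3) // refresh well before expiry
    		defer ticker.Stop()
    		for {
    			select {
    			case <-hbCtx.Done():
    				return
    			case <-ticker.C:
    				// Best effort: if this fails, the lock may simply expire early,
    				// which the status checks on redelivery already tolerate.
    				c.rdb.Expire(hbCtx, key, lockTTL)
    			}
    		}
    	}()
    	return cancel
    }

    The caller would acquire the lock, run stop := c.heartbeatLock(ctx, idempotencyKey), defer stop(), and then execute the business logic.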

    Alternative Approaches and Their Trade-offs

    1. Database-Level Idempotency

    You can use your primary transactional database (e.g., PostgreSQL) to store idempotency records.

    sql
    CREATE TABLE processed_events (
        event_id UUID PRIMARY KEY,
        inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );

    The consumer logic becomes:

    go
    func handleWithDatabase(ctx context.Context, tx *sql.Tx, eventId string) error {
        // This will fail with a primary key constraint violation if the eventId already exists.
        _, err := tx.ExecContext(ctx, "INSERT INTO processed_events (event_id) VALUES ($1)", eventId)
        if err != nil {
            return err // Already processed or other DB error
        }
    
        // ... execute business logic within the same transaction ...
    
        return nil // Commit happens outside this function
    }
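
    The caller owns the transaction, so the idempotency INSERT and the business writes commit or roll back together. A hedged sketch of that wrapper follows; applyBusinessLogic is a hypothetical helper for the domain writes.

    go
    func handleMessageWithDB(ctx context.Context, db *sql.DB, eventId string, msg kafka.Message) error {
    	tx, err := db.BeginTx(ctx, nil)
    	if err != nil {
    		return err
    	}
    	defer tx.Rollback() // rolls back unless Commit already succeeded

    	if err := handleWithDatabase(ctx, tx, eventId); err != nil {
    		return err // duplicate eventId or other DB error; nothing is committed
    	}
    	if err := applyBusinessLogic(ctx, tx, msg); err != nil { // hypothetical domain writes
    		return err
    	}
    	return tx.Commit() // idempotency record and business writes land atomically
    }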

    * Pros: Perfect atomicity. The check for the event and the business logic commit happen within a single database transaction. It's simpler to reason about.

    * Cons:

    * Performance: Puts significant write load on your primary database, which is often the most expensive and hardest-to-scale part of your infrastructure. Contention on the processed_events table can become a bottleneck.

    * Separation of Concerns: Couples your message processing infrastructure logic with your business domain data.

    * Not suitable for non-DB logic: If your "business logic" involves calling a third-party API, you cannot roll back that API call if the INSERT fails, breaking the atomicity guarantee.

    2. Kafka Streams Exactly-Once Semantics (EOS)

    Kafka Streams offers EOS, but it's crucial to understand its scope. EOS in Kafka works by using a transactional producer that writes the output records and commits the consumed offsets within a single Kafka transaction. When a stream application reads from a source topic, processes the data, and writes to a sink topic, the entire read-process-write cycle is committed as a single atomic transaction.

    * When to use it: It is the perfect solution for Kafka-to-Kafka transformations (e.g., filtering, aggregation, enrichment where the result is another Kafka topic).

    * Where it falls short: The moment you need to interact with an external system—like writing to a PostgreSQL database, calling a REST API, or sending an email—you break out of the Kafka transactional boundary. The write to the external system and the commit of the consumer offset are not atomic. You are back to an at-least-once scenario, and the Redis-based idempotency pattern described here becomes necessary once again.

    Conclusion

    For event-driven services that must interact with external systems, consumer-side idempotency is not an optional feature; it is a fundamental requirement for correctness. While naive approaches are simple to implement, they contain subtle but critical flaws that can lead to data loss or corruption in production.

    The stateful idempotency pattern using Redis provides a robust, scalable, and performant solution. By treating message processing as a state machine (PROCESSING, COMPLETED, FAILED) and leveraging atomic Redis operations via Lua scripting, we can build consumers that are resilient to crashes, retries, and concurrent execution. While it introduces an additional dependency (Redis) and requires careful handling of edge cases like poison pills and lock expiration, this pattern offers a powerful balance of safety and performance, making it a cornerstone of modern, high-throughput event-sourcing architectures.
