Idempotent Kafka Consumers for Event Sourcing with Redis Locking
The Idempotency Imperative in Event-Sourced Systems
In distributed systems, the promise of "exactly-once" processing is the holy grail: a guarantee that every message is processed once and only once. In practice, however, most messaging systems, including Apache Kafka, provide at-least-once delivery under their default configurations. This is a pragmatic compromise: it ensures no data is lost during network partitions, consumer crashes, or group rebalances, but it comes at the cost of potential message duplication.
For an event-sourcing architecture, this is a critical problem. Consider these scenarios:
* OrderCreatedEvent: Re-processing this event could create a duplicate order, leading to fulfillment and billing nightmares.
* PaymentProcessedEvent: A duplicate event could result in double-charging a customer, causing direct financial impact and loss of trust.
* UserDeactivatedEvent: If the user is already deactivated, a re-processed event might trigger unnecessary downstream workflows or fail with an error, creating noise and potential for infinite retries.
Attempting to achieve exactly-once semantics (EOS) across heterogeneous systems (Kafka -> Your Service -> Database -> External API) is fraught with complexity and often impossible without two-phase commit protocols, which are notoriously slow and brittle. The more robust and widely adopted solution is to embrace at-least-once delivery and make the consumer idempotent. An idempotent operation is one that can be applied multiple times without changing the result beyond the initial application.
This article details a battle-tested, high-performance pattern for implementing idempotent Kafka consumers using Redis for distributed locking and state management. We'll move beyond naive approaches and build a system that is resilient to consumer crashes, race conditions, and poison pill messages.
Designing a Resilient Idempotency Key
The foundation of any idempotency system is a unique key that identifies each message processing attempt. The choice of this key is critical.
Option 1: Kafka coordinates (topic-partition-offset). A common first thought is to use the message's physical coordinates in the Kafka log, e.g., my-topic-3-12345. While simple, this is dangerously flawed. If the topic is ever deleted and recreated (a common practice in some disaster-recovery or testing scenarios), the offsets reset to zero. An old message from a previous incarnation of the topic could be masked by a new one, or vice versa.
Option 2: Natural business keys. Using a natural key from the event payload, such as an orderId or transactionId, is a significant improvement: it ties idempotency to the business operation itself. However, it breaks down when a single business operation emits multiple, distinct events that must all be processed. For example, an Order might emit OrderValidated, OrderInventoryReserved, and OrderPaymentAuthorized events, all sharing the same orderId. If you use orderId as the idempotency key, only the first event processed will succeed.
Option 3: Producer-generated event IDs. The most robust strategy is for the event producer to generate a unique identifier for every single event it creates and place it in the message headers or payload. A UUID is a perfect candidate for this.
// Example Kafka Message Payload with an Event ID
{
"eventId": "a8c7b6e5-2f41-4a2b-8b1e-7c9d0f8a6b3c",
"eventType": "PaymentProcessed",
"timestamp": "2023-10-27T10:00:00Z",
"payload": {
"transactionId": "txn_12345",
"amount": 99.99,
"currency": "USD"
}
}
This eventId is guaranteed to be unique for each message, regardless of its business content or its position in the Kafka log. This is the key we will use for our distributed lock in Redis.
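For completeness, here is a minimal producer-side sketch using the same kafka-go library as the consumer shown later. It stamps each message with an eventId header, which is where the example consumer below looks for it; the publishEvent helper and its parameters are illustrative, not from any particular codebase.
package main

import (
	"context"

	"github.com/google/uuid"
	"github.com/segmentio/kafka-go"
)

// publishEvent attaches a fresh UUID as the "eventId" header. Keeping the
// ID in a header (rather than only in the payload) lets the idempotency
// machinery stay independent of the event schema.
func publishEvent(ctx context.Context, w *kafka.Writer, key, payload []byte) error {
	return w.WriteMessages(ctx, kafka.Message{
		Key:   key,
		Value: payload,
		Headers: []kafka.Header{
			{Key: "eventId", Value: []byte(uuid.New().String())},
		},
	})
}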
Core Implementation: Beyond Naive Redis Locking
A simple SETNX (SET if Not eXists) in Redis seems like an obvious solution. Let's examine why it's insufficient.
The Naive SETNX Pattern (and its fatal flaw):
// DO NOT USE THIS IN PRODUCTION
func naiveHandleMessage(ctx context.Context, msg kafka.Message, rdb *redis.Client) {
eventId := getEventId(msg)
idempotencyKey := "idempotency:" + eventId
// 1. Acquire lock
wasSet, err := rdb.SetNX(ctx, idempotencyKey, "processed", 24*time.Hour).Result()
if err != nil {
// Handle Redis error, do not commit offset
return
}
if !wasSet {
// Message already processed, skip and commit offset
commitOffset(msg)
return
}
// 2. Process business logic
err = processBusinessLogic(ctx, msg)
if err != nil {
// CRITICAL FLAW HERE!
// We should release the lock, but what if the service crashes now?
rdb.Del(ctx, idempotencyKey) // Attempt to release lock
return // Do not commit offset
}
// 3. Commit Kafka offset
commitOffset(msg)
}
The critical failure scenario: the consumer acquires the lock (SETNX returns true), begins processing, and then crashes before committing the Kafka offset. The lock key (idempotency:a8c7...) now sits in Redis until its 24-hour TTL expires. When Kafka re-delivers the message to another consumer, that consumer will see the lock, assume the message was successfully processed, and incorrectly skip it. The message is effectively lost.
This demonstrates that we need to distinguish between a message that is being processed and one that has completed successfully.
Production-Grade Pattern: Stateful Idempotency Tracking
To solve this, we introduce a state machine for our idempotency key in Redis. Instead of a simple boolean flag, the key will store a status: PROCESSING, COMPLETED, or FAILED.
Here is the robust algorithm:
1. Derive the idempotency key from the eventId: idempotency:<eventId>.
2. GET the key and inspect its status:
* If status is COMPLETED: The message has been successfully processed. Acknowledge/commit the Kafka offset and stop.
* If status is PROCESSING: Another consumer is likely working on this message, or it crashed. We must not proceed. Wait a short period and retry, or after several checks, assume the previous worker is a zombie and flag for manual intervention or move to a Dead Letter Queue (DLQ).
* If the key does not exist: We are the first to see this message. Proceed to acquire the lock.
3. Atomically set the key to PROCESSING with a short TTL (e.g., 5 minutes), using an atomic operation such as SET ... NX. If we fail to acquire the lock (because another consumer beat us to it), go back to step 2.
4. On success: Atomically update the key's status to COMPLETED and set a longer TTL (e.g., 24-72 hours). Then commit the Kafka offset. The order is critical: update Redis *before* committing to Kafka.
5. On failure: Delete the idempotency key from Redis. This allows the message to be fully re-processed upon redelivery. Do not commit the Kafka offset.
Implementation with Go and Redis Lua Script
To ensure atomicity for our Redis operations, a Lua script executed via EVAL is the most performant and reliable method. It guarantees that no other command can run between the steps of our script.
Here is a complete, runnable example in Go using kafka-go and go-redis.
The Redis Lua Scripts:
We need two scripts: one to acquire the lock and one to mark it as complete.
* acquire_lock.lua
-- KEYS[1] = idempotency_key
-- ARGV[1] = status_processing_json ({ "status": "PROCESSING", ... })
-- ARGV[2] = lock_ttl_seconds
local existing = redis.call('GET', KEYS[1])
if existing then
return existing
end
redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
return nil
This script checks if a key exists. If it does, it returns the existing value. If not, it sets the key to the PROCESSING state with a TTL and returns nil to signal success.
* complete_lock.lua
-- KEYS[1] = idempotency_key
-- ARGV[1] = current_owner_id (to prevent a different worker from completing the lock)
-- ARGV[2] = status_completed_json ({ "status": "COMPLETED", ... })
-- ARGV[3] = completed_ttl_seconds
local current_val_str = redis.call('GET', KEYS[1])
if not current_val_str then
return 'NOT_FOUND'
end
local current_val = cjson.decode(current_val_str)
if current_val.owner ~= ARGV[1] then
return 'WRONG_OWNER'
end
redis.call('SET', KEYS[1], ARGV[2], 'EX', ARGV[3])
return 'OK'
This script ensures that only the process that acquired the lock can mark it as complete by checking an owner field (e.g., a unique consumer instance ID).
The Go Consumer Implementation:
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
)
const (
lockTTL = 5 * time.Minute
completedTTL = 24 * time.Hour
)
// IdempotencyRecord defines the structure we store in Redis.
type IdempotencyRecord struct {
Status string `json:"status"`
Owner string `json:"owner"`
Timestamp time.Time `json:"timestamp"`
}
// Consumer holds our application state.
type Consumer struct {
kReader *kafka.Reader
rdb *redis.Client
consumerID string
acquireScript *redis.Script
completeScript *redis.Script
}
func NewConsumer() *Consumer {
// In a real app, these would come from config
kReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: "my-idempotent-group",
Topic: "events",
})
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// Load Lua scripts; fail fast if they are missing, since the consumer
// cannot operate without them.
acquireLua, err := os.ReadFile("acquire_lock.lua")
if err != nil {
panic(err)
}
completeLua, err := os.ReadFile("complete_lock.lua")
if err != nil {
panic(err)
}
return &Consumer{
kReader: kReader,
rdb: rdb,
consumerID: uuid.New().String(), // Unique ID for this consumer instance
acquireScript: redis.NewScript(string(acquireLua)),
completeScript: redis.NewScript(string(completeLua)),
}
}
func (c *Consumer) Run(ctx context.Context) {
for {
msg, err := c.kReader.FetchMessage(ctx)
if err != nil {
fmt.Printf("could not fetch message: %v\n", err)
break
}
if err := c.handleMessage(ctx, msg); err != nil {
fmt.Printf("error handling message %s: %v. Not committing offset.\n", string(msg.Key), err)
// Do not commit, message will be redelivered.
} else {
if err := c.kReader.CommitMessages(ctx, msg); err != nil {
fmt.Printf("failed to commit message: %v\n", err)
}
}
}
}
func getEventIdFromMessage(msg kafka.Message) string {
for _, h := range msg.Headers {
if h.Key == "eventId" {
return string(h.Value)
}
}
return ""
}
func (c *Consumer) handleMessage(ctx context.Context, msg kafka.Message) error {
eventId := getEventIdFromMessage(msg)
if eventId == "" {
return fmt.Errorf("message missing eventId header")
}
idempotencyKey := fmt.Sprintf("idempotency:%s", eventId)
// 1. & 2. Check status and try to acquire lock
processingRecord := IdempotencyRecord{
Status: "PROCESSING",
Owner: c.consumerID,
Timestamp: time.Now(),
}
processingJSON, _ := json.Marshal(processingRecord)
// Using Lua script for atomic check-and-set
res, err := c.acquireScript.Run(ctx, c.rdb, []string{idempotencyKey}, string(processingJSON), int(lockTTL.Seconds())).Result()
// go-redis surfaces a Lua nil reply as redis.Nil, so redis.Nil here means
// the key did not exist and we have just acquired the lock.
if err != nil && err != redis.Nil {
return fmt.Errorf("redis acquire lock script failed: %w", err)
}
if err == nil {
// Key already existed; res holds its current JSON value.
existingVal, ok := res.(string)
if !ok {
return fmt.Errorf("unexpected type from redis: %T", res)
}
var existingRecord IdempotencyRecord
if err := json.Unmarshal([]byte(existingVal), &existingRecord); err != nil {
return fmt.Errorf("failed to unmarshal existing record: %w", err)
}
if existingRecord.Status == "COMPLETED" {
fmt.Printf("Event %s already completed. Skipping.\n", eventId)
return nil // Success, commit offset
}
if existingRecord.Status == "PROCESSING" {
// Another worker is processing or has died.
// In a real app, you might have more complex logic here (e.g., check timestamp)
return fmt.Errorf("event %s is already being processed by %s. Backing off", eventId, existingRecord.Owner)
}
// Any other status (e.g., FAILED) must not fall through to processing.
return fmt.Errorf("event %s has unexpected status %q", eventId, existingRecord.Status)
}
fmt.Printf("Acquired lock for event %s\n", eventId)
// 3. Execute business logic
err = c.processBusinessLogic(ctx, msg)
if err != nil {
// On failure, release the lock so it can be retried.
fmt.Printf("Business logic failed for event %s. Releasing lock. Error: %v\n", eventId, err)
c.rdb.Del(ctx, idempotencyKey)
return fmt.Errorf("business logic failed: %w", err)
}
// 4. On success, mark as completed
completedRecord := IdempotencyRecord{
Status: "COMPLETED",
Owner: c.consumerID,
Timestamp: time.Now(),
}
completedJSON, _ := json.Marshal(completedRecord)
// Using Lua script for atomic conditional update
res, err = c.completeScript.Run(ctx, c.rdb, []string{idempotencyKey}, c.consumerID, string(completedJSON), int(completedTTL.Seconds())).Result()
if err != nil {
// This is a tricky state. Logic is done, but we can't mark it as complete.
// This could lead to reprocessing. Logging is critical here.
return fmt.Errorf("CRITICAL: failed to mark as completed after processing: %w", err)
}
if res != "OK" {
// The script refused the update (NOT_FOUND or WRONG_OWNER): our lock
// expired or was taken over mid-processing. This can also lead to
// reprocessing, so it must be surfaced loudly.
return fmt.Errorf("CRITICAL: complete script returned %v for event %s", res, eventId)
}
fmt.Printf("Successfully processed and marked event %s as completed.\n", eventId)
return nil
}
func (c *Consumer) processBusinessLogic(ctx context.Context, msg kafka.Message) error {
fmt.Printf(" -> Processing business logic for key %s...\n", string(msg.Key))
// Simulate work
time.Sleep(500 * time.Millisecond)
// In a real scenario, this would be a database transaction.
// For example:
// tx, err := db.BeginTx(ctx, nil)
// ... write to multiple tables ...
// tx.Commit()
fmt.Printf(" -> Business logic complete for key %s.\n", string(msg.Key))
return nil
}
func main() {
consumer := NewConsumer()
fmt.Printf("Starting consumer instance %s\n", consumer.consumerID)
consumer.Run(context.Background())
}
Advanced Edge Cases and Performance Considerations
A production system must handle more than just the happy path.
1. Poison Pill Messages
What if a message is malformed or triggers a bug that causes processBusinessLogic to fail every time? Our current logic will create an infinite loop: acquire lock, fail, release lock, redeliver, repeat. This can overwhelm your system.
Solution: Implement a retry counter within the Redis record.
Modify the IdempotencyRecord:
type IdempotencyRecord struct {
Status string `json:"status"`
Owner string `json:"owner"`
Timestamp time.Time `json:"timestamp"`
RetryCount int `json:"retryCount"`
}
When acquiring the lock, if the key already exists with status PROCESSING, you can now check its retryCount. When business logic fails, instead of just deleting the key, you perform an atomic GET and SET (or another Lua script) to increment the retry count. If retryCount exceeds a threshold (e.g., 5), you change the status to FAILED, move the message to a DLQ, and commit the offset to stop the loop.
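A minimal sketch of that failure path, assuming a third Lua script alongside the two shown earlier. It is inlined here as a Go string; the RETRYABLE status, the maxRetries threshold, and the recordFailure helper are illustrative additions, and the acquire script would need a matching tweak to treat a RETRYABLE record as acquirable while carrying its count forward.
import (
	"context"

	"github.com/go-redis/redis/v8"
)

// failScript atomically bumps retryCount inside the idempotency record.
// Below the threshold it marks the record RETRYABLE so a redelivery can
// retry it; at the threshold it parks the record as FAILED for the DLQ path.
var failScript = redis.NewScript(`
local val = redis.call('GET', KEYS[1])
if not val then return 'NOT_FOUND' end
local rec = cjson.decode(val)
rec.retryCount = (rec.retryCount or 0) + 1
if rec.retryCount >= tonumber(ARGV[1]) then
  rec.status = 'FAILED'
else
  rec.status = 'RETRYABLE'
end
redis.call('SET', KEYS[1], cjson.encode(rec), 'EX', ARGV[2])
return rec.status
`)

// recordFailure returns the record's new status. On 'FAILED' the caller
// should publish the message to a DLQ and commit the offset to stop the loop.
func recordFailure(ctx context.Context, rdb *redis.Client, key string, maxRetries, ttlSeconds int) (string, error) {
	res, err := failScript.Run(ctx, rdb, []string{key}, maxRetries, ttlSeconds).Result()
	if err != nil {
		return "", err
	}
	status, _ := res.(string)
	return status, nil
}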
2. Redis as a Performance Bottleneck
In a high-throughput topic (e.g., >10,000 messages/sec), every message triggers multiple Redis commands. This can saturate a single Redis instance.
* Benchmarking: A single r6g.large EC2 instance running Redis can handle roughly 100k-150k simple ops/sec. Our pattern uses at least two complex (Lua) operations per message, giving a theoretical ceiling of roughly 50k-75k messages/sec *per Redis node*, before accounting for network latency or command complexity. This is a real-world limit to consider.
* Scaling: Use Redis Cluster to shard the idempotency keys across multiple nodes. The go-redis client supports this out of the box (see the sketch after this list). Since each key is independent, this pattern scales horizontally very well.
* Pipelining: While our per-message logic is transactional, you can use pipelining at a higher level if you are batch-processing messages, though this complicates the idempotency logic significantly.
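To make the scaling point concrete, here is a hedged sketch of swapping the single-node client for a cluster client. The node addresses are placeholders, and the Consumer's rdb field would need the redis.UniversalClient type so either client can be injected.
import "github.com/go-redis/redis/v8"

// newClusterRedis shards idempotency keys across a Redis Cluster. Because
// each key hashes to exactly one slot, the Lua scripts run unchanged.
func newClusterRedis() redis.UniversalClient {
	return redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"redis-node-1:6379", "redis-node-2:6379", "redis-node-3:6379"},
	})
}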
3. Lock Expiration and Zombie Processes
We set a TTL on the PROCESSING lock (lockTTL). What if the business logic legitimately takes longer than this TTL? The lock will expire, and another consumer may pick up the same message, leading to concurrent processing.
* Solution 1 (Simple): Tune the TTL. The easiest fix is to set a generous TTL that is well above your 99th percentile processing time (e.g., if most messages take 1 second, set the TTL to 5 minutes). This is a trade-off: a long TTL means a crashed consumer will hold a lock for longer, increasing message latency.
* Solution 2 (Complex): Heartbeating. The consumer processing the message can run a background goroutine that periodically extends the TTL of its PROCESSING lock (e.g., EXPIRE key 300). This adds significant complexity to your consumer logic (managing goroutines, handling context cancellation) but creates the most robust system for long-running jobs; a sketch follows.
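A minimal sketch of the heartbeating approach, assuming the lock-owning worker cancels a context as soon as processing ends; heartbeatLock and the ttl/3 refresh interval are illustrative choices, not part of the implementation above.
import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// heartbeatLock periodically extends the PROCESSING lock's TTL while the
// business logic runs. It stops when ctx is cancelled, or when the key has
// vanished (the lock already expired and another worker may now own it).
func heartbeatLock(ctx context.Context, rdb *redis.Client, key string, ttl time.Duration) {
	ticker := time.NewTicker(ttl / 3) // refresh well before expiry
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			ok, err := rdb.Expire(ctx, key, ttl).Result()
			if err != nil || !ok {
				return // key gone or Redis unreachable; stop heartbeating
			}
		}
	}
}
Inside handleMessage, the worker would start go heartbeatLock(hbCtx, c.rdb, idempotencyKey, lockTTL) just before processBusinessLogic and cancel hbCtx as soon as it returns, so the lock cannot silently expire mid-flight.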
Alternative Approaches and Their Trade-offs
1. Database-Level Idempotency
You can use your primary transactional database (e.g., PostgreSQL) to store idempotency records.
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
The consumer logic becomes:
func handleWithDatabase(ctx context.Context, tx *sql.Tx, eventId string) error {
// This INSERT fails with a primary-key constraint violation if the eventId already exists.
_, err := tx.ExecContext(ctx, "INSERT INTO processed_events (event_id) VALUES ($1)", eventId)
if err != nil {
return err // Already processed, or another DB error
}
// ... execute business logic within the same transaction ...
return nil // Commit happens outside this function
}
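For context, a hedged sketch of the caller that owns the transaction boundary; handleMessageWithDB and its error handling around duplicate keys are illustrative, not a fixed API.
import (
	"context"
	"database/sql"
)

func handleMessageWithDB(ctx context.Context, db *sql.DB, eventId string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded

	// A duplicate eventId surfaces here as a unique-constraint error; the
	// caller can detect it and treat the message as already processed.
	if err := handleWithDatabase(ctx, tx, eventId); err != nil {
		return err
	}
	// ... business logic writes on the same tx ...
	return tx.Commit() // atomically records the event as processed
}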
* Pros: Perfect atomicity. The check for the event and the business logic commit happen within a single database transaction. It's simpler to reason about.
* Cons:
* Performance: Puts significant write load on your primary database, which is often the most expensive and hardest-to-scale part of your infrastructure. Contention on the processed_events table can become a bottleneck.
* Separation of Concerns: Couples your message processing infrastructure logic with your business domain data.
* Not suitable for non-DB logic: If your "business logic" involves calling a third-party API, you cannot roll back that API call if the INSERT fails, breaking the atomicity guarantee.
2. Kafka Streams Exactly-Once Semantics (EOS)
Kafka Streams offers EOS, but it's crucial to understand its scope. EOS in Kafka is built on a transactional producer: when a streams application reads from a source topic, processes the data, and writes to a sink topic, the output records and the consumer offset commits are committed together as a single atomic transaction, coordinated by Kafka's transaction coordinator.
* When to use it: It is the perfect solution for Kafka-to-Kafka transformations (e.g., filtering, aggregation, enrichment where the result is another Kafka topic).
* Where it falls short: The moment you need to interact with an external system—like writing to a PostgreSQL database, calling a REST API, or sending an email—you break out of the Kafka transactional boundary. The write to the external system and the commit of the consumer offset are not atomic. You are back to an at-least-once scenario, and the Redis-based idempotency pattern described here becomes necessary once again.
Conclusion
For event-driven services that must interact with external systems, consumer-side idempotency is not an optional feature; it is a fundamental requirement for correctness. While naive approaches are simple to implement, they contain subtle but critical flaws that can lead to data loss or corruption in production.
The stateful idempotency pattern using Redis provides a robust, scalable, and performant solution. By treating message processing as a state machine (PROCESSING, COMPLETED, FAILED) and leveraging atomic Redis operations via Lua scripting, we can build consumers that are resilient to crashes, retries, and concurrent execution. While it introduces an additional dependency (Redis) and requires careful handling of edge cases like poison pills and lock expiration, this pattern offers a powerful balance of safety and performance, making it a cornerstone of modern, high-throughput event-sourcing architectures.