Kafka Consumer Idempotency: The Redis and Outbox Pattern

Goh Ling Yong

The Idempotency Imperative in Asynchronous Systems

In distributed systems, Kafka's at-least-once delivery guarantee is a foundational contract. It ensures message durability but introduces a significant challenge for consumers: the potential for duplicate message processing. A network partition, a consumer crash post-processing but pre-commit, or a consumer group rebalance can all trigger redelivery of the same message. For many applications—payment processing, order fulfillment, inventory management—processing a duplicate message can lead to catastrophic data corruption.

The common goal is to achieve effectively-once processing semantics at the application layer. This means that while the message may be delivered more than once, our application logic ensures it is processed and its side effects are applied exactly once.

A naive approach might involve using a unique constraint in your primary database on a transaction ID. However, this pattern quickly breaks down when a single message consumption triggers multiple, non-atomic operations: updating a database row, making an external API call, and producing a new event. If a failure occurs between these steps, the system is left in an inconsistent state. A truly robust solution must guarantee both idempotency of the initial consumption and atomicity of its resulting side effects.

This article details a battle-tested, production-grade pattern that combines the low-latency capabilities of Redis for idempotency checks with the transactional consistency of the Outbox pattern. We will architect a consumer that can withstand crashes and retries at any point in its lifecycle while guaranteeing data integrity.

Core Pattern: The Stateful Idempotency Key with Redis

A simple idempotency check might use Redis's SETNX (SET if Not eXists) command. The consumer would extract a unique ID from the incoming message (e.g., event_id), attempt to SETNX this key in Redis, and only proceed if the command returns 1 (indicating the key was newly set).
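
For illustration, here is a minimal sketch of that naive check using the go-redis client. The "idem:" key prefix, the 24-hour TTL, and the assumption that the event ID travels with the message are illustrative choices, not part of any particular system:

go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

// isFirstDelivery is a minimal sketch of the naive SETNX check.
// The "idem:" prefix and 24h TTL are illustrative choices, not requirements.
func isFirstDelivery(ctx context.Context, rdb *redis.Client, eventID string) (bool, error) {
	// SET key value NX EX <ttl>: succeeds only if the key does not already exist.
	ok, err := rdb.SetNX(ctx, "idem:"+eventID, 1, 24*time.Hour).Result()
	if err != nil {
		return false, fmt.Errorf("redis SETNX failed: %w", err)
	}
	return ok, nil // true => first time this event ID has been seen
}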

However, this simple approach has a critical flaw: the crash-recovery race condition. Consider this sequence:

  • Consumer receives message M1 with key K1.
  • Successfully executes SETNX K1 1 in Redis.
  • The process crashes before completing its business logic and committing the Kafka offset.
  • After rebalancing, another consumer instance receives M1 again.
  • It attempts SETNX K1 1, which now returns 0 because the key exists.
  • The consumer incorrectly assumes M1 was fully processed, skips it, and commits the offset.
  • The business logic was never completed, and the message is now lost forever.

To solve this, we must evolve our idempotency record from a simple flag to a state machine.

A Multi-Stage Idempotency Record

We'll store a JSON object or a Redis Hash for each idempotency key, tracking the processing state. The states are:

  • PROCESSING: The message is currently being handled. This acts as a lock.
  • COMPLETED: The message was successfully and completely processed.
  • FAILED: The message processing failed due to a transient error and can be retried.

Here's the refined consumer logic:

  • Extract the idempotency key K from the message.
  • Check the state of K in Redis:
    * If COMPLETED, the message is a duplicate of a successfully processed one. Acknowledge and skip.
    * If PROCESSING, another consumer might be working on it. This could indicate a long-running process or a zombie consumer. We can choose to wait, retry, or raise an alert after a timeout.
    * If FAILED or non-existent, proceed to process.
  • Atomically set the state of K to PROCESSING with a reasonable TTL (e.g., 5 minutes). This TTL prevents permanent locks if a consumer dies without cleaning up.
  • Execute the business logic.
  • Upon successful completion, update the state of K to COMPLETED with a longer TTL (e.g., 24 hours) to cover the redelivery window.
  • If a recoverable error occurs, update the state to FAILED or simply delete the key to allow a full retry on redelivery.
  • Commit the Kafka offset.

    Go Implementation: Stateful Idempotency Consumer

    Let's implement this logic in Go using the go-redis library. This example assumes you have a Kafka consumer loop set up.

    go
    package main
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"time"
    
    	"github.com/go-redis/redis/v8"
    	"github.com/segmentio/kafka-go"
    )
    
    // IdempotencyRecord defines the state stored in Redis.
    type IdempotencyRecord struct {
    	Status    string `json:"status"`
    	Timestamp int64  `json:"timestamp"`
    }
    
    const (
    	StatusProcessing = "PROCESSING"
    	StatusCompleted  = "COMPLETED"
    )
    
    const (
    	processingTTL = 5 * time.Minute
    	completedTTL  = 24 * time.Hour
    )
    
    // IdempotencyService manages the idempotency checks against Redis.
    type IdempotencyService struct {
    	redisClient *redis.Client
    }
    
    func NewIdempotencyService(client *redis.Client) *IdempotencyService {
    	return &IdempotencyService{redisClient: client}
    }
    
    // CheckAndSetProcessing checks the status and sets it to PROCESSING if permissible.
    func (s *IdempotencyService) CheckAndSetProcessing(ctx context.Context, key string) (bool, error) {
    	val, err := s.redisClient.Get(ctx, key).Result()
    	if err != nil && err != redis.Nil {
    		return false, fmt.Errorf("failed to get key from redis: %w", err)
    	}
    
    	if err == nil {
    		var record IdempotencyRecord
    		if jsonErr := json.Unmarshal([]byte(val), &record); jsonErr != nil {
    			// Corrupted data, treat as processable but log a warning
    			fmt.Printf("WARN: Corrupted idempotency record for key %s: %v\n", key, jsonErr)
    		} else {
    			if record.Status == StatusCompleted {
    				fmt.Printf("INFO: Duplicate message detected, key %s already completed.\n", key)
    				return false, nil // Do not process
    			}
    			if record.Status == StatusProcessing {
    				// Check timestamp to see if it's a stale lock
    				if time.Now().Unix()-record.Timestamp < int64(processingTTL.Seconds()) {
    					fmt.Printf("WARN: Message with key %s is already processing.\n", key)
    					return false, nil // Do not process, maybe another consumer is busy
    				}
    				fmt.Printf("INFO: Stale processing lock for key %s. Re-processing.\n", key)
    			}
    		}
    	}
    
    	// Set to PROCESSING
    	newRecord := IdempotencyRecord{
    		Status:    StatusProcessing,
    		Timestamp: time.Now().Unix(),
    	}
    	recordJSON, _ := json.Marshal(newRecord)
    
    	// Using SET with NX option to be safe, though our logic mostly prevents races.
    	// This is not perfectly atomic with the GET, for true atomicity a LUA script is best.
    	// However, for this pattern, the small race window is often acceptable.
    	_, err = s.redisClient.Set(ctx, key, recordJSON, processingTTL).Result()
    	if err != nil {
    		return false, fmt.Errorf("failed to set processing status in redis: %w", err)
    	}
    
    	return true, nil // Ok to process
    }
    
    // SetCompleted marks a key as successfully processed.
    func (s *IdempotencyService) SetCompleted(ctx context.Context, key string) error {
    	completedRecord := IdempotencyRecord{
    		Status:    StatusCompleted,
    		Timestamp: time.Now().Unix(),
    	}
    	recordJSON, _ := json.Marshal(completedRecord)
    
    	err := s.redisClient.Set(ctx, key, recordJSON, completedTTL).Err()
    	if err != nil {
    		return fmt.Errorf("failed to set completed status: %w", err)
    	}
    	return nil
    }
    
    // processMessage is a placeholder for your main consumer logic
    func processMessage(ctx context.Context, msg kafka.Message, idemSvc *IdempotencyService) error {
        idempotencyKey := string(msg.Key) // Assuming event ID is in the Kafka message key
    
        canProcess, err := idemSvc.CheckAndSetProcessing(ctx, idempotencyKey)
        if err != nil {
            // Error communicating with Redis, bubble up to trigger a retry
            return fmt.Errorf("idempotency check failed: %w", err)
        }
    
        if !canProcess {
            // Message is a duplicate or is being processed elsewhere
            return nil // Acknowledge without processing
        }
    
        // --- BEGIN BUSINESS LOGIC --- 
        fmt.Printf("Processing message: %s\n", idempotencyKey)
        // Simulate work
        time.Sleep(2 * time.Second)
        // --- END BUSINESS LOGIC --- 
    
        // Mark as completed
        if err := idemSvc.SetCompleted(ctx, idempotencyKey); err != nil {
            // This is a critical failure. If this fails, on redelivery we will re-process.
            // The system must be designed to handle this possibility, potentially with out-of-band cleanup.
            return fmt.Errorf("failed to mark message as completed: %w", err)
        }
    
        fmt.Printf("Successfully processed and marked as completed: %s\n", idempotencyKey)
        return nil
    }
    

    Note on Atomicity: The GET followed by a SET in CheckAndSetProcessing is not atomic. A small race condition exists where two consumers could read a non-existent key simultaneously and both proceed to set it to PROCESSING. The second write will overwrite the first, but both consumers will start processing. While often acceptable if the business logic itself has further guards, true atomicity requires a Lua script executed on the Redis server.

    lua
    -- LUA Script for atomic check-and-set
    local key = KEYS[1]
    local processing_ttl_seconds = ARGV[1]
    local current_ts = ARGV[2] -- reserved for the optional stale-PROCESSING check noted below
    local processing_payload = ARGV[3] -- e.g., '{"status":"PROCESSING", ...}'
    local completed_status = ARGV[4] -- e.g., 'COMPLETED'
    
    local value = redis.call('GET', key)
    
    if value then
        -- Using cjson library which is standard in Redis
        local record = cjson.decode(value)
        if record.status == completed_status then
            return 'COMPLETED'
        else
            -- Could add more logic for stale PROCESSING check here
            return 'LOCKED'
        end
    end
    
    -- Key does not exist, set to PROCESSING
    redis.call('SET', key, processing_payload, 'EX', processing_ttl_seconds)
    return 'OK'
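
From Go, the script can be loaded once and invoked via EVALSHA using go-redis's Script helper. The sketch below slots into the consumer code shown earlier and reuses its processingTTL and StatusCompleted constants; the luaCheckAndSet constant is assumed to hold the script above:

go
// luaCheckAndSet is assumed to hold the Lua script shown above.
const luaCheckAndSet = `-- script body omitted; see above`

var checkAndSetScript = redis.NewScript(luaCheckAndSet)

// tryAcquire returns "OK", "LOCKED", or "COMPLETED", mirroring the script's return values.
func tryAcquire(ctx context.Context, rdb *redis.Client, key, processingPayload string) (string, error) {
	res, err := checkAndSetScript.Run(ctx, rdb,
		[]string{key},                  // KEYS[1]
		int(processingTTL/time.Second), // ARGV[1]: processing TTL in seconds
		time.Now().Unix(),              // ARGV[2]: current timestamp
		processingPayload,              // ARGV[3]: PROCESSING record JSON
		StatusCompleted,                // ARGV[4]: completed status string
	).Result()
	if err != nil {
		return "", err
	}
	return res.(string), nil
}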

    The Atomicity Challenge: The Transactional Outbox Pattern

    The Redis idempotency layer solves the duplicate processing problem for the consumer's logic. But what if that logic needs to both update a local database and produce a new, derivative event? This is the classic dual-write problem.

    Example:

    A PaymentProcessed event is consumed. The consumer must:

  • Update the orders table in a PostgreSQL database to status = 'PAID'.
  • Produce a new OrderReadyForShipment event to another Kafka topic.

If the process crashes after the database commit but before the Kafka produce call returns successfully, the order is marked as paid, but the shipping department is never notified. The system is now inconsistent.

    The Transactional Outbox pattern solves this by leveraging the atomicity of the local database transaction.

    The Pattern:

  • The consumer begins a database transaction.
  • It performs all its required database updates (e.g., updating the order status).
  • Instead of directly producing to Kafka, it inserts a record representing the outbound event into an outbox table within the same database and same transaction.
  • It commits the database transaction.

    Now, the state change and the intent to send the event are committed atomically. If the commit succeeds, both are saved; if it fails, both are rolled back. The dual-write problem is eliminated.

    Outbox Table Schema

    A typical outbox table in PostgreSQL might look like this:

    sql
    CREATE TABLE outbox (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        aggregate_type VARCHAR(255) NOT NULL,
        aggregate_id VARCHAR(255) NOT NULL,
        event_type VARCHAR(255) NOT NULL,
        payload JSONB NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        published BOOLEAN NOT NULL DEFAULT FALSE,
        published_at TIMESTAMPTZ
    );
    
    -- Critical index for the poller/CDC connector
    CREATE INDEX idx_outbox_unpublished ON outbox (created_at) WHERE published = FALSE;
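
On the consumer side, the business update and the outbox insert share one database transaction. Here is a minimal sketch using database/sql and lib/pq; the orders table, the order-ID parameter, and the payload shape are illustrative assumptions drawn from the PaymentProcessed example above:

go
package main

import (
	"context"
	"database/sql"
	"encoding/json"

	_ "github.com/lib/pq"
)

// markPaidAndStageEvent applies the business update and stages the outbound event
// in one transaction. The orders table and payload shape are illustrative.
func markPaidAndStageEvent(ctx context.Context, db *sql.DB, orderID string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // becomes a no-op after a successful Commit

	if _, err := tx.ExecContext(ctx,
		`UPDATE orders SET status = 'PAID' WHERE id = $1`, orderID); err != nil {
		return err
	}

	payload, err := json.Marshal(map[string]string{"order_id": orderID})
	if err != nil {
		return err
	}

	// Staging the event in the same transaction is what eliminates the dual write.
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
		 VALUES ('order', $1, 'OrderReadyForShipment', $2)`,
		orderID, string(payload)); err != nil {
		return err
	}

	return tx.Commit()
}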

    The Outbox Processor: Debezium vs. Polling

    With the event safely in the outbox, a separate process is needed to relay it to Kafka. There are two primary strategies.

    Approach A: Change Data Capture (CDC) with Debezium

    This is the most robust and performant approach. Debezium is a platform for CDC that tails the database's transaction log (e.g., PostgreSQL's Write-Ahead Log or WAL).

    How it works:

  • You deploy a Debezium connector for your database (e.g., debezium-connector-postgres) via Kafka Connect.
  • You configure the connector to monitor the outbox table.
  • When a transaction that inserts a new row into outbox is committed, Debezium reads this event from the WAL.
  • It transforms the row data into a structured event and publishes it to a specified Kafka topic.

    Pros:

    * Low Latency: Reads directly from the transaction log, making it near real-time.

    * High Throughput: Extremely efficient and doesn't load the primary database with queries.

    * Guaranteed Delivery: Debezium is built on Kafka Connect, providing fault tolerance and delivery guarantees.

    * Decoupling: The application logic is completely unaware of the publishing mechanism.

    Cons:

    * Operational Complexity: Requires running and maintaining a Kafka Connect cluster, which is non-trivial.

    * Configuration: Setting up connectors requires careful configuration of schemas, transforms, and error handling.

    Approach B: Application-Level Polling

    This is a simpler, infrastructure-light alternative. A background process within your application or a separate small service periodically queries the outbox table for unpublished events.

    How it works:

  • A poller runs on a fixed interval (e.g., every 100ms).
  • It executes a query: SELECT * FROM outbox WHERE published = FALSE ORDER BY created_at LIMIT 100;
  • For each retrieved row, it produces the event to Kafka.
  • Upon successful production acknowledgement from Kafka, it updates the row: UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = ?;

Pros:

    * Simplicity: No extra infrastructure beyond the application itself.

    * Easy to Implement: The logic is straightforward application code.

    Cons:

    * Higher Latency: The polling interval introduces a delay.

    * Database Load: Constant polling adds read/write load to your primary database.

    * Scalability Concerns: Can be difficult to scale the poller horizontally without introducing race conditions where multiple instances try to process the same outbox items. This requires locking mechanisms (e.g., SELECT ... FOR UPDATE SKIP LOCKED).

    Code Example: A Simple Go Outbox Poller

    go
    package main
    
    import (
    	"context"
    	"database/sql"
    	"fmt"
    	"time"
    
    	_ "github.com/lib/pq"
    	"github.com/segmentio/kafka-go"
    )
    
    type OutboxEvent struct {
    	ID           string
    	AggregateID  string
    	EventType    string
    	Payload      []byte // JSONB as raw bytes
    }
    
    func PollAndPublish(ctx context.Context, db *sql.DB, writer *kafka.Writer) {
    	ticker := time.NewTicker(500 * time.Millisecond)
    	defer ticker.Stop()
    
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-ticker.C:
    			events, err := fetchUnpublishedEvents(ctx, db, 100)
    			if err != nil {
    				fmt.Printf("ERROR: Failed to fetch from outbox: %v\n", err)
    				continue
    			}
    
    			for _, event := range events {
    				msg := kafka.Message{
    					Key:   []byte(event.AggregateID),
    					Value: event.Payload,
    					Headers: []kafka.Header{{Key: "eventType", Value: []byte(event.EventType)}},
    				}
    
    				if err := writer.WriteMessages(ctx, msg); err != nil {
    					fmt.Printf("ERROR: Failed to publish outbox event %s to Kafka: %v\n", event.ID, err)
    					// Break and retry the whole batch later
    					break
    				}
    
    				if err := markEventAsPublished(ctx, db, event.ID); err != nil {
    					fmt.Printf("CRITICAL: Published event %s but failed to mark as published: %v\n", event.ID, err)
                        // This can lead to duplicate event publishing. Needs alerting.
    					break
    				}
    			}
    		}
    	}
    }
    
    func fetchUnpublishedEvents(ctx context.Context, db *sql.DB, limit int) ([]OutboxEvent, error) {
    	// In a real multi-instance scenario, you MUST use row-level locking.
    	// SELECT ... FOR UPDATE SKIP LOCKED is the standard choice, but note that the
    	// locks only last for the duration of a transaction: the fetch and the
    	// subsequent mark-as-published should run inside one explicit transaction
    	// (db.BeginTx) rather than in autocommit mode as shown here for brevity.
    	rows, err := db.QueryContext(ctx,
    		`SELECT id, aggregate_id, event_type, payload FROM outbox WHERE published = FALSE ORDER BY created_at LIMIT $1 FOR UPDATE SKIP LOCKED`,
    		limit)
    	if err != nil {
    		return nil, err
    	}
    	defer rows.Close()
    
    	var events []OutboxEvent
    	for rows.Next() {
    		var e OutboxEvent
    		if err := rows.Scan(&e.ID, &e.AggregateID, &e.EventType, &e.Payload); err != nil {
    			return nil, err
    		}
    		events = append(events, e)
    	}
    	if err := rows.Err(); err != nil {
    		return nil, err
    	}
    	return events, nil
    }
    
    func markEventAsPublished(ctx context.Context, db *sql.DB, id string) error {
    	_, err := db.ExecContext(ctx, "UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = $1", id)
    	return err
    }
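
Wiring the poller into a process might look like the following sketch; the DSN, broker address, and topic name are placeholders to replace with your own environment's values:

go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq"
	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder DSN; point this at the database that owns the outbox table.
	db, err := sql.Open("postgres", "postgres://app:app@localhost:5432/orders?sslmode=disable")
	if err != nil {
		log.Fatalf("open db: %v", err)
	}
	defer db.Close()

	writer := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"), // placeholder broker
		Topic:    "order-events",              // placeholder topic
		Balancer: &kafka.Hash{},               // key-based partitioning preserves per-aggregate ordering
	}
	defer writer.Close()

	PollAndPublish(context.Background(), db, writer)
}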

    Tying It All Together: The End-to-End Flow

    Let's integrate the Redis idempotency check with the Transactional Outbox pattern for a fully resilient consumer.

    mermaid
    sequenceDiagram
        participant C as Kafka Consumer
        participant R as Redis
        participant DB as PostgreSQL
        participant P as Outbox Processor
        participant K as Kafka
    
        K->>C: Delivers Message M1 (EventID: E1)
        C->>R: GET idempotency key E1
        R-->>C: Key not found
        C->>R: SET E1 {status: "PROCESSING"} TTL 5m
        R-->>C: OK
    
        C->>DB: BEGIN TRANSACTION
        C->>DB: UPDATE orders SET status = 'PAID' WHERE ...
        C->>DB: INSERT INTO outbox (aggregate_id, event_type, payload) VALUES (...)
        C->>DB: COMMIT
    
        alt Commit Successful
            DB-->>C: COMMIT OK
            C->>R: SET E1 {status: "COMPLETED"} TTL 24h
            R-->>C: OK
            C->>K: Commit Kafka Offset
    
            P->>DB: SELECT * FROM outbox WHERE published=FALSE
            DB-->>P: Returns Outbox Row for M1's derivative event
            P->>K: Produce new Message M2
            K-->>P: Ack M2
            P->>DB: UPDATE outbox SET published=TRUE WHERE id=...
        else Commit Fails
            DB-->>C: ROLLBACK / Error
            C->>R: DEL E1 (or let it expire)
            C-xK: Do NOT commit offset (message will be redelivered)
        end

    This combined pattern ensures:

  • Idempotent Consumption: The Redis check prevents re-processing of already completed messages.
  • Atomic State Change: The database transaction guarantees that business state and the outbound event are committed together or not at all.
  • Resilient Publishing: The outbox processor ensures the outbound event will eventually be sent to Kafka, even if the original consumer process crashes.
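
As a sketch of how the pieces compose in one handler, the function below reuses IdempotencyService from earlier and a transactional helper such as the hypothetical markPaidAndStageEvent shown above; the surrounding consumer loop is assumed to commit the Kafka offset only when nil is returned:

go
// handlePaymentProcessed sketches the end-to-end flow from the diagram above.
func handlePaymentProcessed(ctx context.Context, msg kafka.Message,
	idemSvc *IdempotencyService, db *sql.DB) error {

	key := string(msg.Key)

	canProcess, err := idemSvc.CheckAndSetProcessing(ctx, key)
	if err != nil {
		return err // Redis error: don't commit the offset; Kafka will redeliver
	}
	if !canProcess {
		return nil // duplicate or in-flight elsewhere: acknowledge and skip
	}

	// Business state and the outbox row are committed in one database transaction.
	// For brevity we assume the order ID can be derived from the message key.
	if err := markPaidAndStageEvent(ctx, db, key); err != nil {
		// Leave (or delete) the PROCESSING record so a redelivery can retry.
		return err
	}

	// Only after the transaction is durable do we mark the key COMPLETED.
	return idemSvc.SetCompleted(ctx, key)
}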

Edge Cases and Production Considerations

    * Poison Pill Messages: A message that consistently causes a failure in the business logic (e.g., due to malformed data) will be redelivered indefinitely. Your consumer must have a retry mechanism with a backoff policy and, after a certain number of failures, move the message to a Dead-Letter Queue (DLQ). The idempotency record in Redis should be updated to FAILED and perhaps include a retry count.

    * Redis Unavailability: If Redis is down, the idempotency check fails. The consumer should fail open (risk duplicate processing with alerting) or fail closed (stop processing altogether), depending on business requirements. A fail-closed approach is safer. Implement circuit breakers around Redis calls.

    * Outbox Poller Failure: What if the poller publishes to Kafka but crashes before marking the outbox event as published? On restart, it will republish the same event. The consumers of that topic must also be idempotent, demonstrating that idempotency is a required property of services throughout an event-driven ecosystem.

    * Database Contention: The SELECT ... FOR UPDATE SKIP LOCKED pattern is crucial for scaling the poller, but it can still cause contention. Ensure the partial index on created_at WHERE published = FALSE (or an equivalent composite index on (published, created_at)) is in place. For very high-throughput systems, consider partitioning the outbox table.

    * Cleanup: Both the Redis keys and the outbox table will grow indefinitely. A periodic cleanup job is necessary. For Redis, the TTL handles this automatically. For the outbox table, a background process should archive or delete rows where published = TRUE and published_at is older than a specified retention period (e.g., 7 days).
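
A cleanup job for the outbox can be a scheduled delete. Below is a sketch with a 7-day retention window; the interval and scheduling mechanism are illustrative:

go
// purgePublishedOutbox deletes published outbox rows older than the retention window.
// Schedule it via cron or a ticker inside the poller service.
func purgePublishedOutbox(ctx context.Context, db *sql.DB) (int64, error) {
	res, err := db.ExecContext(ctx,
		`DELETE FROM outbox
		 WHERE published = TRUE
		   AND published_at < NOW() - INTERVAL '7 days'`)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}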

    Conclusion

    Achieving 'effectively-once' semantics in a distributed system like Kafka is not a feature of the broker, but a responsibility of the application. By moving beyond simplistic checks and implementing a robust, two-part strategy—stateful idempotency records in Redis for fast duplicate detection and the Transactional Outbox pattern for atomic state changes—we can build highly resilient and consistent services. This architecture, while complex, provides the necessary guarantees for mission-critical systems where data integrity is paramount. It correctly handles process crashes, network failures, and message redeliveries, ensuring that each event leaves its mark on the system exactly once.
