Idempotency Patterns for Kafka-based Event-Driven Microservices

14 min read
Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

The Inevitability of Duplicates in Asynchronous Systems

In any mature, distributed, event-driven architecture, the problem of message duplication is not a matter of if, but when. While platforms like Apache Kafka provide powerful delivery guarantees, the default and most common configuration is at-least-once delivery. This guarantee is a pragmatic trade-off, ensuring no data is lost during broker or consumer failures, at the cost of potentially redelivering messages.

Why not exactly-once? Kafka does offer exactly-once semantics (EOS) through its idempotent producer and transactional APIs. However, those guarantees cover read-process-write pipelines whose input and output are both Kafka topics. Side effects in external systems, such as a debit written to a payments database, fall outside the Kafka transaction, so achieving exactly-once end to end still requires a holistic and often complex implementation. More critically, the most common source of duplicates is the consumer's own processing loop. Consider this canonical failure scenario:

  • A consumer fetches a message (e.g., process_payment with transaction_id: 123).
  • It successfully processes the business logic (the payment is debited in the database).
  • It prepares to commit the offset to Kafka to signal completion.
  • The consumer process crashes or is forcefully redeployed before the offset commit is acknowledged by the Kafka broker.
  • Upon restart, the new consumer instance will fetch messages from the last committed offset, re-receiving the process_payment message for transaction_id: 123. Without a robust idempotency strategy, it will process the payment a second time, resulting in a double charge—a catastrophic business failure.

    This is why application-level idempotency is non-negotiable for any critical operation in an event-driven system. An idempotent operation is one that can be applied multiple times without changing the result beyond the initial application. Our goal as senior engineers is to build consumers that are inherently idempotent, transforming at-least-once delivery into effectively-exactly-once processing.
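    To make the failure above concrete, here is a minimal in-memory sketch. All names are hypothetical, and no Kafka or database is involved. A naive debit handler and an idempotent one each receive the same process_payment message twice, which simulates the crash before the offset commit. In a real system, the processed-key record must be written atomically with the debit, which is exactly what the patterns below address.

```go
package main

// Payment is a simplified process_payment command (hypothetical type for this sketch).
type Payment struct {
	TransactionID string
	Amount        int64 // in cents
}

// Ledger is an in-memory stand-in for the payments database.
type Ledger struct {
	Balance   int64
	processed map[string]bool // idempotency keys that have already been applied
}

func NewLedger(balance int64) *Ledger {
	return &Ledger{Balance: balance, processed: make(map[string]bool)}
}

// DebitNaive applies every delivery it sees, so a redelivered message debits twice.
func (l *Ledger) DebitNaive(p Payment) {
	l.Balance -= p.Amount
}

// DebitIdempotent records the TransactionID along with the debit, so a
// redelivery of the same TransactionID is recognised and skipped.
func (l *Ledger) DebitIdempotent(p Payment) (applied bool) {
	if l.processed[p.TransactionID] {
		return false
	}
	l.Balance -= p.Amount
	l.processed[p.TransactionID] = true
	return true
}

// deliverWithCrash simulates at-least-once delivery where the consumer crashes
// after processing but before its offset commit, so the message arrives twice.
func deliverWithCrash(p Payment, handle func(Payment)) {
	handle(p) // first delivery: work done, offset commit lost in the crash
	handle(p) // redelivery after restarting from the last committed offset
}
```

Delivering a 500-cent payment against a 1,000-cent balance this way leaves the naive ledger at 0 and the idempotent ledger at 500.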

    This article dissects two production-grade patterns for achieving this, with Go implementations: a high-throughput Redis-based approach and a transactionally consistent PostgreSQL-based approach. It culminates in an extension that commits Kafka offsets in the same database transaction for maximum durability.


    Pattern 1: High-Throughput Idempotency with Redis

    For systems where processing latency is critical and a small risk of losing idempotency state is acceptable (e.g., view counters, non-financial aggregations), Redis is an excellent choice. Redis executes commands one at a time, so an operation like SETNX (Set If Not Exists) is atomic. This makes it a fast distributed lock and state-tracking mechanism.

    The Core Logic

    The pattern is straightforward:

  • Extract a unique Idempotency Key from each Kafka message. This could be a UUID in the message header or a natural key from the payload (e.g., order_id, payment_id).
  • Before processing, attempt to claim this key in Redis using SETNX.
    • If the key is successfully claimed, the consumer proceeds with the business logic.
    • Upon completion, the consumer updates the key in Redis to store the result and sets a Time-To-Live (TTL) to prevent the store from growing indefinitely.
    • If the key claim fails, it means another consumer is processing it or has already completed it. The consumer then reads the key's value to determine the status and act accordingly.

    Go Implementation with Redis

    Let's build a consumer that processes a simple OrderCreated event. We'll use the redis/go-redis and confluentinc/confluent-kafka-go libraries.

    Data Structures and Constants:

    go
    package main
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
    	"github.com/redis/go-redis/v9"
    )
    
    const (
    	IdempotencyKeyHeader = "idempotency-key"
    	RedisKeyPrefix       = "idempotency:"
    	KeyTTL               = 24 * time.Hour
    )
    
    type OrderEvent struct {
    	OrderID string  `json:"order_id"`
    	Amount  float64 `json:"amount"`
    }
    
    type ProcessingStatus string
    
    const (
    	StatusPending   ProcessingStatus = "PENDING"
    	StatusCompleted ProcessingStatus = "COMPLETED"
    )
    
    type StoredResult struct {
    	Status         ProcessingStatus `json:"status"`
    	Response       string           `json:"response"`
    	ProcessingNode string           `json:"processing_node"`
    }

    The Consumer Logic:

    Here's the core of the consumer. Pay close attention to the handling of the SETNX result and the multi-step state management within Redis.

    go
    // Assume rdb is an initialized *redis.Client
    // Assume consumer is an initialized *kafka.Consumer
    
    func handleMessageWithRedis(ctx context.Context, rdb *redis.Client, msg *kafka.Message) error {
    	var idempotencyKey string
    	for _, h := range msg.Headers {
    		if h.Key == IdempotencyKeyHeader {
    			idempotencyKey = string(h.Value)
    			break
    		}
    	}
    
    	if idempotencyKey == "" {
    		fmt.Println("ERROR: Message missing idempotency key")
    		return nil // Acknowledge and move on, or move to DLQ
    	}
    
    	redisKey := RedisKeyPrefix + idempotencyKey
    
    	// 1. Attempt to claim the key
    	pendingState := StoredResult{Status: StatusPending, ProcessingNode: "node-1"} // node-1 is a placeholder
    	pendingJSON, _ := json.Marshal(pendingState)
    
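    	// PENDING is a lease: its TTL must exceed the worst-case processing time, or the
    	// claim can expire mid-work and let a second consumer process the same message.
    	claimed, err := rdb.SetNX(ctx, redisKey, pendingJSON, 1*time.Minute).Result()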
    	if err != nil {
    		return fmt.Errorf("redis SETNX failed: %w", err)
    	}
    
    	if claimed {
    		// 2. We got the lock! Process the message.
    		fmt.Printf("Claimed key %s. Processing message...\n", idempotencyKey)
    
    		var event OrderEvent
    		if err := json.Unmarshal(msg.Value, &event); err != nil {
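    			// Poison pill: a malformed payload fails the same way on every retry, so
    			// route it to a DLQ rather than retrying indefinitely. Releasing the claim
    			// lets a later (e.g. manual) reprocessing attempt claim the key again.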
    			rdb.Del(ctx, redisKey)
    			return fmt.Errorf("failed to unmarshal event: %w", err)
    		}
    
    		// --- BUSINESS LOGIC --- 
    		// (e.g., save to DB, call another service)
    		processingResult := fmt.Sprintf("Processed order %s for amount %.2f", event.OrderID, event.Amount)
    		time.Sleep(2 * time.Second) // Simulate work
    		// --- END BUSINESS LOGIC ---
    
    		// 3. Store the final result and set the final TTL
    		finalState := StoredResult{Status: StatusCompleted, Response: processingResult}
    		finalJSON, _ := json.Marshal(finalState)
    		
    		if err := rdb.Set(ctx, redisKey, finalJSON, KeyTTL).Err(); err != nil {
    			// This is a critical failure. The work is done but we can't mark it as such.
    			// This could lead to a retry that re-processes. Requires manual intervention or a more robust pattern.
    			return fmt.Errorf("CRITICAL: failed to set final state in redis: %w", err)
    		}
    
    		fmt.Printf("Finished processing for key %s.\n", idempotencyKey)
    		return nil
    
    	} else {
    		// 4. Key already exists. Check its status.
    		fmt.Printf("Key %s already exists. Checking status...\n", idempotencyKey)
    		for i := 0; i < 5; i++ { // Retry loop for pending status
    			val, err := rdb.Get(ctx, redisKey).Result()
    			if err == redis.Nil {
    				// Key expired or was deleted between SETNX and GET. Retry the whole process.
    				fmt.Println("Key disappeared. Retrying message handling.")
    				return handleMessageWithRedis(ctx, rdb, msg) // Recursive call, be careful with stack depth
    			}
    			if err != nil {
    				return fmt.Errorf("redis GET failed: %w", err)
    			}
    
    			var stored StoredResult
    			if err := json.Unmarshal([]byte(val), &stored); err != nil {
    				return fmt.Errorf("failed to unmarshal stored result: %w", err)
    			}
    
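    			switch stored.Status {
    			case StatusCompleted:
    				fmt.Printf("Message with key %s already processed. Response: '%s'. Skipping.\n", idempotencyKey, stored.Response)
    				return nil // Success, already done.
    			case StatusPending:
    				fmt.Printf("Key %s is pending by another process. Waiting... (attempt %d)\n", idempotencyKey, i+1)
    				// Wait before re-checking, but stop promptly on shutdown or cancellation.
    				select {
    				case <-ctx.Done():
    					return ctx.Err()
    				case <-time.After(500 * time.Millisecond):
    				}
    			default:
    				return fmt.Errorf("key %s has unexpected status %q", idempotencyKey, stored.Status)
    			}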
    		}
    		return fmt.Errorf("key %s stuck in pending state", idempotencyKey)
    	}
    }

    Performance and Edge Cases

    * Performance: This approach is fast. A SETNX round trip to a nearby Redis instance typically takes well under a millisecond, which makes it suitable for high-throughput topics where tens of thousands of messages per second are processed across a fleet of consumers.

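    * Race Conditions: The PENDING state with a short TTL acts as a lease. If a consumer claims a key and then crashes before completing the work, the lease expires and another consumer can eventually take over. The TTL must exceed the worst-case processing time, though. If the lease expires while the original consumer is still working, a second consumer can claim the key and process the message concurrently. The retry loop in the else block handles the case where one consumer encounters a key that another consumer is actively processing.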

    * Critical Failure Point: The most significant weakness is the non-atomic nature of Business Logic -> Redis SET. If your business logic commits a database transaction but the consumer crashes before the final rdb.Set() call, the idempotency key will be left in a PENDING state until its short TTL expires. When another consumer picks it up, it will re-process the already-committed work. This is often an acceptable risk for non-critical tasks but a deal-breaker for financial transactions.

    * Redis Failure: If the Redis cluster goes down, your entire processing pipeline halts. Because Redis replication is asynchronous, a failover can also lose recently acknowledged writes, and your idempotency guarantees are voided for any keys lost that way.


    Pattern 2: Transactional Integrity with PostgreSQL

    For systems that demand absolute correctness and durability, such as financial ledgers, order processing, or inventory management, we must tie the idempotency check to the same atomic transaction as the business logic. A relational database like PostgreSQL is well suited to this.

    The Core Logic

    This pattern leverages the ACID properties of a relational database.

  • Create a dedicated idempotency_keys table in your database.
  • For each incoming message, the consumer starts a database transaction.
  • Inside the transaction, it attempts to INSERT the message's idempotency key into the table with ON CONFLICT DO NOTHING.
  • If a row was inserted, it proceeds with the business logic (e.g., UPDATEing other tables) within the same transaction.
  • If no row was inserted, the key already exists. The consumer then SELECTs the existing row to check its status.
  • Concurrent consumers handling the same duplicate are serialized by the primary key itself: a second INSERT of the same key blocks until the first transaction commits or rolls back. The follow-up SELECT can additionally take a row lock (FOR UPDATE) while the status is inspected.
  • Finally, the entire transaction is committed. Atomicity ensures that the business logic and the idempotency key record are saved together, or not at all.

    Go Implementation with PostgreSQL

    Let's refactor our consumer to use pgx with PostgreSQL.

    Database Schema:

    sql
    CREATE TABLE idempotency_keys (
        key VARCHAR(255) PRIMARY KEY,
        status VARCHAR(50) NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        -- Store the response so we can return it on duplicate calls
        response_payload JSONB,
        -- Lets a takeover or cleanup job detect keys left in PENDING too long
        last_updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    
    CREATE INDEX idx_idempotency_keys_created_at ON idempotency_keys(created_at);

    The Consumer Logic:

    This implementation is more complex, but provides far stronger guarantees.

    go
    // Assume db is an initialized *pgxpool.Pool (import "github.com/jackc/pgx/v5/pgxpool")
    
    func handleMessageWithPostgres(ctx context.Context, db *pgxpool.Pool, msg *kafka.Message) error {
    	var idempotencyKey string
    	// ... (extract idempotencyKey as before)
    
    	tx, err := db.Begin(ctx)
    	if err != nil {
    		return fmt.Errorf("failed to begin transaction: %w", err)
    	}
    	defer tx.Rollback(ctx) // Rollback is a no-op if tx is committed
    
    	// 1. Try to insert the key. ON CONFLICT DO NOTHING is key.
    	// This is an atomic check-and-set.
    	insertCmd, err := tx.Exec(ctx, 
    		`INSERT INTO idempotency_keys (key, status) VALUES ($1, 'PENDING') ON CONFLICT (key) DO NOTHING`,
    		idempotencyKey,
    	)
    	if err != nil {
    		return fmt.Errorf("failed to insert idempotency key: %w", err)
    	}
    
    	if insertCmd.RowsAffected() > 0 {
    		// 2. We are the first. The key is now ours within this transaction.
    		fmt.Printf("Claimed key %s. Processing message...\n", idempotencyKey)
    
    		var event OrderEvent
    		// ... (unmarshal event as before)
    
    		// --- BUSINESS LOGIC (within the same transaction) ---
    		processingResult := fmt.Sprintf("Processed order %s for amount %.2f", event.OrderID, event.Amount)
    		// Example: tx.Exec(ctx, "UPDATE orders SET status='PROCESSED' WHERE id=$1", event.OrderID)
    		time.Sleep(2 * time.Second) // Simulate work
    		// --- END BUSINESS LOGIC ---
    
    		resultPayload, _ := json.Marshal(map[string]string{"result": processingResult})
    
    		// 3. Update the key to COMPLETED with the result
    		_, err = tx.Exec(ctx, 
    			`UPDATE idempotency_keys SET status='COMPLETED', response_payload=$1, last_updated_at=NOW() WHERE key=$2`,
    			resultPayload,
    			idempotencyKey,
    		)
    		if err != nil {
    			return fmt.Errorf("failed to update idempotency key to completed: %w", err)
    		}
    
    	} else {
    		// 4. Key already exists. A concurrent INSERT of the same key blocks on the
    		// primary-key index until the other transaction commits or rolls back, so the
    		// row we read here is committed. SELECT ... FOR UPDATE also locks it while we
    		// inspect the status, serializing any other reader that locks the same key.
    		fmt.Printf("Key %s already exists. Locking row and checking status...\n", idempotencyKey)
    
    		var status string
    		var responsePayload []byte
    		err = tx.QueryRow(ctx, 
    			`SELECT status, response_payload FROM idempotency_keys WHERE key=$1 FOR UPDATE`,
    			idempotencyKey,
    		).Scan(&status, &responsePayload)
    
    		if err != nil {
    			return fmt.Errorf("failed to select and lock idempotency key: %w", err)
    		}
    
    		if status == "COMPLETED" {
    			fmt.Printf("Message with key %s already processed. Skipping. Response: %s\n", idempotencyKey, string(responsePayload))
    			// We can commit the transaction here, as it was read-only and we're done.
    			return tx.Commit(ctx)
    		} 
    
    		if status == "PENDING" {
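    			// In this single-transaction design, a PENDING row is never visible outside
    			// its own transaction, and a crashed consumer's transaction is rolled back.
    			// This branch is therefore reachable only if another code path commits PENDING rows.
    			// A takeover rule (e.g. based on last_updated_at) would go here; we just abort.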
    			return fmt.Errorf("key %s is already pending by another transaction", idempotencyKey)
    		}
    	}
    
    	// 5. Commit the entire transaction
    	fmt.Printf("Committing transaction for key %s\n", idempotencyKey)
    	return tx.Commit(ctx)
    }

    Durability and Trade-offs

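    * Atomicity: This is the killer feature. The business logic and the idempotency state change are committed in a single atomic unit. As a result, the failure mode from the Redis example (work done, state not saved) cannot occur, provided every side effect is a write in this same database transaction. A call to an external service, such as a payment gateway, happens outside the transaction and is not rolled back with it.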

    * Performance: The cost is latency. Every message now requires a database transaction with several round trips and a durable commit, which is considerably slower than a single Redis SETNX. This may not suit topics requiring extremely low latency, but it is perfectly adequate for most transactional business processes.

    * Database Load: This pattern increases the write load on your database. Every new message adds an INSERT and an UPDATE on the idempotency_keys table, and every duplicate adds a locking SELECT. Ensure your database is provisioned to handle this load and that you have a cleanup strategy for old keys.

    * Deadlocks: Row locks (from FOR UPDATE and from the UPDATEs in your business logic) can deadlock if two transactions acquire them in different orders, so keep your lock order consistent. Locking a single idempotency key is unlikely to deadlock by itself, but business logic that updates several rows in the same transaction can. PostgreSQL resolves a deadlock by aborting one of the transactions with SQLSTATE 40P01, which the consumer should treat as retryable.


    Advanced Pattern: Committing Kafka Offsets in the Same Transaction

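    We've made the business logic and the idempotency state atomic. One gap from the original failure scenario remains: what if our transaction commits successfully, but the consumer crashes before committing the Kafka offset? The idempotency table makes the redelivered message harmless (it finds a COMPLETED row and is skipped). However, Kafka still redelivers it, and the database and Kafka now hold two different views of the consumer's progress.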

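    This is where storing consumed offsets in the database comes in. Strictly speaking, this is not the Transactional Outbox pattern. The outbox solves the mirror-image problem on the producing side: outgoing events are written to an outbox table in the business transaction and relayed to Kafka afterwards. What we need here is its consumer-side counterpart, which extends the database-backed approach so that the consumer's progress is committed atomically with its work.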

    The Core Logic

    The pattern is an enhancement of the PostgreSQL implementation:

  • Create a processed_kafka_offsets table in the same database.
  • Inside the same single database transaction where you perform business logic and manage the idempotency key, you also INSERT or UPDATE the Kafka offset (topic, partition, offset) into this new table.
  • Because this is all in one transaction, we can now guarantee this invariant: If the business logic is committed, the Kafka offset for that message is also durably stored in our database.

    How does this help? Whenever partitions are assigned to a consumer (at startup and after every rebalance), it does not rely on the offset committed in Kafka. Instead, it queries its own processed_kafka_offsets table for the latest offset it has processed in a committed transaction. It then starts consuming at the next offset, either with Kafka's Seek() or by passing explicit offsets to Assign() inside the rebalance callback, and ignores whatever offset Kafka itself has stored for those partitions.
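    The invariant can be checked with a small in-memory model that uses hypothetical types and no Kafka or PostgreSQL. Each commit records the business effect and the offset together. After a simulated crash, consumption resumes at the stored offset plus one, so every message is applied exactly once even though no offset was ever committed to Kafka.

```go
package main

// tp identifies a topic partition (hypothetical stand-in for kafka.TopicPartition).
type tp struct {
	topic     string
	partition int32
}

// db models the single source of truth: business state and consumed offsets
// change together in commit, mirroring one SQL transaction.
type db struct {
	total   int64
	offsets map[tp]int64
}

// commit applies the business effect and records the offset in one step.
func (d *db) commit(p tp, offset, amount int64) {
	d.total += amount
	d.offsets[p] = offset
}

// resumeFrom returns where to start after a (re)assignment: the message after
// the last one committed in the database, or fallback (e.g. Kafka's committed
// offset) when the database has no record for this partition yet.
func (d *db) resumeFrom(p tp, fallback int64) int64 {
	if last, ok := d.offsets[p]; ok {
		return last + 1
	}
	return fallback
}

// consume processes log[start:] for one partition. crashAfter simulates a crash
// after that many messages have been committed (-1 means no crash).
func consume(d *db, p tp, log []int64, start int64, crashAfter int) {
	for off := start; off < int64(len(log)); off++ {
		if crashAfter == 0 {
			return
		}
		d.commit(p, off, log[off])
		crashAfter--
	}
}
```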

    Implementation Sketch

    Additional Schema:

    sql
    CREATE TABLE processed_kafka_offsets (
        topic VARCHAR(255) NOT NULL,
        partition INT NOT NULL,
        last_processed_offset BIGINT NOT NULL,
        PRIMARY KEY (topic, partition)
    );

    Modified Consumer Logic:

    go
    // Inside handleMessageWithPostgres, within the transaction...
    
    // ... after successfully processing business logic and updating idempotency key ...
    
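    // 6. Store the Kafka offset in the same transaction. The WHERE clause keeps the
    // stored offset from moving backwards if an older message is ever replayed.
    _, err = tx.Exec(ctx, 
    	`INSERT INTO processed_kafka_offsets (topic, partition, last_processed_offset)
    	 VALUES ($1, $2, $3)
    	 ON CONFLICT (topic, partition) DO UPDATE
    	 SET last_processed_offset = EXCLUDED.last_processed_offset
    	 WHERE processed_kafka_offsets.last_processed_offset < EXCLUDED.last_processed_offset`,
    	*msg.TopicPartition.Topic,        // Topic is a *string in confluent-kafka-go
    	msg.TopicPartition.Partition,     // int32
    	int64(msg.TopicPartition.Offset), // kafka.Offset is an int64-based type
    )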
    if err != nil {
    	return fmt.Errorf("failed to save kafka offset: %w", err)
    }
    
    // 7. Commit transaction
    return tx.Commit(ctx)

    Consumer Startup Logic:

    go
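    // Register with consumer.SubscribeTopics(topics, assignFromDB(db)).
    // Offsets must be applied when partitions are assigned (at startup and after every
    // rebalance); calling Seek() before partitions are assigned does not work. Assumes the
    // default eager rebalance protocol (cooperative-sticky would need IncrementalAssign).
    // Additional imports: "errors", "github.com/jackc/pgx/v5".
    func assignFromDB(db *pgxpool.Pool) kafka.RebalanceCb {
    	return func(c *kafka.Consumer, ev kafka.Event) error {
    		switch e := ev.(type) {
    		case kafka.AssignedPartitions:
    			ctx := context.Background()
    			partitions := e.Partitions
    			for i, p := range partitions {
    				var lastOffset int64
    				err := db.QueryRow(ctx,
    					`SELECT last_processed_offset FROM processed_kafka_offsets WHERE topic=$1 AND partition=$2`,
    					*p.Topic,
    					p.Partition,
    				).Scan(&lastOffset)
    				switch {
    				case err == nil:
    					// Resume at the *next* message after the last one committed in the DB
    					partitions[i].Offset = kafka.Offset(lastOffset + 1)
    					fmt.Printf("Assigning %s[%d] at stored offset %d\n", *p.Topic, p.Partition, lastOffset+1)
    				case errors.Is(err, pgx.ErrNoRows):
    					// No stored offset: leave Offset untouched so Kafka's committed offset is used
    					fmt.Printf("No stored offset for %s[%d], using default\n", *p.Topic, p.Partition)
    				default:
    					return fmt.Errorf("failed to load stored offset: %w", err)
    				}
    			}
    			return c.Assign(partitions)
    		case kafka.RevokedPartitions:
    			return c.Unassign()
    		}
    		return nil
    	}
    }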

    This closes the final gap. The system is now resilient to consumer crashes at any point. A committed database transaction becomes the single source of truth for both the business state and the message processing progress.

    Conclusion: Choosing the Right Pattern

    There is no one-size-fits-all solution for idempotency. The correct choice is a trade-off between performance, complexity, and the correctness requirements of your specific domain.

  • Use Redis-based Idempotency for high-volume, low-criticality events where sub-millisecond latency is paramount and the small risk of reprocessing due to a crash between the business logic commit and the Redis SET is acceptable.
  • Use PostgreSQL-based Idempotency when transactional integrity is required. This is the standard for most business-critical services like e-commerce, banking, and logistics. The performance overhead is acceptable for the guarantee of atomicity between the work and the idempotency state.
  • Use transactional offset storage (committing Kafka offsets in the same database transaction as the business logic) when you require the highest level of correctness. It eliminates reprocessing caused by a crash between a successful transaction commit and the Kafka offset commit. This pattern represents the gold standard for building truly resilient, effectively-exactly-once event-driven microservices.
    By mastering these advanced patterns, you can build systems that are not just scalable and fast, but also robust and correct in the face of the inevitable failures of a distributed world.
