Idempotency Patterns for Kafka-based Event-Driven Microservices
The Inevitability of Duplicates in Asynchronous Systems
In any mature, distributed, event-driven architecture, the problem of message duplication is not a matter of if, but when. While platforms like Apache Kafka provide powerful delivery guarantees, the default and most common configuration is at-least-once delivery. This guarantee is a pragmatic trade-off, ensuring no data is lost during broker or consumer failures, at the cost of potentially redelivering messages.
Why not exactly-once? While Kafka now offers exactly-once semantics (EOS) through its transactional APIs, achieving it end-to-end (from producer to consumer processing to final output) requires a holistic and often complex implementation. More critically, the most common point of failure that introduces duplicates is within the consumer's own logic. Consider this canonical failure scenario:
- A consumer fetches a message (e.g., process_payment with transaction_id: 123).
- It successfully processes the business logic (the payment is debited in the database).
- It prepares to commit the offset to Kafka to signal completion.
- It crashes before that commit reaches the broker.
Upon restart, the new consumer instance will fetch messages from the last committed offset, re-receiving the process_payment message for transaction_id: 123. Without a robust idempotency strategy, it will process the payment a second time, resulting in a double charge—a catastrophic business failure.
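To make the failure window concrete, here is a minimal at-least-once consumer loop. This is a sketch: handleMessage is a hypothetical stand-in for your business logic, and the consumer c is assumed to be configured with enable.auto.commit set to false so offsets are committed manually.
// Assume c is an initialized *kafka.Consumer with "enable.auto.commit": false.
// handleMessage is a hypothetical business-logic handler.
for {
    msg, err := c.ReadMessage(-1) // block until a message arrives
    if err != nil {
        continue // transient fetch error; poll again
    }
    if err := handleMessage(msg); err != nil {
        continue // retry or route to a DLQ; the offset stays uncommitted
    }
    // A crash *here* is the canonical scenario above: the work is done,
    // but the uncommitted offset means the message will be redelivered.
    if _, err := c.CommitMessage(msg); err != nil {
        fmt.Printf("commit failed: %v\n", err)
    }
}
Everything that follows is about making that handler safe to run twice.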
This is why application-level idempotency is non-negotiable for any critical operation in an event-driven system. An idempotent operation is one that can be applied multiple times without changing the result beyond the initial application. Our goal as senior engineers is to build consumers that are inherently idempotent, transforming at-least-once delivery into effectively-exactly-once processing.
This article dissects two production-grade patterns for achieving this, with complete Go implementations: a high-throughput Redis-based approach and a transactionally-consistent PostgreSQL-based approach, culminating in the Transactional Outbox pattern for ultimate durability.
Pattern 1: High-Throughput Idempotency with Redis
For systems where processing latency is critical and a minimal risk of data loss on the idempotency store is acceptable (e.g., view counters, non-financial aggregations), Redis is an exceptional choice. Its atomic, single-threaded operations like SETNX (Set If Not Exists) provide a highly performant distributed lock and state-tracking mechanism.
The Core Logic
The pattern is straightforward:
- Extract a unique idempotency key from the message, typically a business identifier (e.g., order_id, payment_id).
- Attempt to claim that key in Redis with SETNX.
- If the key is successfully claimed, the consumer proceeds with the business logic.
- Upon completion, the consumer updates the key in Redis to store the result and sets a Time-To-Live (TTL) to prevent the store from growing indefinitely.
- If the key claim fails, it means another consumer is processing it or has already completed it. The consumer then reads the key's value to determine the status and act accordingly.
Go Implementation with Redis
Let's build a consumer that processes a simple OrderCreated event. We'll use the redis/go-redis and confluentinc/confluent-kafka-go libraries.
Data Structures and Constants:
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
const (
IdempotencyKeyHeader = "idempotency-key"
RedisKeyPrefix = "idempotency:"
KeyTTL = 24 * time.Hour
)
type OrderEvent struct {
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
}
type ProcessingStatus string
const (
StatusPending ProcessingStatus = "PENDING"
StatusCompleted ProcessingStatus = "COMPLETED"
)
type StoredResult struct {
Status ProcessingStatus `json:"status"`
Response string `json:"response"`
ProcessingNode string `json:"processing_node"`
}
The Consumer Logic:
Here's the core of the consumer. Pay close attention to the handling of the SETNX result and the multi-step state management within Redis.
// Assume rdb is an initialized *redis.Client
// Assume consumer is an initialized *kafka.Consumer
func handleMessageWithRedis(ctx context.Context, rdb *redis.Client, msg *kafka.Message) error {
var idempotencyKey string
for _, h := range msg.Headers {
if h.Key == IdempotencyKeyHeader {
idempotencyKey = string(h.Value)
break
}
}
if idempotencyKey == "" {
fmt.Println("ERROR: Message missing idempotency key")
return nil // Acknowledge and move on, or move to DLQ
}
redisKey := RedisKeyPrefix + idempotencyKey
// 1. Attempt to claim the key
pendingState := StoredResult{Status: StatusPending, ProcessingNode: "node-1"} // node-1 is a placeholder
pendingJSON, _ := json.Marshal(pendingState)
claimed, err := rdb.SetNX(ctx, redisKey, pendingJSON, 1*time.Minute).Result() // Short TTL for pending state
if err != nil {
return fmt.Errorf("redis SETNX failed: %w", err)
}
if claimed {
// 2. We got the lock! Process the message.
fmt.Printf("Claimed key %s. Processing message...\n", idempotencyKey)
var event OrderEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
// Handle poison pill message
// Potentially delete the key to allow for retries if it's a transient error
rdb.Del(ctx, redisKey)
return fmt.Errorf("failed to unmarshal event: %w", err)
}
// --- BUSINESS LOGIC ---
// (e.g., save to DB, call another service)
processingResult := fmt.Sprintf("Processed order %s for amount %.2f", event.OrderID, event.Amount)
time.Sleep(2 * time.Second) // Simulate work
// --- END BUSINESS LOGIC ---
// 3. Store the final result and set the final TTL
finalState := StoredResult{Status: StatusCompleted, Response: processingResult}
finalJSON, _ := json.Marshal(finalState)
if err := rdb.Set(ctx, redisKey, finalJSON, KeyTTL).Err(); err != nil {
// This is a critical failure. The work is done but we can't mark it as such.
// This could lead to a retry that re-processes. Requires manual intervention or a more robust pattern.
return fmt.Errorf("CRITICAL: failed to set final state in redis: %w", err)
}
fmt.Printf("Finished processing for key %s.\n", idempotencyKey)
return nil
} else {
// 4. Key already exists. Check its status.
fmt.Printf("Key %s already exists. Checking status...\n", idempotencyKey)
for i := 0; i < 5; i++ { // Retry loop for pending status
val, err := rdb.Get(ctx, redisKey).Result()
if err == redis.Nil {
// Key expired or was deleted between SETNX and GET. Retry the whole process.
fmt.Println("Key disappeared. Retrying message handling.")
return handleMessageWithRedis(ctx, rdb, msg) // Recursive call, be careful with stack depth
}
if err != nil {
return fmt.Errorf("redis GET failed: %w", err)
}
var stored StoredResult
if err := json.Unmarshal([]byte(val), &stored); err != nil {
return fmt.Errorf("failed to unmarshal stored result: %w", err)
}
if stored.Status == StatusCompleted {
fmt.Printf("Message with key %s already processed. Response: '%s'. Skipping.\n", idempotencyKey, stored.Response)
return nil // Success, already done.
}
if stored.Status == StatusPending {
fmt.Printf("Key %s is pending by another process. Waiting... (attempt %d)\n", idempotencyKey, i+1)
time.Sleep(500 * time.Millisecond)
continue
}
}
return fmt.Errorf("key %s stuck in pending state", idempotencyKey)
}
}
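The handler above assumes every message carries an idempotency-key header, so the producer side must stamp it. Here is a hedged sketch (the topic name and order values are illustrative); deriving the key deterministically from the business entity, rather than generating a random UUID per send, keeps producer-side retries idempotent as well.
// Assume p is an initialized *kafka.Producer.
topic := "orders"
event := OrderEvent{OrderID: "ord-42", Amount: 99.95}
payload, _ := json.Marshal(event)
err := p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          payload,
    Headers: []kafka.Header{{
        Key: IdempotencyKeyHeader,
        // Deterministic: a retried publish of the same order reuses the same key.
        Value: []byte("order-created:" + event.OrderID),
    }},
}, nil)
if err != nil {
    fmt.Printf("produce failed: %v\n", err)
}
p.Flush(5000) // wait up to 5s for delivery reports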
Performance and Edge Cases
* Performance: This approach is incredibly fast. A SETNX operation in Redis typically completes in under a millisecond. It's ideal for high-throughput topics where tens of thousands of messages per second are being processed across a fleet of consumers.
* Race Conditions: The PENDING state with a short TTL helps mitigate race conditions where a consumer claims a key but crashes before completing the work. The short TTL ensures the lock is eventually released. The retry loop in the else block handles the case where one consumer encounters a key that is actively being processed by another. A hardening sketch for the reclaim race follows this list.
* Critical Failure Point: The most significant weakness is the non-atomic nature of Business Logic -> Redis SET. If your business logic commits a database transaction but the consumer crashes before the final rdb.Set() call, the idempotency key will be left in a PENDING state until its short TTL expires. When another consumer picks it up, it will re-process the already-committed work. This is often an acceptable risk for non-critical tasks but a deal-breaker for financial transactions.
* Redis Failure: If the Redis cluster goes down, your entire processing pipeline halts. If it suffers data loss (e.g., a failed failover where some writes are lost), your idempotency guarantees are voided for the lost keys.
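One way to harden the reclaim race noted above: if the pending TTL expires mid-work and another consumer takes over the key, the slow consumer's final rdb.Set would clobber the new owner's state. A sketch, assuming the pending record carries the claiming node's ID (as the ProcessingNode field does): replace the unconditional Set with a Lua compare-and-set that completes the key only if our claim is still in place.
// Sketch: only mark COMPLETED if this node's PENDING claim still stands.
completeIfOwner := redis.NewScript(`
local cur = redis.call('GET', KEYS[1])
if not cur then return 0 end
local decoded = cjson.decode(cur)
if decoded.status == 'PENDING' and decoded.processing_node == ARGV[1] then
  redis.call('SET', KEYS[1], ARGV[2], 'EX', ARGV[3])
  return 1
end
return 0
`)
owned, err := completeIfOwner.Run(ctx, rdb,
    []string{redisKey}, "node-1", finalJSON, int(KeyTTL.Seconds())).Int()
if err != nil {
    return fmt.Errorf("compare-and-set failed: %w", err)
}
if owned == 0 {
    // Our claim expired and another consumer took over; don't overwrite its state.
    return fmt.Errorf("lost claim on key %s during processing", redisKey)
}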
Pattern 2: Transactional Integrity with PostgreSQL
For systems that demand absolute correctness and durability, such as financial ledgers, order processing, or inventory management, we must tie the idempotency check to the same atomic transaction as the business logic. A relational database like PostgreSQL is the perfect tool for this.
The Core Logic
This pattern leverages the ACID properties of a relational database.
- Create an idempotency_keys table in your database.
- For each incoming message, the consumer starts a database transaction.
- Within that transaction, it attempts to INSERT the message's idempotency key into the table.
- If the INSERT succeeds, it proceeds with the business logic (e.g., UPDATEing other tables) within the same transaction.
- If the INSERT fails due to a unique constraint violation, the key already exists. The consumer then SELECTs the existing row to check its status. This SELECT must use a pessimistic lock (FOR UPDATE) to prevent race conditions.
- Finally, the entire transaction is committed. Atomicity ensures that the business logic and the idempotency key record are saved together, or not at all.
Go Implementation with PostgreSQL
Let's refactor our consumer to use pgx with PostgreSQL.
Database Schema:
CREATE TABLE idempotency_keys (
key VARCHAR(255) PRIMARY KEY,
status VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Store the response so we can return it on duplicate calls
response_payload JSONB,
-- To prevent keys from being stuck in PENDING forever
last_updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_idempotency_keys_created_at ON idempotency_keys(created_at);
The Consumer Logic:
This implementation is more complex, but provides far stronger guarantees.
// Assume db is an initialized *pgxpool.Pool
func handleMessageWithPostgres(ctx context.Context, db *pgxpool.Pool, msg *kafka.Message) error {
var idempotencyKey string
// ... (extract idempotencyKey as before)
tx, err := db.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback(ctx) // Rollback is a no-op if tx is committed
// 1. Try to insert the key. ON CONFLICT DO NOTHING is key.
// This is an atomic check-and-set.
insertCmd, err := tx.Exec(ctx,
`INSERT INTO idempotency_keys (key, status) VALUES ($1, 'PENDING') ON CONFLICT (key) DO NOTHING`,
idempotencyKey,
)
if err != nil {
return fmt.Errorf("failed to insert idempotency key: %w", err)
}
if insertCmd.RowsAffected() > 0 {
// 2. We are the first. The key is now ours within this transaction.
fmt.Printf("Claimed key %s. Processing message...\n", idempotencyKey)
var event OrderEvent
// ... (unmarshal event as before)
// --- BUSINESS LOGIC (within the same transaction) ---
processingResult := fmt.Sprintf("Processed order %s for amount %.2f", event.OrderID, event.Amount)
// Example: tx.Exec(ctx, "UPDATE orders SET status='PROCESSED' WHERE id=$1", event.OrderID)
time.Sleep(2 * time.Second) // Simulate work
// --- END BUSINESS LOGIC ---
resultPayload, _ := json.Marshal(map[string]string{"result": processingResult})
// 3. Update the key to COMPLETED with the result
_, err = tx.Exec(ctx,
`UPDATE idempotency_keys SET status='COMPLETED', response_payload=$1, last_updated_at=NOW() WHERE key=$2`,
resultPayload,
idempotencyKey,
)
if err != nil {
return fmt.Errorf("failed to update idempotency key to completed: %w", err)
}
} else {
// 4. Key already exists. We need to check its status.
// Use SELECT ... FOR UPDATE to acquire a row-level lock.
// This blocks other concurrent consumers trying to process the same key, preventing race conditions.
fmt.Printf("Key %s already exists. Locking row and checking status...\n", idempotencyKey)
var status string
var responsePayload []byte
err = tx.QueryRow(ctx,
`SELECT status, response_payload FROM idempotency_keys WHERE key=$1 FOR UPDATE`,
idempotencyKey,
).Scan(&status, &responsePayload)
if err != nil {
return fmt.Errorf("failed to select and lock idempotency key: %w", err)
}
if status == "COMPLETED" {
fmt.Printf("Message with key %s already processed. Skipping. Response: %s\n", idempotencyKey, string(responsePayload))
// We can commit the transaction here, as it was read-only and we're done.
return tx.Commit(ctx)
}
if status == "PENDING" {
// Another consumer crashed. Or the transaction is just very long.
// Here you need a business rule. E.g., if last_updated_at > 10 mins ago, take over.
// For simplicity, we will just abort.
return fmt.Errorf("key %s is already pending by another transaction", idempotencyKey)
}
}
// 5. Commit the entire transaction
fmt.Printf("Committing transaction for key %s\n", idempotencyKey)
return tx.Commit(ctx)
}
Durability and Trade-offs
* Atomicity: This is the killer feature. The business logic and the idempotency state change are committed in a single atomic unit. The failure mode from the Redis example (work done, state not saved) is impossible.
* Performance: The cost is latency. Database transactions, especially those involving row-level locks (FOR UPDATE), are typically one to two orders of magnitude slower than a sub-millisecond Redis SETNX. This might not be suitable for topics requiring extremely low latency, but it's perfect for most transactional business processes.
* Database Load: This pattern increases the write load on your database. The idempotency_keys table will see an INSERT and an UPDATE for every new message, plus a locking SELECT for every duplicate. Ensure your database is provisioned to handle this load and that you have a cleanup strategy for old keys; a retention sketch follows this list.
* Deadlocks: SELECT ... FOR UPDATE can introduce the risk of deadlocks if transactions acquire locks in different orders. Ensure your access patterns are consistent. In this specific idempotency pattern, since we are always locking a single key, deadlocks are unlikely but not impossible in more complex transaction scenarios.
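On that cleanup strategy: since created_at is already indexed, a periodic retention sweep is cheap. A minimal sketch; the one-hour cadence and seven-day retention window are assumptions to tune against how late duplicates can plausibly arrive.
// Sketch: periodically purge idempotency keys older than the retention window.
func cleanupIdempotencyKeys(ctx context.Context, db *pgxpool.Pool) {
    ticker := time.NewTicker(1 * time.Hour) // cadence is an assumption
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            tag, err := db.Exec(ctx,
                `DELETE FROM idempotency_keys WHERE created_at < NOW() - INTERVAL '7 days'`)
            if err != nil {
                fmt.Printf("idempotency cleanup failed: %v\n", err)
                continue
            }
            fmt.Printf("idempotency cleanup removed %d keys\n", tag.RowsAffected())
        }
    }
}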
Advanced Pattern: The Transactional Outbox for Flawless Atomicity
We've solved the atomicity of business logic and idempotency state. But we still haven't solved the original failure scenario perfectly: what if our transaction commits successfully, but the consumer crashes before committing the Kafka offset?
This is where the Transactional Outbox pattern comes in. It extends the database-backed approach to achieve true end-to-end atomicity.
The Core Logic
The pattern is an enhancement of the PostgreSQL implementation:
- Create a processed_kafka_offsets table in the same database.
- Within the same transaction that runs the business logic and updates the idempotency key, INSERT or UPDATE the Kafka offset (topic, partition, offset) into this new table.
Because this is all in one transaction, we can now guarantee this invariant: if the business logic is committed, the Kafka offset for that message is also durably stored in our database.
How does this help? When a consumer starts up, its first action is not to ask Kafka for the last committed offset. Instead, it queries its own processed_kafka_offsets table to find the latest offset it has successfully processed in a transaction. It then uses Kafka's Seek() functionality to begin consuming from that precise point, completely ignoring whatever stale offset Kafka might have stored.
Implementation Sketch
Additional Schema:
CREATE TABLE processed_kafka_offsets (
topic VARCHAR(255) NOT NULL,
partition INT NOT NULL,
last_processed_offset BIGINT NOT NULL,
PRIMARY KEY (topic, partition)
);
Modified Consumer Logic:
// Inside handleMessageWithPostgres, within the transaction...
// ... after successfully processing business logic and updating idempotency key ...
// 6. Store the Kafka offset in the same transaction
_, err = tx.Exec(ctx,
`INSERT INTO processed_kafka_offsets (topic, partition, last_processed_offset)
VALUES ($1, $2, $3)
ON CONFLICT (topic, partition) DO UPDATE
SET last_processed_offset = EXCLUDED.last_processed_offset`,
*msg.TopicPartition.Topic, // Topic is a *string; dereference it
msg.TopicPartition.Partition,
int64(msg.TopicPartition.Offset), // cast kafka.Offset to a plain int64
)
if err != nil {
return fmt.Errorf("failed to save kafka offset: %w", err)
}
// 7. Commit transaction
return tx.Commit(ctx)
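With the database now the authoritative record of progress, Kafka's own committed offsets become advisory at best. One consequence for configuration, sketched below with illustrative values: disable auto-commit entirely, and treat any commits to Kafka as a best-effort hint for lag-monitoring tooling rather than a correctness mechanism.
// Assume this replaces the default consumer construction.
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":  "localhost:9092", // illustrative
    "group.id":           "orders-processor",
    "enable.auto.commit": false, // progress lives in processed_kafka_offsets
    "auto.offset.reset":  "earliest",
})
if err != nil {
    panic(err)
}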
Consumer Startup Logic:
// Before the main consumer loop starts...
func seekToLastProcessedOffset(ctx context.Context, db *pgxpool.Pool, consumer *kafka.Consumer, partitions []kafka.TopicPartition) {
for _, p := range partitions {
var lastOffset int64
err := db.QueryRow(ctx,
`SELECT last_processed_offset FROM processed_kafka_offsets WHERE topic=$1 AND partition=$2`,
*p.Topic,
p.Partition,
).Scan(&lastOffset)
switch {
case err == nil:
// We have a stored offset, seek to the *next* message
seekPartition := kafka.TopicPartition{Topic: p.Topic, Partition: p.Partition, Offset: kafka.Offset(lastOffset + 1)}
fmt.Printf("Seeking partition %v to stored offset %d\n", p, lastOffset+1)
if err := consumer.Seek(seekPartition, 0); err != nil {
fmt.Printf("ERROR: seek failed for partition %v: %v\n", p, err)
}
case errors.Is(err, pgx.ErrNoRows): // requires the "errors" and "github.com/jackc/pgx/v5" imports
// No stored offset, let the consumer use its default committed offset
fmt.Printf("No stored offset for partition %v, using default\n", p)
default:
// A real database error: do not silently fall back to Kafka's offset
fmt.Printf("ERROR: offset lookup failed for partition %v: %v\n", p, err)
}
}
}
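In a consumer group, partition assignments arrive via rebalances, so one natural place to invoke this function is the rebalance callback. A sketch (topic name illustrative, db assumed in scope); note that depending on your client version, setting the Offset field on the partitions passed to Assign can achieve the same repositioning without a separate Seek call.
// Sketch: re-run the database-driven seek on every partition assignment.
err := consumer.SubscribeTopics([]string{"orders"}, func(c *kafka.Consumer, ev kafka.Event) error {
    if assigned, ok := ev.(kafka.AssignedPartitions); ok {
        if err := c.Assign(assigned.Partitions); err != nil {
            return err
        }
        seekToLastProcessedOffset(context.Background(), db, c, assigned.Partitions)
    }
    return nil
})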
This closes the final gap. The system is now resilient to consumer crashes at any point. A committed database transaction becomes the single source of truth for both the business state and the message processing progress.
Conclusion: Choosing the Right Pattern
There is no one-size-fits-all solution for idempotency. The correct choice is a trade-off between performance, complexity, and the correctness requirements of your specific domain.
- Reach for the Redis pattern when throughput is the priority and a rare re-processing, caused by a crash between the business logic and the final SET, is acceptable.
- Reach for the PostgreSQL pattern when correctness is non-negotiable and the idempotency state must commit atomically with the business state.
- Add database-backed offset tracking when you need the strongest guarantee: effectively-exactly-once event-driven microservices.
By mastering these advanced patterns, you can build systems that are not just scalable and fast, but also robust and correct in the face of the inevitable failures of a distributed world.