Kafka Consumer Idempotency: The Redis and Outbox Pattern
The Idempotency Imperative in Asynchronous Systems
In distributed systems, Kafka's at-least-once delivery guarantee is a foundational contract. It ensures message durability but introduces a significant challenge for consumers: the potential for duplicate message processing. A network partition, a consumer crash post-processing but pre-commit, or a consumer group rebalance can all trigger redelivery of the same message. For many applications—payment processing, order fulfillment, inventory management—processing a duplicate message can lead to catastrophic data corruption.
The common goal is to achieve effectively-once processing semantics at the application layer. This means that while the message may be delivered more than once, our application logic ensures it is processed and its side effects are applied exactly once.
A naive approach might involve using a unique constraint in your primary database on a transaction ID. However, this pattern quickly breaks down when a single message consumption triggers multiple, non-atomic operations: updating a database row, making an external API call, and producing a new event. If a failure occurs between these steps, the system is left in an inconsistent state. A truly robust solution must guarantee both idempotency of the initial consumption and atomicity of its resulting side effects.
This article details a battle-tested, production-grade pattern that combines the low-latency capabilities of Redis for idempotency checks with the transactional consistency of the Outbox pattern. We will architect a consumer that can withstand crashes and retries at any point in its lifecycle while guaranteeing data integrity.
Core Pattern: The Stateful Idempotency Key with Redis
A simple idempotency check might use Redis's SETNX (SET if Not eXists) command. The consumer would extract a unique ID from the incoming message (e.g., event_id), attempt to SETNX this key in Redis, and only proceed if the command returns 1 (indicating the key was newly set).
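For illustration, here is a minimal sketch of this naive check using go-redis; the 24-hour TTL and the assumption that the event ID arrives as the Redis key are illustrative choices, not a prescribed API.
package main

import (
    "context"
    "time"

    "github.com/go-redis/redis/v8"
)

// naiveIdempotencyCheck returns true if this event ID has not been seen before.
// It is only safe if processing can never fail after the SETNX succeeds.
func naiveIdempotencyCheck(ctx context.Context, rdb *redis.Client, eventID string) (bool, error) {
    // Equivalent to: SET eventID 1 NX EX 86400 — set only if the key does not already exist.
    return rdb.SetNX(ctx, eventID, 1, 24*time.Hour).Result()
}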
However, this simple approach has a critical flaw: the crash-recovery race condition. Consider this sequence:
- The consumer receives message M1 with key K1.
- It executes SETNX K1 1 in Redis, which succeeds.
- The consumer crashes after setting the key but before completing the business logic and committing the offset.
- Kafka redelivers M1 again.
- The consumer attempts SETNX K1 1, which now returns 0 because the key exists.
- The consumer assumes M1 was fully processed, skips it, and commits the offset.

The business logic was never completed, and the message is now lost forever. To solve this, we must evolve our idempotency record from a simple flag to a state machine.
A Multi-Stage Idempotency Record
We'll store a JSON object or a Redis Hash for each idempotency key, tracking the processing state. The states are:
* PROCESSING: The message is currently being handled. This acts as a lock.
* COMPLETED: The message was successfully and completely processed.
* FAILED: The message processing failed due to a transient error and can be retried.
Here's the refined consumer logic:
- Extract the idempotency key K from the message.
- GET the record for K in Redis.
  * If COMPLETED, the message is a duplicate of a successfully processed one. Acknowledge and skip.
  * If PROCESSING, another consumer might be working on it. This could indicate a long-running process or a zombie consumer. We can choose to wait, retry, or raise an alert after a timeout.
  * If FAILED or non-existent, proceed to process.
- Set K to PROCESSING with a reasonable TTL (e.g., 5 minutes). This TTL prevents permanent locks if a consumer dies without cleaning up.
- Execute the business logic.
- On success, set K to COMPLETED with a longer TTL (e.g., 24 hours) to cover the redelivery window. On failure, set it to FAILED or simply delete the key to allow a full retry on redelivery.
- Commit the Kafka offset.
Go Implementation: Stateful Idempotency Consumer
Let's implement this logic in Go using the go-redis library. This example assumes you have a Kafka consumer loop set up.
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/segmentio/kafka-go"
)
// IdempotencyRecord defines the state stored in Redis.
type IdempotencyRecord struct {
Status string `json:"status"`
Timestamp int64 `json:"timestamp"`
}
const (
StatusProcessing = "PROCESSING"
StatusCompleted = "COMPLETED"
)
const (
processingTTL = 5 * time.Minute
completedTTL = 24 * time.Hour
)
// IdempotencyService manages the idempotency checks against Redis.
type IdempotencyService struct {
redisClient *redis.Client
}
func NewIdempotencyService(client *redis.Client) *IdempotencyService {
return &IdempotencyService{redisClient: client}
}
// CheckAndSetProcessing checks the status and sets it to PROCESSING if permissible.
func (s *IdempotencyService) CheckAndSetProcessing(ctx context.Context, key string) (bool, error) {
val, err := s.redisClient.Get(ctx, key).Result()
if err != nil && err != redis.Nil {
return false, fmt.Errorf("failed to get key from redis: %w", err)
}
if err == nil {
var record IdempotencyRecord
if jsonErr := json.Unmarshal([]byte(val), &record); jsonErr != nil {
// Corrupted data, treat as processable but log a warning
fmt.Printf("WARN: Corrupted idempotency record for key %s: %v\n", key, jsonErr)
} else {
if record.Status == StatusCompleted {
fmt.Printf("INFO: Duplicate message detected, key %s already completed.\n", key)
return false, nil // Do not process
}
if record.Status == StatusProcessing {
// Check timestamp to see if it's a stale lock
if time.Now().Unix()-record.Timestamp < int64(processingTTL.Seconds()) {
fmt.Printf("WARN: Message with key %s is already processing.\n", key)
return false, nil // Do not process, maybe another consumer is busy
}
fmt.Printf("INFO: Stale processing lock for key %s. Re-processing.\n", key)
}
}
}
// Set to PROCESSING
newRecord := IdempotencyRecord{
Status: StatusProcessing,
Timestamp: time.Now().Unix(),
}
recordJSON, _ := json.Marshal(newRecord)
// We use a plain SET here (not NX) so that a stale PROCESSING or FAILED record can be overwritten.
// This is not perfectly atomic with the GET above; for true atomicity a Lua script is best (see below).
// However, for this pattern, the small race window is often acceptable.
_, err = s.redisClient.Set(ctx, key, recordJSON, processingTTL).Result()
if err != nil {
return false, fmt.Errorf("failed to set processing status in redis: %w", err)
}
return true, nil // Ok to process
}
// SetCompleted marks a key as successfully processed.
func (s *IdempotencyService) SetCompleted(ctx context.Context, key string) error {
completedRecord := IdempotencyRecord{
Status: StatusCompleted,
Timestamp: time.Now().Unix(),
}
recordJSON, _ := json.Marshal(completedRecord)
err := s.redisClient.Set(ctx, key, recordJSON, completedTTL).Err()
if err != nil {
return fmt.Errorf("failed to set completed status: %w", err)
}
return nil
}
// processMessage is a placeholder for your main consumer logic
func processMessage(ctx context.Context, msg kafka.Message, idemSvc *IdempotencyService) error {
idempotencyKey := string(msg.Key) // Assuming event ID is in the Kafka message key
canProcess, err := idemSvc.CheckAndSetProcessing(ctx, idempotencyKey)
if err != nil {
// Error communicating with Redis, bubble up to trigger a retry
return fmt.Errorf("idempotency check failed: %w", err)
}
if !canProcess {
// Message is a duplicate or is being processed elsewhere
return nil // Acknowledge without processing
}
// --- BEGIN BUSINESS LOGIC ---
fmt.Printf("Processing message: %s\n", idempotencyKey)
// Simulate work
time.Sleep(2 * time.Second)
// --- END BUSINESS LOGIC ---
// Mark as completed
if err := idemSvc.SetCompleted(ctx, idempotencyKey); err != nil {
// This is a critical failure. If this fails, on redelivery we will re-process.
// The system must be designed to handle this possibility, potentially with out-of-band cleanup.
return fmt.Errorf("failed to mark message as completed: %w", err)
}
fmt.Printf("Successfully processed and marked as completed: %s\n", idempotencyKey)
return nil
}
Note on Atomicity: The GET followed by a SET in CheckAndSetProcessing is not atomic. A small race condition exists where two consumers could read a non-existent key simultaneously and both proceed to set it to PROCESSING. The second write will overwrite the first, but both consumers will start processing. While often acceptable if the business logic itself has further guards, true atomicity requires a Lua script executed on the Redis server.
-- LUA Script for atomic check-and-set
local key = KEYS[1]
local processing_ttl_seconds = ARGV[1]
local current_ts = ARGV[2]
local processing_payload = ARGV[3] -- e.g., '{"status":"PROCESSING", ...}'
local completed_status = ARGV[4] -- e.g., 'COMPLETED'
local value = redis.call('GET', key)
if value then
-- Using cjson library which is standard in Redis
local record = cjson.decode(value)
if record.status == completed_status then
return 'COMPLETED'
else
-- Could add more logic for stale PROCESSING check here
return 'LOCKED'
end
end
-- Key does not exist, set to PROCESSING
redis.call('SET', key, processing_payload, 'EX', processing_ttl_seconds)
return 'OK'
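For completeness, here is a sketch of how this script might be invoked from the Go consumer using go-redis's Script helper. It assumes the Lua source above is stored in a Go string constant named luaCheckAndSet, and it reuses the IdempotencyRecord type and the StatusProcessing, StatusCompleted, and processingTTL definitions from the earlier example.
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

// checkAndSetScript wraps the Lua script above; go-redis executes it with
// EVALSHA and falls back to EVAL if the script is not yet cached on the server.
var checkAndSetScript = redis.NewScript(luaCheckAndSet)

// TryAcquire atomically claims the idempotency key. It returns true only when
// the script created a fresh PROCESSING record and the caller may proceed.
func TryAcquire(ctx context.Context, rdb *redis.Client, key string) (bool, error) {
    now := time.Now().Unix()
    payload, _ := json.Marshal(IdempotencyRecord{Status: StatusProcessing, Timestamp: now})

    res, err := checkAndSetScript.Run(ctx, rdb,
        []string{key},
        int(processingTTL.Seconds()), now, string(payload), StatusCompleted,
    ).Text()
    if err != nil {
        return false, fmt.Errorf("idempotency script failed: %w", err)
    }
    // 'COMPLETED' and 'LOCKED' both mean the message must not be processed now.
    return res == "OK", nil
}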
The Atomicity Challenge: The Transactional Outbox Pattern
The Redis idempotency layer solves the duplicate processing problem for the consumer's logic. But what if that logic needs to both update a local database and produce a new, derivative event? This is the classic dual-write problem.
Example:
A PaymentProcessed event is consumed. The consumer must:
- Update the orders table in a PostgreSQL database to status = 'PAID'.
- Produce an OrderReadyForShipment event to another Kafka topic.

If the process crashes after the database commit but before the Kafka produce call returns successfully, the order is marked as paid, but the shipping department is never notified. The system is now inconsistent.
The Transactional Outbox pattern solves this by leveraging the atomicity of the local database transaction.
The Pattern:
- The consumer begins a database transaction.
- It performs all its required database updates (e.g., updating the order status).
- It inserts the derivative event as a row into an outbox table within the same database and same transaction.
- It commits the database transaction.
Now, the state change and the intent to send the event are committed atomically. If the commit succeeds, both are saved; if it fails, both are rolled back. The dual-write problem is eliminated.
Outbox Table Schema
A typical outbox table in PostgreSQL might look like this:
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published BOOLEAN NOT NULL DEFAULT FALSE,
published_at TIMESTAMPTZ
);
-- Critical index for the poller/CDC connector
CREATE INDEX idx_outbox_unpublished ON outbox (created_at) WHERE published = FALSE;
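With the table in place, the consumer-side write becomes a single transaction. Below is a minimal sketch assuming an orders table keyed by an id column; the function name and the orderID and payload arguments are illustrative.
package main

import (
    "context"
    "database/sql"
    "fmt"

    _ "github.com/lib/pq"
)

// MarkPaidAndEnqueueShipmentEvent updates the order and records the derivative
// event in the outbox within one database transaction, so both changes commit
// or roll back together.
func MarkPaidAndEnqueueShipmentEvent(ctx context.Context, db *sql.DB, orderID string, payload []byte) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback() // no-op once the transaction has been committed

    // 1. Business state change.
    if _, err := tx.ExecContext(ctx,
        `UPDATE orders SET status = 'PAID' WHERE id = $1`, orderID); err != nil {
        return fmt.Errorf("update order: %w", err)
    }

    // 2. Record the intent to publish, in the same transaction.
    if _, err := tx.ExecContext(ctx,
        `INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
         VALUES ('order', $1, 'OrderReadyForShipment', $2)`, orderID, payload); err != nil {
        return fmt.Errorf("insert outbox: %w", err)
    }

    // 3. Atomic commit: either both rows change, or neither does.
    return tx.Commit()
}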
The Outbox Processor: Debezium vs. Polling
With the event safely in the outbox, a separate process is needed to relay it to Kafka. There are two primary strategies.
Approach A: Change Data Capture (CDC) with Debezium
This is the most robust and performant approach. Debezium is a platform for CDC that tails the database's transaction log (e.g., PostgreSQL's Write-Ahead Log or WAL).
How it works:
- A Debezium connector (debezium-connector-postgres) is deployed via Kafka Connect.
- The connector is configured to monitor the outbox table.
- When the insert into outbox is committed, Debezium reads this event from the WAL.
- It transforms the row data into a structured event and publishes it to a specified Kafka topic.
Pros:
* Low Latency: Reads directly from the transaction log, making it near real-time.
* High Throughput: Extremely efficient and doesn't load the primary database with queries.
* Guaranteed Delivery: Debezium is built on Kafka Connect, providing fault tolerance and delivery guarantees.
* Decoupling: The application logic is completely unaware of the publishing mechanism.
Cons:
* Operational Complexity: Requires running and maintaining a Kafka Connect cluster, which is non-trivial.
* Configuration: Setting up connectors requires careful configuration of schemas, transforms, and error handling.
Approach B: Application-Level Polling
This is a simpler, infrastructure-light alternative. A background process within your application or a separate small service periodically queries the outbox table for unpublished events.
How it works:
- A poller runs on a fixed interval (e.g., every 100ms).
- It queries: SELECT * FROM outbox WHERE published = FALSE ORDER BY created_at LIMIT 100;
- For each retrieved row, it produces the event to Kafka.
- On a successful publish, it marks the row: UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = ?;
Pros:
* Simplicity: No extra infrastructure beyond the application itself.
* Easy to Implement: The logic is straightforward application code.
Cons:
* Higher Latency: The polling interval introduces a delay.
* Database Load: Constant polling adds read/write load to your primary database.
* Scalability Concerns: Can be difficult to scale the poller horizontally without introducing race conditions where multiple instances try to process the same outbox items. This requires locking mechanisms (e.g., SELECT ... FOR UPDATE SKIP LOCKED).
Code Example: A Simple Go Outbox Poller
package main
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/lib/pq"
"github.com/segmentio/kafka-go"
)
type OutboxEvent struct {
ID string
AggregateID string
EventType string
Payload []byte // JSONB as raw bytes
}
func PollAndPublish(ctx context.Context, db *sql.DB, writer *kafka.Writer) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
events, err := fetchUnpublishedEvents(ctx, db, 100)
if err != nil {
fmt.Printf("ERROR: Failed to fetch from outbox: %v\n", err)
continue
}
for _, event := range events {
msg := kafka.Message{
Key: []byte(event.AggregateID),
Value: event.Payload,
Headers: []kafka.Header{{Key: "eventType", Value: []byte(event.EventType)}},
}
if err := writer.WriteMessages(ctx, msg); err != nil {
fmt.Printf("ERROR: Failed to publish outbox event %s to Kafka: %v\n", event.ID, err)
// Break and retry the whole batch later
break
}
if err := markEventAsPublished(ctx, db, event.ID); err != nil {
fmt.Printf("CRITICAL: Published event %s but failed to mark as published: %v\n", event.ID, err)
// This can lead to duplicate event publishing. Needs alerting.
break
}
}
}
}
}
func fetchUnpublishedEvents(ctx context.Context, db *sql.DB, limit int) ([]OutboxEvent, error) {
// In a real multi-instance scenario, you MUST use row-level locking.
// SELECT ... FOR UPDATE SKIP LOCKED is the standard for this pattern, but the locks
// only hold for the duration of the enclosing transaction. As written, each statement
// runs in its own implicit transaction, so fetching, publishing, and marking as
// published should share a single sql.Tx for the locks to be effective (see the
// transactional sketch below).
rows, err := db.QueryContext(ctx,
`SELECT id, aggregate_id, event_type, payload FROM outbox WHERE published = FALSE ORDER BY created_at LIMIT $1 FOR UPDATE SKIP LOCKED`,
limit)
if err != nil {
return nil, err
}
defer rows.Close()
var events []OutboxEvent
for rows.Next() {
var e OutboxEvent
if err := rows.Scan(&e.ID, &e.AggregateID, &e.EventType, &e.Payload); err != nil {
return nil, err
}
events = append(events, e)
}
return events, nil
}
func markEventAsPublished(ctx context.Context, db *sql.DB, id string) error {
_, err := db.ExecContext(ctx, "UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = $1", id)
return err
}
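As noted in the fetch comment, the row locks taken by FOR UPDATE SKIP LOCKED only survive for the life of the surrounding transaction. Below is a sketch of one polling cycle run inside a single transaction, which is safe to execute from multiple poller instances. It reuses the OutboxEvent type above; processBatchTx is an illustrative name, not part of any library.
// processBatchTx runs one polling cycle inside a single transaction so that the
// FOR UPDATE SKIP LOCKED row locks are held until the rows are marked published.
func processBatchTx(ctx context.Context, db *sql.DB, writer *kafka.Writer, limit int) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op once the transaction has been committed

    rows, err := tx.QueryContext(ctx,
        `SELECT id, aggregate_id, event_type, payload
           FROM outbox
          WHERE published = FALSE
          ORDER BY created_at
          LIMIT $1
          FOR UPDATE SKIP LOCKED`, limit)
    if err != nil {
        return err
    }
    var events []OutboxEvent
    for rows.Next() {
        var e OutboxEvent
        if err := rows.Scan(&e.ID, &e.AggregateID, &e.EventType, &e.Payload); err != nil {
            rows.Close()
            return err
        }
        events = append(events, e)
    }
    rows.Close()

    for _, event := range events {
        msg := kafka.Message{
            Key:     []byte(event.AggregateID),
            Value:   event.Payload,
            Headers: []kafka.Header{{Key: "eventType", Value: []byte(event.EventType)}},
        }
        // If publishing fails, rolling back leaves the rows unpublished and unlocked for
        // the next cycle. Events already written to Kafka in this batch may be duplicated,
        // which downstream idempotent consumers must tolerate.
        if err := writer.WriteMessages(ctx, msg); err != nil {
            return err
        }
        if _, err := tx.ExecContext(ctx,
            `UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = $1`, event.ID); err != nil {
            return err
        }
    }
    return tx.Commit()
}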
Tying It All Together: The End-to-End Flow
Let's integrate the Redis idempotency check with the Transactional Outbox pattern for a fully resilient consumer.
sequenceDiagram
participant C as Kafka Consumer
participant R as Redis
participant DB as PostgreSQL
participant P as Outbox Processor
participant K as Kafka
K->>C: Delivers Message M1 (EventID: E1)
C->>R: GET idempotency key E1
R-->>C: Key not found
C->>R: SET E1 {status: "PROCESSING"} TTL 5m
R-->>C: OK
C->>DB: BEGIN TRANSACTION
C->>DB: UPDATE orders SET status = 'PAID' WHERE ...
C->>DB: INSERT INTO outbox (aggregate_id, event_type, payload) VALUES (...)
C->>DB: COMMIT
alt Commit Successful
DB-->>C: COMMIT OK
C->>R: SET E1 {status: "COMPLETED"} TTL 24h
R-->>C: OK
C->>K: Commit Kafka Offset
P->>DB: SELECT * FROM outbox WHERE published=FALSE
DB-->>P: Returns Outbox Row for M1's derivative event
P->>K: Produce new Message M2
K-->>P: Ack M2
P->>DB: UPDATE outbox SET published=TRUE WHERE id=...
else Commit Fails
DB-->>C: ROLLBACK / Error
C->>R: DEL E1 (or let it expire)
C-xK: Do NOT commit offset (message will be redelivered)
end
This combined pattern ensures that duplicate deliveries are detected and skipped via the Redis state record, that the database update and the derivative event are committed atomically via the outbox, and that a crash at any step leaves the system in a state from which redelivery can safely recover.
Edge Cases and Production Considerations
* Poison Pill Messages: A message that consistently causes a failure in the business logic (e.g., due to malformed data) will be redelivered indefinitely. Your consumer must have a retry mechanism with a backoff policy and, after a certain number of failures, move the message to a Dead-Letter Queue (DLQ). The idempotency record in Redis should be updated to FAILED and perhaps include a retry count (see the sketch after this list).
* Redis Unavailability: If Redis is down, the idempotency check fails. The consumer should fail open (risk duplicate processing with alerting) or fail closed (stop processing altogether), depending on business requirements. A fail-closed approach is safer. Implement circuit breakers around Redis calls.
* Outbox Poller Failure: What if the poller publishes to Kafka but crashes before marking the outbox event as published? On restart, it will republish the same event. The consumers of that topic must also be idempotent, demonstrating that idempotency is a required property of services throughout an event-driven ecosystem.
* Database Contention: The SELECT ... FOR UPDATE SKIP LOCKED pattern is crucial for scaling the poller, but it can still cause contention. Ensure the partial index on unpublished rows (idx_outbox_unpublished above) is in place. For very high-throughput systems, consider partitioning the outbox table.
* Cleanup: Both the Redis keys and the outbox table will grow indefinitely. A periodic cleanup job is necessary. For Redis, the TTL handles this automatically. For the outbox table, a background process should archive or delete rows where published = TRUE and published_at is older than a specified retention period (e.g., 7 days).
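As a sketch of the poison-pill handling described above, the following records failures with a retry count and reports when a message should be routed to a DLQ topic. The FailedRecord fields, the maxRetries threshold, and the pre-configured dead-letter writer are illustrative assumptions, not part of the earlier example.
package main

import (
    "context"
    "encoding/json"
    "time"

    "github.com/go-redis/redis/v8"
    "github.com/segmentio/kafka-go"
)

const (
    maxRetries = 5             // illustrative threshold before dead-lettering
    failedTTL  = 1 * time.Hour // keep FAILED records long enough for backoff retries
)

// FailedRecord extends the idempotency record with a retry counter.
type FailedRecord struct {
    Status     string `json:"status"`
    RetryCount int    `json:"retry_count"`
    Timestamp  int64  `json:"timestamp"`
}

// RecordFailure bumps the retry count for a key and reports whether the
// message should now be routed to the DLQ instead of being retried.
func RecordFailure(ctx context.Context, rdb *redis.Client, key string) (bool, error) {
    var rec FailedRecord
    if val, err := rdb.Get(ctx, key).Result(); err == nil {
        _ = json.Unmarshal([]byte(val), &rec) // a corrupt record simply resets the count
    }
    rec.Status = "FAILED"
    rec.RetryCount++
    rec.Timestamp = time.Now().Unix()

    payload, _ := json.Marshal(rec)
    if err := rdb.Set(ctx, key, payload, failedTTL).Err(); err != nil {
        return false, err
    }
    return rec.RetryCount >= maxRetries, nil
}

// routeToDLQ forwards the original message to a dead-letter topic via a
// pre-configured kafka.Writer, preserving the key, value, and headers.
func routeToDLQ(ctx context.Context, dlqWriter *kafka.Writer, msg kafka.Message) error {
    return dlqWriter.WriteMessages(ctx, kafka.Message{
        Key:     msg.Key,
        Value:   msg.Value,
        Headers: append(msg.Headers, kafka.Header{Key: "dlq-reason", Value: []byte("max retries exceeded")}),
    })
}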
Conclusion
Achieving 'effectively-once' semantics in a distributed system like Kafka is not a feature of the broker, but a responsibility of the application. By moving beyond simplistic checks and implementing a robust, two-part strategy—stateful idempotency records in Redis for fast duplicate detection and the Transactional Outbox pattern for atomic state changes—we can build highly resilient and consistent services. This architecture, while complex, provides the necessary guarantees for mission-critical systems where data integrity is paramount. It correctly handles process crashes, network failures, and message redeliveries, ensuring that each event leaves its mark on the system exactly once.