Idempotency in Event-Driven Systems: Outbox Pattern & Distributed Locks

Goh Ling Yong

The Inevitable Challenge: At-Least-Once Delivery and Its Consequences

In distributed systems, particularly those built on an event-driven architecture, the contract between a message broker (like Kafka, RabbitMQ, or Pulsar) and its consumers is fundamental. The most common and practical delivery guarantee is "at-least-once." This ensures that a message, once published, will be delivered to a consumer at least one time, even in the face of network partitions, consumer crashes, or broker restarts. While this prevents data loss, it introduces a significant challenge for the application layer: the potential for duplicate message processing.

A consumer might receive the same message multiple times if:

  • The consumer processes the message successfully but crashes before it can acknowledge (ack) the message to the broker.
  • A network issue prevents the acknowledgement from reaching the broker, causing a timeout and redelivery.
  • Broker-side rebalancing or leader election triggers a redelivery of unacknowledged messages.

Processing a CreateOrder event twice results in duplicate orders. Applying a DebitAccount event twice leads to incorrect financial balances. Naively implementing consumers without considering this reality is a recipe for catastrophic data inconsistency. The property that an operation can be applied multiple times without changing the result beyond the initial application is idempotency. It is not an optional feature; it is a mandatory requirement for building reliable distributed systems.
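
To make the distinction concrete, here is a minimal sketch contrasting a non-idempotent debit with one guarded by a per-event uniqueness check. The table names, column names, and db handle are illustrative only, not part of any schema described later in this article:

go
// Minimal sketch: a non-idempotent debit vs. one made idempotent with a
// per-event uniqueness guard. Table and column names are illustrative.
package example

import (
	"context"
	"database/sql"
)

// DebitNaive is NOT idempotent: applying the same DebitAccount event twice
// subtracts the amount twice.
func DebitNaive(ctx context.Context, db *sql.DB, accountID string, amount int64) error {
	_, err := db.ExecContext(ctx,
		"UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, accountID)
	return err
}

// DebitIdempotent records the debit keyed by the event ID; the unique key makes
// a second application a no-op. (In practice both statements would run inside
// one transaction, as shown later in this article.)
func DebitIdempotent(ctx context.Context, db *sql.DB, eventID, accountID string, amount int64) error {
	res, err := db.ExecContext(ctx,
		"INSERT INTO account_debits (event_id, account_id, amount) VALUES ($1, $2, $3) ON CONFLICT (event_id) DO NOTHING",
		eventID, accountID, amount)
	if err != nil {
		return err
	}
	if n, _ := res.RowsAffected(); n == 0 {
		return nil // duplicate delivery: the debit was already applied
	}
	_, err = db.ExecContext(ctx,
		"UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, accountID)
	return err
}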

Why Naive Idempotency Checks Fail

A common first attempt at achieving idempotency is to store a unique identifier from each message (an idempotency key) and check for its existence before processing.

Consider this logic in a consumer:

go
// PSEUDOCODE - DO NOT USE IN PRODUCTION
func handleMessage(msg Message) {
    idempotencyKey := msg.ID

    // 1. Check if we've seen this key
    isProcessed, err := db.CheckIfProcessed(idempotencyKey)
    if err != nil { /* handle error */ }
    if isProcessed {
        ack(msg) // Already done, just acknowledge
        return
    }

    // 2. Process the message
    processBusinessLogic(msg.Data)

    // 3. Mark as processed
    db.MarkAsProcessed(idempotencyKey)

    ack(msg)
}

This seems plausible, but it harbors a critical flaw in any system with more than one concurrent consumer instance for a topic (a standard pattern for scalability). Imagine two consumer instances, C1 and C2, receive the same message due to a redelivery:

  • C1 executes db.CheckIfProcessed(key). It returns false.
  • Simultaneously, C2 executes db.CheckIfProcessed(key). It also returns false.
  • C1 proceeds to execute the business logic.
  • C2 also proceeds to execute the business logic.
  • The operation is performed twice.

This is a classic race condition. To build a truly robust system, we need to address two distinct but related problems:

  • Atomic Event Publishing: How do we guarantee that a state change in our database and the publishing of an event corresponding to that change happen atomically? (The "dual-write" problem.)
  • Atomic Consumption: How do we guarantee that the "check-then-act" logic in our consumer is an atomic operation, even across multiple distributed consumer instances?

This article will architect a production-grade solution by combining the Transactional Outbox pattern to solve the first problem and a distributed locking mechanism to solve the second.


    Part 1: The Transactional Outbox Pattern for Atomic Publishing

    The dual-write problem occurs when a service needs to both persist data to its own database and publish a message to a broker. If you first save to the database and then the service crashes before publishing, the event is lost. If you publish first and then the database write fails, you have an event for a state change that never actually happened. Performing these two actions within a distributed transaction (2PC) is often complex and brittle, and not all message brokers support it.

    The Transactional Outbox pattern elegantly solves this by leveraging the local ACID transaction capabilities of your primary database.

    The Pattern:

  • Instead of directly publishing a message to the broker, the service appends the event to a dedicated outbox_events table within the same database.
  • This append operation occurs within the same database transaction as the business state change.
  • A separate, asynchronous process (the "message relay") tails this outbox_events table, reads the unpublished events, publishes them to the message broker, and then marks them as published.
  • Because the business state and the event record are written in a single transaction, the operation is atomic. The state change cannot be committed without the event being durably recorded, and vice-versa.

    Implementation: Schema and Service Logic

    First, let's define the schema for our outbox table in PostgreSQL.

    sql
    CREATE TABLE outbox_events (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        aggregate_id VARCHAR(255) NOT NULL, -- ID of the entity that was changed (e.g., order_id)
        aggregate_type VARCHAR(255) NOT NULL, -- Type of entity (e.g., 'order')
        event_type VARCHAR(255) NOT NULL, -- e.g., 'OrderCreated', 'OrderShipped'
        payload JSONB NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        published_at TIMESTAMPTZ NULL -- Null until the relay process publishes it
    );
    
    -- Index for the relay process to efficiently find unpublished events
    CREATE INDEX idx_outbox_events_unpublished ON outbox_events (published_at) WHERE published_at IS NULL;

    Now, let's look at a Go service method that creates a new order and enqueues an OrderCreated event using this pattern. We'll use the sqlx library for convenience.

    go
    package orderservice
    
    import (
    	"context"
    	"encoding/json"
    	"time"
    
    	"github.com/google/uuid"
    	"github.com/jmoiron/sqlx"
    )
    
    // Order represents our business entity
    type Order struct {
    	ID        string    `db:"id"`
    	CustomerID string    `db:"customer_id"`
    	Amount    int64     `db:"amount"`
    	CreatedAt time.Time `db:"created_at"`
    }
    
    // OrderCreatedEvent is the payload for our event
    type OrderCreatedEvent struct {
    	OrderID    string `json:"order_id"`
    	CustomerID string `json:"customer_id"`
    	Amount     int64  `json:"amount"`
    }
    
    // Service handles order creation
    type Service struct {
    	DB *sqlx.DB
    }
    
    func (s *Service) CreateOrder(ctx context.Context, customerID string, amount int64) (*Order, error) {
    	// Begin a new database transaction
    	tx, err := s.DB.BeginTxx(ctx, nil)
    	if err != nil {
    		return nil, err
    	}
    	// Defer a rollback in case of error. The transaction will be committed on success.
    	defer tx.Rollback() // Safe to defer: after a successful Commit, Rollback returns ErrTxDone and does nothing
    
    	// 1. Create the Order entity
    	order := &Order{
    		ID:         uuid.New().String(),
    		CustomerID: customerID,
    		Amount:     amount,
    		CreatedAt:  time.Now(),
    	}
    
    	orderSQL := `INSERT INTO orders (id, customer_id, amount, created_at) VALUES (:id, :customer_id, :amount, :created_at)`
    	_, err = tx.NamedExecContext(ctx, orderSQL, order)
    	if err != nil {
    		return nil, err
    	}
    
    	// 2. Create the Outbox Event within the same transaction
    	eventPayload := OrderCreatedEvent{
    		OrderID:    order.ID,
    		CustomerID: order.CustomerID,
    		Amount:     order.Amount,
    	}
    	payloadBytes, err := json.Marshal(eventPayload)
    	if err != nil {
    		return nil, err
    	}
    
    	outboxSQL := `INSERT INTO outbox_events (aggregate_id, aggregate_type, event_type, payload) VALUES ($1, $2, $3, $4)`
    	_, err = tx.ExecContext(ctx, outboxSQL, order.ID, "order", "OrderCreated", payloadBytes)
    	if err != nil {
    		return nil, err
    	}
    
    	// 3. Commit the transaction
    	if err := tx.Commit(); err != nil {
    		return nil, err
    	}
    
    	return order, nil
    }

    With this code, the orders table and outbox_events table are updated atomically. We have successfully solved the dual-write problem.

    The Message Relay Process

    The relay is a separate, continuously running process. Its sole job is to poll the outbox_events table for unpublished events, send them to the message broker, and update their published_at timestamp upon successful publication.

    A simple polling implementation could look like this:

    go
    package relay
    
    import (
    	"context"
    	"log"
    	"time"

    	"github.com/google/uuid"
    	"github.com/jmoiron/sqlx"
    )

    // kafkaProducer is a minimal stand-in for whatever broker client is in use
    // (e.g. a Kafka producer); only this Publish method is assumed here.
    type kafkaProducer interface {
    	Publish(topic, key string, payload []byte) error
    }

    type OutboxEvent struct {
    	ID          uuid.UUID  `db:"id"`
    	EventType   string     `db:"event_type"`
    	Payload     []byte     `db:"payload"`
    	PublishedAt *time.Time `db:"published_at"`
    }
    
    func RunRelay(db *sqlx.DB, producer kafkaProducer) {
    	ticker := time.NewTicker(5 * time.Second) // Poll every 5 seconds
    	defer ticker.Stop()
    	for range ticker.C {
    		processOutbox(db, producer)
    	}
    }
    
    func processOutbox(db *sqlx.DB, producer kafkaProducer) {
    	// Use a transaction to ensure we lock the rows we're about to process
    	tx, err := db.BeginTxx(context.Background(), nil)
    	if err != nil {
    		log.Printf("Error starting transaction: %v", err)
    		return
    	}
    	defer tx.Rollback()
    
    	// Select and lock a batch of unpublished events
    	var events []OutboxEvent
    	// Select only the columns that OutboxEvent maps, so sqlx can scan the rows
    	query := `SELECT id, event_type, payload, published_at FROM outbox_events WHERE published_at IS NULL ORDER BY created_at ASC LIMIT 100 FOR UPDATE SKIP LOCKED`
    	err = tx.Select(&events, query)
    	if err != nil {
    		log.Printf("Error selecting events: %v", err)
    		return
    	}
    
    	if len(events) == 0 {
    		return
    	}
    
    	for _, event := range events {
    		if err := producer.Publish("order_events", event.EventType, event.Payload); err != nil {
    			log.Printf("Failed to publish event %s: %v", event.ID, err)
    			// If publishing fails, the transaction will roll back and we'll retry later
    			return
    		}
    	}
    
    	// Mark all events in this batch as published
    	ids := make([]uuid.UUID, len(events))
    	for i, e := range events {
    		ids[i] = e.ID
    	}
    	query, args, err := sqlx.In(`UPDATE outbox_events SET published_at = NOW() WHERE id IN (?)`, ids)
    	if err != nil {
    		log.Printf("Error building update query: %v", err)
    		return
    	}
    	query = db.Rebind(query)
    	_, err = tx.Exec(query, args...)
    	if err != nil {
    		log.Printf("Error marking events as published: %v", err)
    		return
    	}
    
    	if err := tx.Commit(); err != nil {
    		log.Printf("Error committing transaction: %v", err)
    		return
    	}

    	log.Printf("Successfully relayed %d events", len(events))
    }

    Key Production Considerations for the Relay:

    * FOR UPDATE SKIP LOCKED: This is a critical piece of the SQL query. It allows multiple instances of the relay process to run without trying to process the same batch of events. One instance will acquire a row-level lock on the events it selects, and other instances will simply skip over those locked rows.

    * Polling vs. Change Data Capture (CDC): Polling is simple but introduces latency and puts a constant load on the database. For high-performance systems, a CDC-based approach using tools like Debezium is superior. Debezium can tail the database's transaction log (WAL in Postgres) and stream changes directly to Kafka with much lower latency and overhead.


    Part 2: Distributed Locks for Atomic Consumption

    With the Outbox pattern, we've ensured that events are published reliably and atomically. However, we still haven't solved the consumer-side race condition. If our broker redelivers a message, two consumer instances could still attempt to process the same logical operation concurrently.

    This is where a distributed lock comes in. Before a consumer begins processing a message, it must acquire a lock based on a unique idempotency key derived from the message. If it acquires the lock, it can proceed. If it fails, it means another consumer instance is already processing this message (or has already completed it), and it should not proceed.

    Redis is an excellent candidate for implementing distributed locks due to its high performance and atomic operations like SETNX (SET if Not eXists).

    Implementation: A Redis-backed Lock Manager

    Let's define a simple but effective distributed lock manager in Go.

    go
    package locking
    
    import (
    	"context"
    	"time"
    
    	"github.com/go-redis/redis/v8"
    )
    
    // DistributedLockManager provides an interface for acquiring and releasing locks.
    type DistributedLockManager interface {
    	AcquireLock(ctx context.Context, key string, ttl time.Duration) (bool, error)
    	ReleaseLock(ctx context.Context, key string) error
    }
    
    // redisLockManager implements the lock manager using Redis.
    type redisLockManager struct {
    	client *redis.Client
    }
    
    // NewRedisLockManager creates a new Redis-backed lock manager.
    func NewRedisLockManager(client *redis.Client) DistributedLockManager {
    	return &redisLockManager{client: client}
    }
    
    // AcquireLock attempts to acquire a lock for a given key with a TTL.
    // It uses the SET command with NX (Not Exists) and PX (millisecond TTL) options for atomicity.
    func (r *redisLockManager) AcquireLock(ctx context.Context, key string, ttl time.Duration) (bool, error) {
    	// A value of "1" is arbitrary. The existence of the key is what matters.
    	result, err := r.client.SetNX(ctx, key, 1, ttl).Result()
    	if err != nil {
    		return false, err
    	}
    	return result, nil
    }
    
    // ReleaseLock releases a lock. This uses a Lua script to ensure atomicity.
    // We only delete the key if its current value matches what we expect, preventing
    // a consumer from deleting a lock that was acquired by another process after its own expired.
    // For simplicity in this example, we'll just use a simple DEL, but a production system
    // should use a more robust script with a unique lock token.
    func (r *redisLockManager) ReleaseLock(ctx context.Context, key string) error {
    	// WARNING: Simple DEL is not perfectly safe. See edge cases section.
    	return r.client.Del(ctx, key).Err()
    }

    Part 3: The Combined Architecture in a Consumer

    Now we can combine the Outbox pattern's output with our distributed lock to create a fully idempotent consumer. We also need one more piece: a persistent record of successfully processed messages. This handles the case where a consumer processes a message, crashes before acknowledging, and then a new consumer instance receives the same message again. The lock will be available, but we need to know not to re-process the business logic.

    Let's define a processed_messages table:

    sql
    CREATE TABLE processed_messages (
        idempotency_key VARCHAR(255) PRIMARY KEY,
        processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );

    Now, let's assemble the full consumer logic.

    go
    package consumer
    
    import (
    	"context"
    	"database/sql"
    	"encoding/json"
    	"fmt"
    	"log"
    	"time"
    
    	"github.com/jmoiron/sqlx"
    	"my-project/locking"
    )
    
    // Assume message format from Kafka/RabbitMQ
    type Message struct {
    	ID      string // Unique ID from the outbox table
    	Payload json.RawMessage
    }
    
    // OrderCreatedEvent is the structure of our event payload
    type OrderCreatedEvent struct {
    	OrderID    string `json:"order_id"`
    	CustomerID string `json:"customer_id"`
    	Amount     int64  `json:"amount"`
    }
    
    type Consumer struct {
    	DB          *sqlx.DB
    	LockManager locking.DistributedLockManager
    }
    
    func (c *Consumer) HandleOrderCreated(ctx context.Context, msg Message) error {
    	// The outbox event ID is a perfect, unique idempotency key.
    	idempotencyKey := msg.ID
    	lockKey := fmt.Sprintf("lock:order_created:%s", idempotencyKey)
    	lockTTL := 30 * time.Second // The max expected processing time
    
    	// 1. Attempt to acquire the distributed lock
    	acquired, err := c.LockManager.AcquireLock(ctx, lockKey, lockTTL)
    	if err != nil {
    		log.Printf("Error acquiring lock for key %s: %v", lockKey, err)
    		return err // Return error to trigger redelivery
    	}
    
    	if !acquired {
    		log.Printf("Could not acquire lock for key %s, another consumer is processing.", lockKey)
    		// Acknowledge the message. We don't want to process it, and another consumer
    		// is already handling it. If that one fails, the broker will redeliver and
    		// another instance (maybe this one) can try again later.
    		return nil
    	}
    	// Ensure the lock is released when the function returns
    	defer c.LockManager.ReleaseLock(ctx, lockKey)
    
    	// 2. Within the lock, check if the message has already been successfully processed.
    	// This handles retries after a crash post-processing but before acknowledgement.
    	var keyExists string
    	err = c.DB.GetContext(ctx, &keyExists, "SELECT idempotency_key FROM processed_messages WHERE idempotency_key = $1", idempotencyKey)
    	if err == nil {
    		log.Printf("Message %s already processed, skipping.", idempotencyKey)
    		return nil // Acknowledge and exit
    	} else if err != sql.ErrNoRows {
    		log.Printf("Error checking processed_messages table: %v", err)
    		return err // Redeliver on DB error
    	}
    
    	// 3. Begin a transaction for the business logic and idempotency key storage
    	tx, err := c.DB.BeginTxx(ctx, nil)
    	if err != nil {
    		return err
    	}
    	defer tx.Rollback()
    
    	// 4. Execute the actual business logic
    	var event OrderCreatedEvent
    	if err := json.Unmarshal(msg.Payload, &event); err != nil {
    		log.Printf("Error unmarshalling payload for message %s: %v", idempotencyKey, err)
    		// This is likely a poison pill. Send to DLQ and acknowledge.
    		return nil
    	}
    
    	// Example: Update a customer's lifetime value
    	_, err = tx.ExecContext(ctx, 
            "UPDATE customers SET lifetime_value = lifetime_value + $1 WHERE id = $2", 
            event.Amount, event.CustomerID)
    	if err != nil {
    		log.Printf("Business logic failed for message %s: %v", idempotencyKey, err)
    		return err // Redeliver
    	}
    
    	// 5. Atomically mark the message as processed
    	_, err = tx.ExecContext(ctx, "INSERT INTO processed_messages (idempotency_key) VALUES ($1)", idempotencyKey)
    	if err != nil {
    		log.Printf("Failed to mark message %s as processed: %v", idempotencyKey, err)
    		return err // Redeliver
    	}
    
    	// 6. Commit the transaction
    	if err := tx.Commit(); err != nil {
    		return err
    	}
    
    	log.Printf("Successfully processed message %s", idempotencyKey)
    	return nil // Acknowledge the message
    }

    This architecture is robust. The distributed lock prevents concurrent execution, and the processed_messages table, updated transactionally with the business logic, prevents reprocessing after failures.
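
    The return value of HandleOrderCreated is what drives the acknowledge-or-redeliver decision. As a rough sketch of how it might plug into a consume loop, the snippet below assumes a hypothetical BrokerMessage abstraction whose Ack/Nack methods stand in for whichever client library (Kafka, RabbitMQ, etc.) is actually in use:

    go
    package consumer

    import (
    	"context"
    	"encoding/json"
    	"log"
    )

    // BrokerMessage is a hypothetical abstraction over a delivered message.
    type BrokerMessage interface {
    	ID() string              // unique outbox event ID, carried as key or header
    	Payload() []byte
    	Ack() error              // tell the broker the message is done
    	Nack(requeue bool) error // ask the broker to redeliver
    }

    // Run drains a channel of deliveries and maps handler results to ack/nack.
    func (c *Consumer) Run(ctx context.Context, deliveries <-chan BrokerMessage) {
    	for m := range deliveries {
    		// nil means "processed or safely skipped"; non-nil means "retry later".
    		err := c.HandleOrderCreated(ctx, Message{ID: m.ID(), Payload: json.RawMessage(m.Payload())})
    		if err != nil {
    			if nackErr := m.Nack(true); nackErr != nil {
    				log.Printf("nack failed for %s: %v", m.ID(), nackErr)
    			}
    			continue
    		}
    		if ackErr := m.Ack(); ackErr != nil {
    			log.Printf("ack failed for %s: %v", m.ID(), ackErr)
    		}
    	}
    }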


    Part 4: Advanced Edge Cases and Performance Considerations

    While the combined pattern is powerful, a production deployment requires careful consideration of several edge cases.

    Edge Case 1: Lock TTL vs. Processing Time

    What happens if the business logic takes longer than the lock's TTL? Consumer C1 acquires a 30-second lock, but its processing takes 40 seconds. At the 30-second mark, the lock expires in Redis. Consumer C2 now successfully acquires the lock for the same message and starts processing. We now have two consumers executing the same logic concurrently.

    Solutions:

  • Heartbeating/Lock Extension: The consumer holding the lock can periodically extend the lock's TTL in Redis, as long as it is still actively processing. This requires a background goroutine in the consumer (a minimal sketch follows this list).
  • Realistic TTLs: Set a generous TTL that is well above your p99 processing time, and have strong monitoring and alerting for jobs that exceed this threshold.
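
    A minimal sketch of the heartbeating approach is below. It assumes the lock value is a unique owner token (introduced under the next edge case) and uses a Lua script so the TTL is only extended while we still own the lock; the function name and intervals are illustrative:

    go
    package locking

    import (
    	"context"
    	"time"

    	"github.com/go-redis/redis/v8"
    )

    // Extend the TTL only if the stored value still equals our token.
    var extendScript = redis.NewScript(`
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("pexpire", KEYS[1], ARGV[2])
    else
        return 0
    end`)

    // KeepAlive renews the lock every interval until ctx is cancelled or the lock
    // is lost. Run it in its own goroutine and cancel ctx when processing finishes.
    func KeepAlive(ctx context.Context, client *redis.Client, key, token string, ttl, interval time.Duration) {
    	ticker := time.NewTicker(interval)
    	defer ticker.Stop()
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-ticker.C:
    			res, err := extendScript.Run(ctx, client, []string{key}, token, ttl.Milliseconds()).Int()
    			if err != nil || res == 0 {
    				// We no longer own the lock (or Redis is unreachable); stop renewing
    				// and rely on the processed_messages check to prevent duplicate effects.
    				return
    			}
    		}
    	}
    }
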
    Edge Case 2: Unsafe Lock Release

    Our simple ReleaseLock uses DEL. Consider this scenario:

  • C1 acquires a lock with a 30s TTL.
  • C1 experiences a long GC pause and its processing takes >30s.
  • The lock expires.
  • C2 acquires the lock for the same key.
  • C1 finally finishes its work and calls ReleaseLock, deleting the key that now belongs to C2.
  • C3 can now acquire the lock while C2 is still working.
    Solution:

    The lock must be owned. When acquiring a lock, set the value to a unique, random token. When releasing, use a Lua script to atomically check if the key's value matches the token before deleting it.

    lua
    -- Lua script for safe lock release
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end

    Your AcquireLock would return this unique token, and ReleaseLock would execute this script, passing the token as an argument.
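
    A sketch of what that could look like with go-redis is shown below. The method names are illustrative and differ from the earlier DistributedLockManager interface, since acquisition now returns the owner token and release requires it:

    go
    package locking

    import (
    	"context"
    	"crypto/rand"
    	"encoding/hex"
    	"time"

    	"github.com/go-redis/redis/v8"
    )

    // The same Lua script as above, embedded so compare-and-delete is atomic in Redis.
    var releaseScript = redis.NewScript(`
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end`)

    // AcquireLockWithToken stores a random token as the lock value and returns it.
    func (r *redisLockManager) AcquireLockWithToken(ctx context.Context, key string, ttl time.Duration) (string, bool, error) {
    	buf := make([]byte, 16)
    	if _, err := rand.Read(buf); err != nil {
    		return "", false, err
    	}
    	token := hex.EncodeToString(buf)
    	acquired, err := r.client.SetNX(ctx, key, token, ttl).Result()
    	return token, acquired, err
    }

    // ReleaseLockWithToken deletes the key only if it still holds our token.
    func (r *redisLockManager) ReleaseLockWithToken(ctx context.Context, key, token string) error {
    	return releaseScript.Run(ctx, r.client, []string{key}, token).Err()
    }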

    Performance and Scalability

    * Outbox Table Growth: The outbox_events table will grow indefinitely. Implement a cleanup job that archives or deletes old, published events. For very high-throughput systems, partitioning the table (e.g., by month) using a tool like pg_partman is essential to keep indexes small and queries fast. A sketch of such a cleanup job follows this list.

    * processed_messages Table Growth: This table also grows forever. It can be partitioned, and its primary key index must be kept hot in memory.

    * Lock Contention: If many events for the same aggregate ID arrive in a short burst, they will contend for the same lock, serializing their processing. This is often desired behavior to ensure ordering, but be aware of the performance implications. Ensure your lock key granularity is as fine as possible.

    * Redis as a Bottleneck: A single Redis instance can handle hundreds of thousands of ops/sec, but it can still become a bottleneck or a single point of failure. Use Redis Sentinel for high availability or Redis Cluster for sharding the keyspace to scale writes.
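
    As a rough sketch of the retention side, a small background job along these lines could prune both tables on a schedule. The retention windows and interval are illustrative, and a partitioned setup would drop whole partitions instead of deleting rows:

    go
    package maintenance

    import (
    	"context"
    	"log"
    	"time"

    	"github.com/jmoiron/sqlx"
    )

    // RunCleanup periodically deletes old outbox events and processed-message keys.
    func RunCleanup(ctx context.Context, db *sqlx.DB, every time.Duration) {
    	ticker := time.NewTicker(every)
    	defer ticker.Stop()
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-ticker.C:
    			// Delete outbox events published more than 7 days ago (illustrative window).
    			if _, err := db.ExecContext(ctx,
    				`DELETE FROM outbox_events WHERE published_at IS NOT NULL AND published_at < NOW() - INTERVAL '7 days'`); err != nil {
    				log.Printf("outbox cleanup failed: %v", err)
    			}
    			// Keep processed-message keys only as long as the broker can plausibly
    			// redeliver; 30 days here is an illustrative retention window.
    			if _, err := db.ExecContext(ctx,
    				`DELETE FROM processed_messages WHERE processed_at < NOW() - INTERVAL '30 days'`); err != nil {
    				log.Printf("processed_messages cleanup failed: %v", err)
    			}
    		}
    	}
    }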

    Conclusion: The Price of Reliability

    Implementing a robust idempotency layer in an event-driven architecture is a non-trivial engineering task. It requires a multi-faceted approach that addresses atomicity at both the producer and consumer sides. The Transactional Outbox pattern provides a bulletproof solution to the dual-write problem, guaranteeing that state changes and their corresponding events are committed as a single unit. On the consumer side, a distributed lock prevents race conditions from concurrent processing, while a persistent log of processed messages handles recovery from failures.

    While simpler solutions exist, they often contain subtle race conditions or failure modes that only manifest under production load or during failure scenarios. The combined pattern detailed here, while complex, provides the level of correctness and data integrity required for mission-critical, high-throughput distributed systems. For senior engineers, mastering these patterns is a key step from simply building services to architecting truly resilient and reliable systems.
