Idempotency Key Patterns for Resilient Event-Driven APIs
The Idempotency Imperative in Modern Systems
In any distributed system, the promise of "exactly-once" processing is a siren's call: alluring but notoriously difficult to achieve. Network partitions, client-side retry logic, and the at-least-once delivery semantics of message brokers like Kafka or RabbitMQ guarantee that your endpoints and consumers will eventually receive duplicate requests. For any operation with side effects (charging a credit card, updating inventory, sending a notification), processing a duplicate can be catastrophic.
Senior engineers understand that simply assigning a unique ID to a request is insufficient. True resilience requires a systemic approach to idempotency, one that is atomic, performant, and correctly handles the complex race conditions inherent in concurrent systems. This is where the Idempotency Key pattern becomes non-negotiable.
This article bypasses the introductory concepts. We assume you know why idempotency is critical. Instead, we will dissect a production-proven, database-backed implementation pattern. We will focus on a two-phase atomic protocol that leverages the transactional guarantees of a relational database like PostgreSQL to provide a robust idempotency layer for both synchronous APIs and asynchronous event consumers.
We will cover:
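* The schema design for a dedicated idempotency key store in PostgreSQL.
* An atomic two-phase protocol (Lock & Check, then Execute & Persist), implemented as HTTP middleware in Go.
* Using row-level locking (SELECT ... FOR UPDATE) to serialize concurrent requests for the same key.
* Applying the same pattern to asynchronous message queue consumers.
* Request hashing, performance tuning, and key expiration.
Let's move beyond theory and build a system that can confidently withstand the chaos of real-world distributed environments.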
Core Implementation: The Idempotency Key Store
The foundation of our pattern is a dedicated table in a transactional database. A system like Redis might seem appealing for its speed. However, its durability guarantees are weaker unless persistence is configured carefully. More importantly, it cannot share a transaction with business data stored in your relational database. Both points make it less suitable for this critical path. The atomicity and consistency of a relational database like PostgreSQL are paramount: the idempotency record and the business writes must commit or roll back together.
Schema Design
Our idempotency_keys table needs to store not just the key, but the state of the operation and its result. Here is a production-ready schema for PostgreSQL:
-- The stages of an idempotent request lifecycle
CREATE TYPE idempotency_status AS ENUM ('started', 'completed', 'failed');
CREATE TABLE idempotency_keys (
-- The idempotency key itself, provided by the client.
idempotency_key TEXT NOT NULL,
-- Scope the key to a specific user or tenant to prevent key collisions across accounts.
user_id UUID NOT NULL,
-- The current state of the request processing.
status idempotency_status NOT NULL DEFAULT 'started',
-- A hash of the request payload to prevent key reuse for different operations.
request_hash BYTEA,
-- The HTTP response code to be returned on subsequent requests.
response_code INT,
-- The full HTTP response body to be returned.
response_body JSONB,
-- Timestamp for when the key was first seen.
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Timestamp to track when the row lock was acquired. Useful for debugging and cleaning up stale requests.
locked_at TIMESTAMPTZ,
PRIMARY KEY (user_id, idempotency_key)
);
-- An additional index can be useful for cleanup jobs.
CREATE INDEX idx_idempotency_keys_created_at ON idempotency_keys (created_at);
Key Design Decisions:
* Composite Primary Key (user_id, idempotency_key): This is the most critical element for performance. It scopes the key to a user/tenant, allowing clients to generate simple UUIDs without fear of global collision. All lookups will be on this composite key, making them extremely fast.
* status ENUM: Using a native database ENUM type is more efficient and type-safe than a TEXT field. The states (started, completed, failed) represent our two-phase protocol.
* request_hash BYTEA: We store a hash (e.g., SHA-256) of the request's critical parameters. This prevents a client from mistakenly reusing an idempotency key for a fundamentally different operation. Storing it as BYTEA is more compact than hex-encoded text.
* response_body JSONB: Storing the response in structured JSONB allows it to be queried and introspected later. Note that JSONB accepts only valid JSON. If a handler can return non-JSON bodies, a BYTEA column is the safer choice.
* locked_at TIMESTAMPTZ: This timestamp is set when we begin processing. A background job can use this to identify and handle requests that started but never finished, potentially due to a server crash.
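The status column encodes a small state machine. The Go sketch below makes the legal transitions explicit. The IdempotencyStatus type and the canTransition helper are hypothetical. Allowing 'failed' to move back to 'started' is an assumption, one way to permit retries, and not something the schema enforces.

```go
package main

import "fmt"

// IdempotencyStatus mirrors the idempotency_status ENUM in the schema.
type IdempotencyStatus string

const (
	StatusStarted   IdempotencyStatus = "started"
	StatusCompleted IdempotencyStatus = "completed"
	StatusFailed    IdempotencyStatus = "failed"
)

// allowedTransitions lists which status may follow which.
// Assumption: 'failed' may return to 'started' to permit a retry.
// 'completed' has no entry, so it is terminal.
var allowedTransitions = map[IdempotencyStatus][]IdempotencyStatus{
	StatusStarted: {StatusCompleted, StatusFailed},
	StatusFailed:  {StatusStarted},
}

// canTransition reports whether a record may move from one status to another.
func canTransition(from, to IdempotencyStatus) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(StatusStarted, StatusCompleted)) // true
	fmt.Println(canTransition(StatusCompleted, StatusStarted)) // false
}
```

Whether you check transitions in application code is optional. What matters is that 'completed' is terminal, so a cached response is never overwritten.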
The Request Lifecycle: An Atomic Two-Phase Protocol
This pattern's robustness comes from a carefully orchestrated sequence of database operations executed within a single transaction. We'll implement this logic as an HTTP middleware in Go.
Phase 1: The Lock & Check
This phase happens before any business logic is executed. Its goal is to atomically create a record for the idempotency key and determine if another request is already being processed.
// idempotencyMiddleware is the core HTTP middleware
func (s *Server) idempotencyMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
idempotencyKey := r.Header.Get("Idempotency-Key")
if idempotencyKey == "" {
next.ServeHTTP(w, r)
return
}
// In a real app, you'd get this from a JWT or session.
userID := uuid.MustParse("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11")
tx, err := s.db.BeginTx(r.Context(), nil)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Defer a rollback. If the transaction is committed, this is a no-op.
defer tx.Rollback()
// 1. ATTEMPT TO CREATE THE KEY
// The ON CONFLICT DO NOTHING is crucial. It makes this operation atomic.
// If the key exists, it does nothing and returns 0 rows affected.
// If it doesn't exist, it inserts the row and returns 1 row affected.
result, err := tx.ExecContext(r.Context(),
`INSERT INTO idempotency_keys (user_id, idempotency_key, status, locked_at)
VALUES ($1, $2, 'started', NOW())
ON CONFLICT (user_id, idempotency_key) DO NOTHING`,
userID, idempotencyKey)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
rowsAffected, _ := result.RowsAffected()
if rowsAffected == 0 {
// 2. KEY ALREADY EXISTS, SO WE MUST LOCK AND CHECK ITS STATUS
var ik models.IdempotencyKey
// SELECT ... FOR UPDATE locks the row, preventing other transactions from modifying it
// until our current transaction commits or rolls back. This is the core of race condition prevention.
err = tx.QueryRowContext(r.Context(),
`SELECT status, response_code, response_body
FROM idempotency_keys
WHERE user_id = $1 AND idempotency_key = $2 FOR UPDATE`,
userID, idempotencyKey).Scan(&ik.Status, &ik.ResponseCode, &ik.ResponseBody)
if err != nil {
// If no row is found, it means it was deleted between the INSERT and SELECT.
// This is highly unlikely but possible. We can treat it as a conflict and ask the client to retry.
if err == sql.ErrNoRows {
http.Error(w, "Conflict, please retry", http.StatusConflict)
return
}
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
switch ik.Status {
case "completed":
// Request was already processed successfully. Return the cached response.
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(int(ik.ResponseCode.Int32)) // WriteHeader takes an int, NullInt32.Int32 is an int32
w.Write(ik.ResponseBody)
// We do NOT commit the transaction here; the defer will roll it back.
// We only read data, so no changes need to be persisted.
return
case "started":
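// Reachable only if a 'started' row was committed, e.g. because a handler committed tx
// itself before the result was recorded. In the single-transaction flow, a concurrent
// request blocks on the INSERT above instead. Either way, return a conflict.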
http.Error(w, "A request with this key is already in progress", http.StatusConflict)
return
case "failed":
// The previous attempt failed. We could allow a retry.
// For simplicity here, we'll treat it as a conflict, but you could design a retry mechanism.
http.Error(w, "A previous request with this key failed", http.StatusConflict)
return
}
}
// If we get here, rowsAffected was 1, meaning we are the first request for this key.
// We now hold a lock on the new 'started' record within our transaction.
// We pass the transaction down to the handler via the request context.
ctx := context.WithValue(r.Context(), "db_tx", tx)
ctx = context.WithValue(ctx, "idempotency_key", idempotencyKey)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
Phase 2: The Execution & Persist
After the business logic in the main handler runs, it needs to update the idempotency key record with the result and commit the entire transaction. If the business logic fails, the deferred tx.Rollback() will automatically discard the 'started' record, allowing a subsequent request to try again cleanly.
To capture the response from the actual handler, we need a response writer wrapper.
// responseRecorder buffers the handler's response so that nothing reaches
// the client until the transaction has committed.
type responseRecorder struct {
	header     http.Header
	statusCode int
	body       bytes.Buffer
}

func newResponseRecorder() *responseRecorder {
	return &responseRecorder{header: make(http.Header), statusCode: http.StatusOK}
}

func (rec *responseRecorder) Header() http.Header { return rec.header }

func (rec *responseRecorder) WriteHeader(statusCode int) { rec.statusCode = statusCode }

// Write appends, because a handler may call Write more than once.
func (rec *responseRecorder) Write(b []byte) (int, error) { return rec.body.Write(b) }

// flush sends the buffered response to the real client.
func (rec *responseRecorder) flush(w http.ResponseWriter) {
	for k, v := range rec.header {
		w.Header()[k] = v
	}
	w.WriteHeader(rec.statusCode)
	w.Write(rec.body.Bytes())
}

// The middleware needs to be updated to use this recorder.
func (s *Server) idempotencyMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// ... (Phase 1 code from above)
		// ... after successful key acquisition (rowsAffected == 1)

		// Run the handler against the buffer, not the real ResponseWriter.
		recorder := newResponseRecorder()
		ctx := context.WithValue(r.Context(), "db_tx", tx)
		ctx = context.WithValue(ctx, "idempotency_key", idempotencyKey)
		ctx = context.WithValue(ctx, "user_id", userID)
		next.ServeHTTP(recorder, r.WithContext(ctx))

		if recorder.statusCode < 200 || recorder.statusCode >= 300 {
			// The handler failed. The deferred tx.Rollback() discards its writes
			// and the 'started' record; relay the error response unchanged.
			recorder.flush(w)
			return
		}

		// 3. UPDATE THE KEY WITH THE RESPONSE
		// If the handler committed or rolled back tx itself, this returns sql.ErrTxDone.
		_, err = tx.ExecContext(r.Context(),
			`UPDATE idempotency_keys
			SET status = 'completed', response_code = $1, response_body = $2
			WHERE user_id = $3 AND idempotency_key = $4`,
			recorder.statusCode, recorder.body.Bytes(), userID, idempotencyKey)
		if err != nil {
			// Nothing has been sent yet, so the client really does get a 500 and can retry.
			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
			return
		}

		// 4. COMMIT THE ENTIRE TRANSACTION
		// This atomically commits the business logic's changes AND the idempotency record update.
		if err := tx.Commit(); err != nil {
			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
			return
		}

		// Only after a successful commit does the client see the response.
		recorder.flush(w)
	})
}
This completes the two-phase protocol. The entire operation—business logic and idempotency state change—is wrapped in a single database transaction, guaranteeing atomicity.
Advanced Scenarios & Edge Cases
Race Conditions in Detail
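Row-level locking is the linchpin of our concurrency control. Let's trace the exact database-level interaction for two simultaneous requests, A and B, with the same key, under PostgreSQL's default READ COMMITTED isolation level:
1. A executes INSERT ... ON CONFLICT DO NOTHING. It succeeds and inserts a 'started' record. rowsAffected is 1.
2. B executes the same INSERT ... ON CONFLICT DO NOTHING. The conflicting row belongs to A's still-open transaction, so PostgreSQL cannot yet tell whether a conflict exists. B's INSERT blocks until A finishes.
3. A runs the business logic, updates its record to 'completed' with the response data, and commits.
4. B's INSERT resumes. The committed row now exists, so the ON CONFLICT clause is triggered and rowsAffected is 0.
5. B, seeing rowsAffected as 0, executes SELECT ... FROM idempotency_keys ... FOR UPDATE and retrieves the 'completed' record.
6. B sees the 'completed' status, serves the cached response from the database, and rolls back its transaction, which was read-only.
Had A rolled back instead, B's INSERT would have succeeded in step 4 with rowsAffected 1, making B the first request. Either way, the second request cannot proceed until the first is definitively finished, which eliminates the race condition. The FOR UPDATE lock does its work once the row is committed: it holds the row while the status is read and acted on. That keeps status transitions, such as retrying a 'failed' key, serialized.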
Asynchronous Processing (Message Queue Consumers)
This pattern is even more critical for message queue consumers, which often have built-in retry mechanisms.
The logic remains nearly identical, but the context changes:
* The idempotency key is extracted from the message payload or headers.
* There is no HTTP response to cache. You can store a simple success marker.
* On a duplicate message, instead of returning a cached response, you simply acknowledge the message to remove it from the queue and do not execute the business logic.
Here's a conceptual Kafka consumer example:
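// headerValue returns the value of a Kafka header, or "" if it is absent.
// kafka.Message exposes Headers as a slice of key/value pairs, not a map.
func headerValue(msg kafka.Message, key string) string {
	for _, h := range msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

func (c *Consumer) handleMessage(ctx context.Context, msg kafka.Message) {
	idempotencyKey := headerValue(msg, "idempotency-key")
	userID := headerValue(msg, "user-id")

	tx, err := c.db.BeginTx(ctx, nil)
	if err != nil {
		log.Printf("Could not begin transaction: %v", err)
		return
	}
	defer tx.Rollback()

	// Phase 1: Lock & Check (same queries as the middleware)
	result, err := tx.ExecContext(...) // INSERT ... ON CONFLICT DO NOTHING
	if err != nil {
		return
	}
	rowsAffected, _ := result.RowsAffected()
	if rowsAffected == 0 {
		var status string
		if err := tx.QueryRowContext(...).Scan(&status); err != nil { // SELECT ... FOR UPDATE
			return
		}
		if status == "completed" {
			log.Println("Duplicate message detected, acknowledging.")
			c.kafkaReader.CommitMessages(ctx, msg) // Acknowledge without processing
			return
		} else if status == "started" {
			// Another consumer instance holds this key, so do not commit the offset.
			log.Println("Message processing in-flight by another consumer.")
			return
		}
	}

	// Phase 2: Execute & Persist
	err = c.processBusinessLogic(tx, msg.Value)
	if err != nil {
		// Business logic failed. The rollback will discard the idempotency record.
		// Kafka tracks a committed offset per partition, not per-message acks, so
		// skipping the commit does not by itself trigger redelivery: the message is
		// re-read only after a restart or rebalance, and committing a later offset
		// skips past it. Retry in-process or route it to a retry topic instead.
		log.Printf("Business logic failed: %v", err)
		return
	}

	// Update idempotency key to 'completed'
	_, err = tx.ExecContext(ctx,
		`UPDATE idempotency_keys SET status = 'completed' WHERE ...`)
	if err != nil {
		// This is a critical failure. The logic succeeded but we can't mark it as such.
		// Do not ack; the rollback also undoes the business writes, so a retry starts clean.
		return
	}

	// Commit DB transaction
	err = tx.Commit()
	if err != nil { /* ... */ }

	// Acknowledge Kafka message. If this fails, the message may be delivered again,
	// and the 'completed' record turns that redelivery into a harmless duplicate.
	if err := c.kafkaReader.CommitMessages(ctx, msg); err != nil {
		log.Printf("Could not commit offset: %v", err)
	}
}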
Request Hashing for Payload Integrity
A client might accidentally reuse an idempotency key for a different request. For example, a UI bug causes two different "Add to Cart" buttons to use the same key. We must reject this.
This is the purpose of the request_hash column. In the middleware, before the INSERT, we compute a hash of the request's salient features.
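// In the middleware...
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
	http.Error(w, "Could not read request body", http.StatusBadRequest)
	return
}
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) // Restore the body for the handler
hasher := sha256.New()
hasher.Write(bodyBytes)
// You might also include other critical headers or path parameters in the hash
hasher.Write([]byte(r.Method))
hasher.Write([]byte(r.URL.Path))
requestHash := hasher.Sum(nil)
// The Phase 1 INSERT must also store the hash; otherwise there is nothing to compare against.
result, err := tx.ExecContext(r.Context(),
	`INSERT INTO idempotency_keys (user_id, idempotency_key, status, locked_at, request_hash)
	VALUES ($1, $2, 'started', NOW(), $3)
	ON CONFLICT (user_id, idempotency_key) DO NOTHING`,
	userID, idempotencyKey, requestHash)
// ... later, when a key exists...
if rowsAffected == 0 {
	var ik models.IdempotencyKey
	// Also select the request_hash
	err = tx.QueryRowContext(r.Context(),
		`SELECT status, response_code, response_body, request_hash
		FROM idempotency_keys WHERE ... FOR UPDATE`,
		...).Scan(&ik.Status, &ik.ResponseCode, &ik.ResponseBody, &ik.RequestHash)
	if err != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	// Compare hashes
	if !bytes.Equal(ik.RequestHash, requestHash) {
		// This is a client error. The same key is being used for a different request.
		http.Error(w, "Idempotency key is being reused with a different request payload", http.StatusUnprocessableEntity)
		return
	}
	// ... proceed with status check
}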
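Writing fields back-to-back into the hasher leaves their boundaries implicit. Two different requests could therefore, in principle, feed SHA-256 identical bytes. For example, if the method and path are written before the body, path "/ab" with an empty body and path "/a" with body "b" both produce the byte stream "POST/ab". One way to rule that out is to prefix each field with its length, sketched below with a hypothetical requestHash helper:

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// requestHash fingerprints the parts of a request that define "the same
// operation". Each field is prefixed with its 8-byte length, so field
// boundaries are unambiguous.
func requestHash(method, path string, body []byte) []byte {
	h := sha256.New()
	for _, field := range [][]byte{[]byte(method), []byte(path), body} {
		var n [8]byte
		binary.BigEndian.PutUint64(n[:], uint64(len(field)))
		h.Write(n[:])
		h.Write(field)
	}
	return h.Sum(nil)
}

func main() {
	a := requestHash("POST", "/charges", []byte(`{"amount": 100}`))
	b := requestHash("POST", "/charges", []byte(`{"amount": 200}`))
	fmt.Printf("%x\n%x\n", a, b) // different payloads produce different hashes
}
```

Which fields to include remains a design decision. The same length-prefixing applies to any extra headers or path parameters you add.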
Performance Considerations & Optimizations
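* Indexing: The composite primary key on (user_id, idempotency_key) is non-negotiable. It provides both the fast lookup and the unique constraint that ON CONFLICT (user_id, idempotency_key) relies on. An EXPLAIN ANALYZE on your SELECT ... FOR UPDATE query should show an Index Scan on the primary key index. An Index Only Scan is not possible here, because the query reads columns that are not in the index. Without that index, every lookup degrades into a sequential scan, which will cripple database performance as the table grows.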
* Database Contention: Under extreme load, the idempotency_keys table can become a source of contention due to locking. If this occurs:
  * Keep Transactions Short: The business logic executed inside the transaction should be as fast as possible. The longer the transaction, the longer the row lock is held.
  * Sharding: For massive multi-tenant systems, you could consider sharding the idempotency_keys table, perhaps by user_id or a hash of it. This distributes the lock contention across multiple physical tables or even database instances.
  * Advisory Locks: You can replace SELECT ... FOR UPDATE with PostgreSQL's advisory locks. These are application-defined locks that are held by the server rather than recorded in table rows. The PostgreSQL documentation describes them as faster than a flag stored in a table and free of table bloat. However, they are harder to manage correctly. Session-level advisory locks must be released explicitly, while transaction-level variants such as pg_advisory_xact_lock are released automatically when the transaction ends.
* Key Expiration & Garbage Collection: This table will grow indefinitely. A simple background job that runs periodically is essential:
-- Run this daily/weekly in a background job or cron
DELETE FROM idempotency_keys
WHERE created_at < NOW() - INTERVAL '30 days';
The retention period (e.g., 30 days) depends on your clients' expected retry window.
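PostgreSQL's advisory-lock functions, such as pg_advisory_xact_lock(bigint), take a single 64-bit integer rather than a composite key. To use the advisory-lock alternative, the (user_id, idempotency_key) pair must therefore be folded into one value. The sketch below assumes FNV-1a is an acceptable mixing function, and advisoryLockKey is a hypothetical helper. A collision between two unrelated keys would only make those requests wait for each other. The idempotency_keys row, not the lock, still decides whether work is repeated.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// advisoryLockKey folds (userID, idempotencyKey) into the signed 64-bit value
// expected by pg_advisory_xact_lock(bigint). A NUL byte separates the fields;
// PostgreSQL text values cannot contain NUL, so it cannot occur inside either.
func advisoryLockKey(userID, idempotencyKey string) int64 {
	h := fnv.New64a()
	h.Write([]byte(userID))
	h.Write([]byte{0})
	h.Write([]byte(idempotencyKey))
	return int64(h.Sum64()) // reinterpret the unsigned hash as a bigint
}

func main() {
	k := advisoryLockKey("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", "order-123")
	// The value would be passed as a query parameter, for example
	// tx.ExecContext(ctx, "SELECT pg_advisory_xact_lock($1)", k)
	fmt.Println(k)
}
```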
Production-Ready Code Implementation
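Below is a simplified main file demonstrating the server, middleware, and a handler. Paste in the middleware and recorder from the previous sections and create the schema above in the target database. You can then run it and test it with curl to see the idempotency logic in action.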
// main.go
package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
"github.com/google/uuid"
_ "github.com/lib/pq"
)
// --- Server and Models (simplified) ---
type Server struct {
db *sql.DB
}
type IdempotencyKey struct {
Status string
ResponseCode sql.NullInt32
ResponseBody []byte
}
type CreateChargeRequest struct {
Amount int `json:"amount"`
}
type CreateChargeResponse struct {
ChargeID string `json:"charge_id"`
Amount int `json:"amount"`
}
// --- Middleware and Handler (as defined in previous sections) ---
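// ... (paste the Phase 2 idempotencyMiddleware, with the Phase 1 code inlined, and responseRecorder here;
// refer to IdempotencyKey instead of models.IdempotencyKey)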
// createChargeHandler simulates a business logic operation
func (s *Server) createChargeHandler(w http.ResponseWriter, r *http.Request) {
tx, ok := r.Context().Value("db_tx").(*sql.Tx)
if !ok {
http.Error(w, "Could not get transaction from context", http.StatusInternalServerError)
return
}
var req CreateChargeRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
log.Printf("Executing business logic for amount: %d", req.Amount)
// Simulate the business logic's database work inside the shared transaction
chargeID := uuid.New()
_, err := tx.ExecContext(r.Context(), `SELECT pg_sleep(0.1)`) // Simulate work
if err != nil {
http.Error(w, "Business logic failed", http.StatusInternalServerError)
return
}
resp := CreateChargeResponse{ChargeID: chargeID.String(), Amount: req.Amount}
respBytes, _ := json.Marshal(resp)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
w.Write(respBytes)
}
// --- Main Function ---
func main() {
db, err := sql.Open("postgres", "user=postgres password=password dbname=test sslmode=disable")
if err != nil {
log.Fatal(err)
}
server := &Server{db: db}
mux := http.NewServeMux()
chargeHandler := http.HandlerFunc(server.createChargeHandler)
mux.Handle("/charges", server.idempotencyMiddleware(chargeHandler))
log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", mux))
}
Testing the Implementation
# 1. First request with a fresh key
KEY=$(uuidgen)
curl -v -X POST -H "Content-Type: application/json" \
-H "Idempotency-Key: $KEY" \
-d '{"amount": 100}' \
http://localhost:8080/charges
# Responds with 201 Created and a new charge ID.
# 2. Retry with the same key
curl -v -X POST -H "Content-Type: application/json" \
-H "Idempotency-Key: $KEY" \
-d '{"amount": 100}' \
http://localhost:8080/charges
# Responds immediately with 201 Created and the SAME charge ID.
# Check server logs: "Executing business logic..." will NOT appear a second time.
# 3. Concurrent requests with a new key
KEY=$(uuidgen)
# Run two requests simultaneously (the first in the background)
curl -X POST -H "Content-Type: application/json" -H "Idempotency-Key: $KEY" -d '{"amount": 200}' http://localhost:8080/charges & \
curl -X POST -H "Content-Type: application/json" -H "Idempotency-Key: $KEY" -d '{"amount": 200}' http://localhost:8080/charges
wait
# Both will return the same successful response. Server logs will show the business logic ran only once.
By implementing this robust, database-driven idempotency pattern, you can build truly resilient systems that maintain data integrity despite the unreliable nature of distributed communication.