Proactive Cache Invalidation with Debezium, Kafka, and Redis

Goh Ling Yong

The Achilles' Heel of Distributed Systems: Stale Caches

In any high-throughput, distributed architecture, caching is not a luxury; it's a fundamental requirement for performance and scalability. However, the most common caching strategies—Time-To-Live (TTL) and Write-Through/Aside—introduce their own set of insidious problems that senior engineers constantly battle.

* TTL-based Caching: Simple, but inherently flawed. It creates a window of data inconsistency between the time the source of truth is updated and the cache key expires. For many applications, serving stale data for seconds or even minutes is unacceptable. It's a blunt instrument that trades correctness for simplicity.

* Write-Through/Aside Caching: This pattern improves consistency by having the application code explicitly update or invalidate the cache after a database write. While effective in a monolith, it breaks down catastrophically in a microservices environment. The service owning the data (Service A) makes a change, but how do a dozen other services (B, C, D...) that cache that data know it has been updated? This leads to tight coupling via synchronous API calls or a fragile, custom eventing system, defeating the purpose of a decoupled architecture.

The core problem is one of information flow. The cache needs to react to state changes in the database, but it's fundamentally disconnected from the database's internal transaction log. What if we could tap directly into that log? This is precisely the problem that Change Data Capture (CDC) solves.

This article details the architecture and implementation of a production-grade, proactive cache invalidation system using a CDC pipeline. We will build a system that listens directly to changes in a PostgreSQL database and intelligently updates a Redis cache in near real-time, ensuring high data consistency across a distributed system.

The Architecture: An Event-Driven Invalidation Pipeline

Our architecture is composed of several key components working in concert:

  • PostgreSQL (Source of Truth): The primary database. We leverage its logical replication feature, which provides a stream of data changes from the Write-Ahead Log (WAL).
  • Debezium (The CDC Engine): A Kafka Connect connector that taps into PostgreSQL's logical replication slot, converts raw WAL changes into structured events, and publishes them to Kafka.
  • Apache Kafka (The Event Backbone): A durable, scalable, and ordered message broker that decouples the database from the cache invalidation logic. It acts as a resilient buffer for change events.
  • Go Invalidation Service (The Consumer): A custom microservice that consumes the change events from Kafka, deserializes them, and applies the necessary logic to invalidate or update keys in Redis.
  • Redis (The Distributed Cache): The high-performance key-value store that our applications read from.
Here's a high-level view of the data flow:

    mermaid
    flowchart TD
        A[Client Application] --> B(PostgreSQL)
        B -- WAL --> C{Debezium Connector}
        C -- Change Events --> D[(Kafka Cluster)]
        D -- Consumes Events --> E[Go Invalidation Service]
        E -- INVAL/SET --> F(Redis Cache)
        G[Other Microservices] --> F
    
        subgraph Kafka Connect
            C
        end

    This event-driven, asynchronous pipeline ensures that the service managing the cache is completely decoupled from the service that owns the data. It reacts to the effect (a data change) rather than being commanded by the cause (an application write).

    Step 1: Configuring the Debezium PostgreSQL Connector

    Assuming you have a Kafka Connect cluster running, the first step is to configure and deploy the Debezium connector. This is done via the Kafka Connect REST API. The configuration is critical for performance, data consistency, and correctness.

    First, ensure your PostgreSQL instance is configured for logical replication. In postgresql.conf:

    ini
    wal_level = logical
    max_wal_senders = 10
    max_replication_slots = 10

    And ensure the user Debezium will connect with has replication privileges:

    sql
    ALTER USER debezium_user WITH REPLICATION;

    Now, let's craft the connector configuration. We'll monitor the products and categories tables in the public schema.

    json
    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres.prod.internal",
        "database.port": "5432",
        "database.user": "debezium_user",
        "database.password": "your-secret-password",
        "database.dbname": "inventory_db",
        "database.server.name": "inventory_postgres",
        "plugin.name": "pgoutput",
        "table.include.list": "public.products,public.categories",
        "snapshot.mode": "initial",
        "snapshot.locking.mode": "none",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "decimal.handling.mode": "double",
        "topic.prefix": "cdc"
      }
    }
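
    To register the connector, this JSON document is posted to Kafka Connect's REST API (POST /connectors). If you prefer to script that step in Go rather than with curl, a minimal sketch might look like the following; the Connect URL and the local config file name are placeholders for your environment.

    go
    package main
    
    import (
    	"bytes"
    	"fmt"
    	"log"
    	"net/http"
    	"os"
    )
    
    func main() {
    	// Both values are placeholders; adjust for your environment.
    	connectURL := "http://kafka-connect:8083/connectors"
    
    	// The JSON document shown above, saved locally.
    	config, err := os.ReadFile("inventory-connector.json")
    	if err != nil {
    		log.Fatalf("failed to read connector config: %v", err)
    	}
    
    	// POST /connectors registers a new connector with Kafka Connect.
    	resp, err := http.Post(connectURL, "application/json", bytes.NewReader(config))
    	if err != nil {
    		log.Fatalf("failed to call Kafka Connect: %v", err)
    	}
    	defer resp.Body.Close()
    
    	// 201 Created means the connector was registered; 409 means one with this name already exists.
    	fmt.Println("Kafka Connect responded:", resp.Status)
    }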

    Dissecting the Critical Configuration Parameters:

    * plugin.name: pgoutput is the standard logical decoding plugin in modern PostgreSQL versions. It's efficient and well-supported.

    * table.include.list: A comma-separated list of tables to monitor. Be specific. Monitoring an entire database can generate enormous traffic. We're targeting products and categories.

    * snapshot.mode: This is one of the most important settings. initial tells Debezium to perform a consistent snapshot of all included tables upon its first startup. It reads the tables, generates read events (op: "r") for every row, and publishes them to Kafka. After the snapshot is complete, it switches to streaming changes from the WAL. This is essential for populating the cache for the first time.

    * value.converter: AvroConverter is strongly recommended for production. It integrates with a Schema Registry, providing schema evolution, strong typing, and significantly smaller message payloads compared to JSON.

    * database.server.name / topic.prefix: These control the naming of the Kafka topics. topic.prefix supersedes database.server.name in Debezium 2.x, and topics follow the pattern <topic.prefix>.<schema>.<table>. With the config above, changes to the products table will be published to the cdc.public.products topic.

    The Debezium Change Event Structure

    Understanding the message format is key to writing the consumer. A Debezium Avro message has a detailed envelope and a payload.

    json
    {
      "before": null, // State of the row BEFORE the change (null for inserts)
      "after": {      // State of the row AFTER the change (null for deletes)
        "id": 101,
        "name": "Quantum SSD 1TB",
        "description": "A high-speed solid state drive.",
        "price": 129.99,
        "stock_quantity": 500,
        "category_id": 5,
        "updated_at": 1678886400000
      },
      "source": { ... }, // Metadata about source (Postgres version, txid, etc.)
      "op": "c", // Operation type: c=create, u=update, d=delete, r=read (snapshot)
      "ts_ms": 1678886401000, // Timestamp of the event
      "transaction": null
    }

    * For an INSERT (op: "c"), before is null and after contains the new row.

    * For an UPDATE (op: "u"), before contains the old row data and after contains the new.

    * For a DELETE (op: "d"), before contains the data of the deleted row and after is null.

    This rich structure allows our consumer to implement sophisticated logic.

    Step 2: Implementing the Go Invalidation Service

    Now we'll write the consumer service in Go. It will use the confluent-kafka-go library for consuming from Kafka and the go-redis library for interacting with Redis. We also need an Avro deserialization library that integrates with the Confluent Schema Registry.

    Project Structure:

    text
    /invalidation-service
      - go.mod
      - go.sum
      - main.go
      - consumer/consumer.go
      - processor/processor.go
      - cache/redis.go
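
    Before looking at each module, here is a rough sketch of how main.go might wire them together. The module path, broker and registry addresses, topic name, and consumer group ID are illustrative assumptions; the constructors are defined in the sections that follow.

    go
    package main
    
    import (
    	"log"
    
    	"invalidation-service/cache" // module path is an assumption
    	"invalidation-service/consumer"
    	"invalidation-service/processor"
    )
    
    func main() {
    	// Endpoints and names are illustrative; load them from config/env in production.
    	redisCache := cache.NewRedisCache("redis:6379")
    	proc := processor.NewAvroProcessor("http://schema-registry:8081", redisCache)
    
    	kc, err := consumer.NewKafkaConsumer(
    		"kafka:9092",                    // Kafka brokers
    		"cache-invalidation-service",    // consumer group ID
    		[]string{"cdc.public.products"}, // CDC topic(s) to consume
    		proc,
    	)
    	if err != nil {
    		log.Fatalf("failed to create Kafka consumer: %v", err)
    	}
    
    	// Run blocks until SIGINT/SIGTERM is received, then shuts the consumer down.
    	kc.Run()
    }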

    The Core Consumer Logic (`consumer/consumer.go`)

    This module is responsible for connecting to Kafka, subscribing to topics, and fetching messages in a loop.

    go
    package consumer
    
    import (
    	"fmt"
    	"os"
    	"os/signal"
    	"syscall"
    
    	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
    	"github.com/riferrei/srclient"
    )
    
    // MessageProcessor defines the interface for processing Kafka messages.
    type MessageProcessor interface {
    	Process(msg *kafka.Message) error
    }
    
    // KafkaConsumer wraps the Kafka consumer and its configuration.
    type KafkaConsumer struct {
    	consumer  *kafka.Consumer
    	processor MessageProcessor
    	topics    []string
    }
    
    // NewKafkaConsumer creates and configures a new Kafka consumer.
    func NewKafkaConsumer(brokers, groupID string, topics []string, processor MessageProcessor) (*KafkaConsumer, error) {
    	c, err := kafka.NewConsumer(&kafka.ConfigMap{
    		"bootstrap.servers":       brokers,
    		"group.id":                groupID,
    		"auto.offset.reset":       "earliest",
    		"enable.auto.commit":      "false", // We will commit offsets manually
    	})
    	if err != nil {
    		return nil, fmt.Errorf("failed to create consumer: %w", err)
    	}
    
    	return &KafkaConsumer{
    		consumer:  c,
    		processor: processor,
    		topics:    topics,
    	}, nil
    }
    
    // Run starts the consumer loop.
    func (kc *KafkaConsumer) Run() {
    	if err := kc.consumer.SubscribeTopics(kc.topics, nil); err != nil {
    		fmt.Fprintf(os.Stderr, "Failed to subscribe to topics: %v\n", err)
    		return
    	}
    	fmt.Printf("Consumer started. Subscribed to topics: %v\n", kc.topics)
    
    	sigchan := make(chan os.Signal, 1)
    	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    
    	run := true
    	for run {
    		select {
    		case sig := <-sigchan:
    			fmt.Printf("Caught signal %v: terminating\n", sig)
    			run = false
    		default:
    			// Poll for messages with a timeout
    			ev := kc.consumer.Poll(100)
    			if ev == nil {
    				continue
    			}
    
    			switch e := ev.(type) {
    			case *kafka.Message:
    				if err := kc.processor.Process(e); err != nil {
    					// In a real app, send to a DLQ or implement robust retry logic
    					fmt.Printf("%%v, error processing message: %v\n", err)
    				} else {
    					// Commit the offset after successful processing
    					kc.consumer.CommitMessage(e)
    				}
    			case kafka.Error:
    				fmt.Fprintf(os.Stderr, "%% Consumer error: %v (%v)\n", e.code(), e)
    				if e.IsFatal() {
    					run = false
    				}
    			}
    		}
    	}
    
    	fmt.Println("Closing consumer")
    	kc.consumer.Close()
    }

    Key Design Choice: enable.auto.commit is set to false. We manually commit offsets only after a message has been successfully processed. This provides at-least-once processing semantics, which is crucial for data consistency. If the service crashes after processing but before committing, it will re-process the message upon restart.
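
    The consumer loop's comment about sending failures to a DLQ deserves a concrete shape. Below is a minimal, hedged sketch of a dead-letter publisher using the same confluent-kafka-go library; the DLQ topic name is an assumption, and in production you would also drain producer.Events() to observe delivery reports.

    go
    package consumer
    
    import (
    	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
    )
    
    // DeadLetterPublisher forwards poison messages to a dead-letter topic so the
    // main consumer can commit the offset and keep making progress.
    type DeadLetterPublisher struct {
    	producer *kafka.Producer
    	topic    string // e.g. "cdc.invalidation.dlq" (assumed name)
    }
    
    func NewDeadLetterPublisher(brokers, topic string) (*DeadLetterPublisher, error) {
    	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": brokers})
    	if err != nil {
    		return nil, err
    	}
    	return &DeadLetterPublisher{producer: p, topic: topic}, nil
    }
    
    // Publish copies the failed message, records the processing error in a header,
    // and produces it to the DLQ topic asynchronously.
    func (d *DeadLetterPublisher) Publish(msg *kafka.Message, procErr error) error {
    	dlqMsg := &kafka.Message{
    		TopicPartition: kafka.TopicPartition{Topic: &d.topic, Partition: kafka.PartitionAny},
    		Key:            msg.Key,
    		Value:          msg.Value,
    		Headers: append(msg.Headers, kafka.Header{
    			Key: "processing-error", Value: []byte(procErr.Error()),
    		}),
    	}
    	return d.producer.Produce(dlqMsg, nil)
    }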

    The Message Processor (`processor/processor.go`)

    This is where the core business logic resides. It deserializes the Avro payload and interacts with the cache.

    go
    package processor
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    
    	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
    	"github.com/linkedin/goavro/v2"
    	"github.com/riferrei/srclient"
    	"github.com/go-redis/redis/v8"
    )
    
    // ProductPayload represents the product row carried in the 'before' and 'after' fields.
    type ProductPayload struct {
    	ID            int32   `json:"id"`
    	Name          string  `json:"name"`
    	Price         float64 `json:"price"`
    	StockQuantity int32   `json:"stock_quantity"`
    	CategoryID    int32   `json:"category_id"`
    }
    
    // DebeziumMessage represents the overall message structure.
    type DebeziumMessage struct {
    	Before map[string]interface{} `json:"before"`
    	After  map[string]interface{} `json:"after"`
    	Op     string                 `json:"op"`
    }
    
    // Cache defines the interface for our cache operations.
    type Cache interface {
    	SetProduct(ctx context.Context, product ProductPayload) error
    	DeleteProduct(ctx context.Context, productID int32) error
    }
    
    // AvroProcessor processes Debezium messages.
    type AvroProcessor struct {
    	schemaRegistryClient *srclient.SchemaRegistryClient
    	cache                Cache
    }
    
    // NewAvroProcessor creates a new processor.
    func NewAvroProcessor(srURL string, cache Cache) *AvroProcessor {
    	schemaRegistryClient := srclient.NewSchemaRegistryClient(srURL)
    	return &AvroProcessor{
    		schemaRegistryClient: schemaRegistryClient,
    		cache:                cache,
    	}
    }
    
    func (p *AvroProcessor) Process(msg *kafka.Message) error {
    	// Confluent Avro wire format: 1 magic byte (0) followed by a 4-byte big-endian schema ID
    	if len(msg.Value) < 5 {
    		return fmt.Errorf("invalid Avro message length")
    	}
    	schemaID := int(msg.Value[1])<<24 | int(msg.Value[2])<<16 | int(msg.Value[3])<<8 | int(msg.Value[4])
    	schema, err := p.schemaRegistryClient.GetSchema(schemaID)
    	if err != nil {
    		return fmt.Errorf("error getting schema ID %d: %w", schemaID, err)
    	}
    
    	codec, err := goavro.NewCodec(schema.Schema())
    	if err != nil {
    		return fmt.Errorf("error creating Avro codec: %w", err)
    	}
    
    	// Decode the Avro binary data into a native Go map
    	native, _, err := codec.NativeFromBinary(msg.Value[5:])
    	if err != nil {
    		return fmt.Errorf("error decoding Avro binary: %w", err)
    	}
    
    	// We can now work with the data as a map[string]interface{}
    	// To make it easier, we'll marshal/unmarshal to our typed struct
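    	// Note: goavro decodes Avro unions (such as Debezium's nullable 'before'/'after'
    	// fields) as single-entry maps keyed by the branch type name, so depending on your
    	// schema these fields may need to be unwrapped before mapping them onto the struct.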
    	jsonData, _ := json.Marshal(native)
    	var debeziumMsg DebeziumMessage
    	if err := json.Unmarshal(jsonData, &debeziumMsg); err != nil {
    		return fmt.Errorf("error unmarshaling to DebeziumMessage: %w", err)
    	}
    
    	return p.handleOperation(context.Background(), &debeziumMsg)
    }
    
    func (p *AvroProcessor) handleOperation(ctx context.Context, msg *DebeziumMessage) error {
    	switch msg.Op {
    	case "c", "u", "r": // Create, Update, or Read (from snapshot)
    		if msg.After == nil {
    			return fmt.Errorf("op '%s' with nil 'after' payload", msg.Op)
    		}
    		// Convert map to struct for type safety
    		var payload ProductPayload
    		jsonData, _ := json.Marshal(msg.After)
    		if err := json.Unmarshal(jsonData, &payload); err != nil {
    			return fmt.Errorf("error decoding 'after' payload: %w", err)
    		}
    
    		fmt.Printf("Cache WARMING: Setting product %d\n", payload.ID)
    		return p.cache.SetProduct(ctx, payload)
    
    	case "d": // Delete
    		if msg.Before == nil {
    			return fmt.Errorf("op 'd' with nil 'before' payload", msg.Op)
    		}
    		var payload ProductPayload
    		jsonData, _ := json.Marshal(msg.Before)
    		if err := json.Unmarshal(jsonData, &payload); err != nil {
    			return fmt.Errorf("error decoding 'before' payload: %w", err)
    		}
    
    		fmt.Printf("Cache INVALIDATION: Deleting product %d\n", payload.ID)
    		return p.cache.DeleteProduct(ctx, payload.ID)
    	
    	default:
    		fmt.Printf("Ignoring unknown operation: %s\n", msg.Op)
    		return nil
    	}
    }

    The Redis Cache Implementation (`cache/redis.go`)

    This module implements the Cache interface and handles the specifics of storing product data in Redis. We'll store each product as a JSON string in a simple key-value pair, but using Redis Hashes is also a viable, often more memory-efficient strategy.

    go
    package cache
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"time"
    
    	"github.com/go-redis/redis/v8"
    )
    
    // RedisCache implements the Cache interface using Redis.
    type RedisCache struct {
    	client *redis.Client
    }
    
    // NewRedisCache creates a new Redis client wrapper.
    func NewRedisCache(addr string) *RedisCache {
    	rdb := redis.NewClient(&redis.Options{
    		Addr: addr,
    	})
    	return &RedisCache{client: rdb}
    }
    
    func productKey(productID int32) string {
    	return fmt.Sprintf("product:%d", productID)
    }
    
    // SetProduct warms the cache by setting the product data.
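    // Note: ProductPayload must also be visible to this package, e.g. imported from
    // the processor package or moved into a shared package; it is shown unqualified here for brevity.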
    func (r *RedisCache) SetProduct(ctx context.Context, product ProductPayload) error {
    	key := productKey(product.ID)
    	jsonData, err := json.Marshal(product)
    	if err != nil {
    		return fmt.Errorf("failed to marshal product: %w", err)
    	}
    
    	// Set with a long expiration to act as a fallback/safety net
    	return r.client.Set(ctx, key, jsonData, 24*time.Hour).Err()
    }
    
    // DeleteProduct invalidates a product from the cache.
    func (r *RedisCache) DeleteProduct(ctx context.Context, productID int32) error {
    	key := productKey(productID)
    	return r.client.Del(ctx, key).Err()
    }
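
    As noted above, storing products as Redis Hashes instead of JSON strings is a viable, often more memory-efficient alternative that also allows individual fields to be read or updated without deserializing the whole object. A minimal sketch of that approach, written as an additional method for cache/redis.go (reusing its existing imports; the field names are assumed to mirror the struct), might look like this:

    go
    // SetProductHash stores the product as a Redis Hash instead of a JSON string,
    // so callers can HGET a single field without parsing the full object.
    func (r *RedisCache) SetProductHash(ctx context.Context, product ProductPayload) error {
    	key := productKey(product.ID)
    
    	// HSet accepts alternating field/value pairs (or a map) in go-redis v8.
    	if err := r.client.HSet(ctx, key,
    		"id", product.ID,
    		"name", product.Name,
    		"price", product.Price,
    		"stock_quantity", product.StockQuantity,
    		"category_id", product.CategoryID,
    	).Err(); err != nil {
    		return fmt.Errorf("failed to write product hash: %w", err)
    	}
    
    	// Keep the same 24h safety-net expiration used for the JSON variant.
    	return r.client.Expire(ctx, key, 24*time.Hour).Err()
    }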

    Advanced Pattern 1: Cache Warming vs. Pure Invalidation

    Our current implementation uses a cache warming approach. When an update (u) or create (c) event arrives, we use the after payload to directly SET the new value in Redis.

    Pros:

    * Eliminates cache misses for updated items. The next application read is guaranteed to be a cache hit with fresh data.

    * Reduces load on the primary database, as the read path doesn't need to fetch the data again.

    Cons:

    * The invalidation service needs to know the exact structure of the cache key and value, creating a tighter coupling between the consumer and the cached data format.

    * If the cached object is a complex aggregation of multiple tables, the simple after payload from one table's CDC event may not be sufficient to reconstruct it.

    An alternative is pure invalidation. In this model, for c, u, and d events, we simply DEL the corresponding cache key.

    Pros:

    * The invalidation service is simpler. It only needs to know how to derive the cache key from the event's primary key, not the value structure.

    * It works for complex, aggregated cache objects. The invalidation signal is just a trigger for the application to re-fetch and re-build the object from the source of truth on the next read.

    Cons:

    * It guarantees a cache miss (and thus a database hit) for every updated item on its next access. This can lead to a "thundering herd" problem if a popular item is updated.

    Choosing the right pattern depends on your access patterns. For simple 1:1 table-to-cache-object mappings, warming is superior. For complex, joined objects, pure invalidation is often the more pragmatic and decoupled approach.
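
    If you opt for pure invalidation on hot keys, the thundering-herd risk can be mitigated on the read path of the consuming services (not in the invalidation service itself) by coalescing concurrent cache-miss lookups. A minimal sketch using golang.org/x/sync/singleflight follows; the package name, Product type, and loadFromDB helper are illustrative assumptions.

    go
    package productread
    
    import (
    	"context"
    	"fmt"
    
    	"golang.org/x/sync/singleflight"
    )
    
    // Product is a minimal stand-in for the cached product representation.
    type Product struct {
    	ID   int32
    	Name string
    }
    
    // group coalesces concurrent cache-miss lookups for the same key, so after an
    // invalidation only one goroutine per key actually hits the database.
    var group singleflight.Group
    
    // GetProduct collapses concurrent misses for the same ID into a single load.
    // loadFromDB is a hypothetical helper that reads the product from PostgreSQL
    // and repopulates the cache.
    func GetProduct(ctx context.Context, id int32, loadFromDB func(context.Context, int32) (*Product, error)) (*Product, error) {
    	v, err, _ := group.Do(fmt.Sprintf("product:%d", id), func() (interface{}, error) {
    		return loadFromDB(ctx, id)
    	})
    	if err != nil {
    		return nil, err
    	}
    	return v.(*Product), nil
    }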

    Advanced Pattern 2: Handling Relational Data Invalidation

    The most complex challenge in CDC-based caching is handling relationships. Consider our schema: a products table has a category_id foreign key to a categories table. Our cached product object might look like this:

    json
    {
      "id": 101,
      "name": "Quantum SSD 1TB",
      "category_name": "Storage"
    }

    What happens when we update the name in the categories table from "Storage" to "Solid State Drives"? The CDC event will be on the cdc.public.categories topic. Our current products consumer won't see it, and all product caches with the old category name will become stale.

    Solution: A Multi-Stage or "Smart" Invalidation Service

    The invalidation service must be aware of these relationships. When it processes a change from the categories topic, it must perform a secondary action.

  • Consume from multiple topics: The service subscribes to both the ...products and ...categories topics.
  • Implement relational logic: When a categories update event is received, the service must:

    a. Extract the category_id.

    b. Query a data source to find all product_ids associated with that category_id. This data source could be the primary PostgreSQL database itself, or a pre-built lookup table/index in a service like Elasticsearch or even another Redis set.

    c. For each product_id found, issue a DEL command to Redis for the corresponding product:<product_id> key.

    Here's a conceptual code snippet for the processor:

    go
    // In AvroProcessor.Process, add a topic check
    func (p *AvroProcessor) Process(msg *kafka.Message) error {
        // ... avro deserialization ...
    
        topic := *msg.TopicPartition.Topic
        if strings.HasSuffix(topic, "products") {
            return p.handleProductOperation(ctx, &debeziumMsg)
        } else if strings.HasSuffix(topic, "categories") {
            return p.handleCategoryOperation(ctx, &debeziumMsg)
        }
        return nil
    }
    
    // handleCategoryOperation finds and invalidates all related products.
    func (p *AvroProcessor) handleCategoryOperation(ctx context.Context, msg *DebeziumMessage) error {
        // We only care about updates to the category name
        if msg.Op != "u" { 
            return nil
        }
    
        // ... extract category ID from msg.After ...
        categoryID := getCategoryID(msg.After)
    
        // Query to find affected products
        // THIS IS A CRITICAL DESIGN DECISION. Querying Postgres adds load.
        // A materialized view or separate lookup service is a better pattern for high-throughput.
        productIDs, err := p.db.FindProductIDsByCategoryID(ctx, categoryID)
        if err != nil {
            return err
        }
    
        // Use Redis Pipelining for batch invalidation
        pipe := p.cache.Pipeline()
        for _, pid := range productIDs {
            pipe.Del(ctx, productKey(pid))
        }
        _, err = pipe.Exec(ctx)
        return err
    }

    Performance Consideration: The lookup step (FindProductIDsByCategoryID) is a performance bottleneck. Hitting the primary OLTP database for every category update can be problematic. A better production pattern is to maintain a materialized view or an inverted index (e.g., a Redis Set category_products:<category_id> containing product IDs) that is also updated via CDC.
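
    A hedged sketch of that inverted-index approach: the products consumer maintains a category_products:<category_id> Set as it processes product events, and the categories handler reads that Set instead of querying PostgreSQL. The methods below are assumed additions to cache/redis.go (reusing its existing imports); housekeeping such as removing a product from its old category's Set is omitted.

    go
    func categoryProductsKey(categoryID int32) string {
    	return fmt.Sprintf("category_products:%d", categoryID)
    }
    
    // IndexProduct records the product under its category's Set whenever a
    // product create/update event is processed.
    func (r *RedisCache) IndexProduct(ctx context.Context, product ProductPayload) error {
    	return r.client.SAdd(ctx, categoryProductsKey(product.CategoryID), product.ID).Err()
    }
    
    // InvalidateCategory deletes every cached product belonging to the category,
    // using the Set as the lookup instead of querying PostgreSQL.
    func (r *RedisCache) InvalidateCategory(ctx context.Context, categoryID int32) error {
    	productIDs, err := r.client.SMembers(ctx, categoryProductsKey(categoryID)).Result()
    	if err != nil {
    		return err
    	}
    
    	// Pipeline the DELs so a large category is invalidated in one round trip.
    	pipe := r.client.Pipeline()
    	for _, id := range productIDs {
    		pipe.Del(ctx, "product:"+id)
    	}
    	_, err = pipe.Exec(ctx)
    	return err
    }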

    Edge Cases and Production Hardening

    * Idempotency: With at-least-once delivery, message reprocessing is inevitable. If your cache operation isn't idempotent (e.g., incrementing a counter), you'll have bugs. Our SET and DEL operations are naturally idempotent, which is a major advantage. For non-idempotent operations, you must implement a check, such as storing the last processed Kafka offset per partition in Redis and ignoring messages with older or equal offsets.

    * Out-of-Order Events: Kafka guarantees message order within a partition. Debezium, by default, routes all changes for a given primary key to the same partition, preserving order for a single row. However, if you reprocess from a DLQ or have complex topologies, you could process an older update after a newer one. To solve this, include a timestamp or LSN (Log Sequence Number) from the Debezium source block in your cached value. When processing a message, compare its timestamp/LSN with the one in the cache and discard the event if it's older (see the sketch after this list).

    * Connector Failures: Kafka Connect is fault-tolerant. It periodically saves Debezium's progress (the WAL position) to a Kafka topic. If the connector task fails and restarts, it will resume from the last saved position, preventing data loss. No manual intervention is typically required.

    * Schema Evolution: What if a column is added to the products table? As long as you're using Avro and the Schema Registry with a compatible evolution strategy (e.g., adding a new field with a default value), your Go consumer will continue to work. The goavro library will handle the schema differences gracefully. This is a primary reason to avoid raw JSON in the pipeline.
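
    Returning to the out-of-order point above, here is a minimal, hedged sketch of the timestamp guard, written as an additional method for cache/redis.go. It stores the event's source timestamp alongside the payload in a hash and uses a small Lua script to make the compare-and-set atomic; the field names and the hash-based layout are assumptions, and the read path would need to fetch the payload field accordingly. The timestamp would be passed down from the processor (e.g. the envelope's ts_ms or source LSN).

    go
    // setIfNewerScript writes the value only if the incoming event timestamp is
    // greater than the one already stored for the key (or if the key is absent).
    var setIfNewerScript = redis.NewScript(`
    local current = redis.call('HGET', KEYS[1], 'ts_ms')
    if current and tonumber(current) >= tonumber(ARGV[1]) then
      return 0
    end
    redis.call('HSET', KEYS[1], 'ts_ms', ARGV[1], 'payload', ARGV[2])
    redis.call('EXPIRE', KEYS[1], ARGV[3])
    return 1
    `)
    
    // SetProductIfNewer applies the update only when eventTsMs is newer than what
    // is already cached, silently discarding stale (out-of-order) events.
    func (r *RedisCache) SetProductIfNewer(ctx context.Context, product ProductPayload, eventTsMs int64) error {
    	payload, err := json.Marshal(product)
    	if err != nil {
    		return err
    	}
    	return setIfNewerScript.Run(ctx, r.client,
    		[]string{productKey(product.ID)},
    		eventTsMs, payload, int64((24 * time.Hour).Seconds()),
    	).Err()
    }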

    Conclusion

    Building a Change Data Capture pipeline for cache invalidation is a significant architectural investment. It introduces new components and complexity compared to traditional caching methods. However, the payoff is enormous for a distributed system that demands high data consistency and service decoupling.

    By leveraging PostgreSQL's logical replication, Debezium's robust CDC capabilities, Kafka's durable event streaming, and a well-designed consumer, you can build a system that proactively and reliably synchronizes your caches with your source of truth. This architecture eliminates the entire class of bugs caused by stale data, reduces load on your primary database, and enables your microservices to operate with greater autonomy and correctness. It is a powerful, production-proven pattern that addresses a fundamental challenge in modern software engineering.
