Advanced RAG: Semantic Caching for Sub-50ms LLM Responses

Goh Ling Yong

The Unavoidable Bottleneck in Production RAG

In any production-scale Retrieval-Augmented Generation (RAG) system, the end-to-end latency is a critical user experience metric, and the operational cost is a primary business concern. The canonical RAG flow is computationally and financially expensive:

  • Query Embedding: A user query is converted into a high-dimensional vector using a model like text-embedding-ada-002 or a powerful open-source alternative. (Latency: ~100-300ms, Cost: API call)
  • Vector Search: This embedding is used to perform a similarity search against a massive index of document chunks in a vector database like Pinecone, Weaviate, or PostgreSQL with pgvector. (Latency: ~50-500ms, Cost: Database Ops)
  • Context Synthesis: The top-k retrieved document chunks are formatted and injected into a prompt for a Large Language Model (LLM). (Latency: Negligible)
  • LLM Generation: The augmented prompt is sent to a powerful generative model like GPT-4 or Claude 3 to produce the final answer. (Latency: 2-10s+, Cost: Significant API call, per-token)
The total latency frequently falls in the 3-12 second range, which is often unacceptable for interactive applications. Furthermore, repeated queries for semantically identical concepts lead to redundant processing and wasted expenditure. A naive exact-match cache (cache['how much is product X?']) is brittle and ineffective, failing as soon as a user asks, "what is the price of product X?".

    The solution is to cache based on semantic meaning, not lexical representation. This post details the architecture and implementation of a high-performance semantic cache layer that can serve responses in under 50ms, dramatically improving UX and reducing costs.
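
    To see the difference concretely, here is a minimal sketch (using the same all-MiniLM-L6-v2 model relied on later in this post; the two example queries are illustrative) comparing an exact-match lookup with a semantic similarity check:

    python
    from sentence_transformers import SentenceTransformer, util

    model = SentenceTransformer("all-MiniLM-L6-v2")

    cached_query = "how much is product X?"
    new_query = "what is the price of product X?"

    # Exact-match cache: misses, because the strings differ
    exact_hit = cached_query == new_query  # False

    # Semantic comparison: cosine similarity of the two sentence embeddings
    emb_cached, emb_new = model.encode([cached_query, new_query], convert_to_tensor=True)
    similarity = util.cos_sim(emb_cached, emb_new).item()  # high for paraphrases

    print(f"exact hit: {exact_hit}, cosine similarity: {similarity:.3f}")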

    Semantic Caching: Architecture and Core Components

    A semantic cache intercepts user queries before they hit the expensive RAG pipeline. It quickly determines if a semantically similar query has been answered recently and, if so, returns the cached response.

    Here’s the high-level data flow:

    mermaid
    graph TD
        A[User Query] --> B{Generate Low-Dim Embedding};
        B --> C{"Search Semantic Cache (Vector DB)"};
        C -->|Similarity >= Threshold| D[Cache Hit];
        C -->|Similarity < Threshold| E[Cache Miss];
        D --> F[Return Cached Response];
        E --> G[Execute Full RAG Pipeline];
        G --> H{Store Query Embedding + Response in Cache};
        H --> I[Return Generated Response];
        F --> J[End];
        I --> J;

    The key components enabling this architecture are:

  • Fast, Lightweight Embedding Model: For the cache lookup, we don't need the most powerful (and slow) embedding model. A smaller, faster model like sentence-transformers/all-MiniLM-L6-v2 is ideal. It's optimized for speed and produces a lower-dimensional embedding (384 dimensions) that is perfect for low-latency similarity search. The primary, high-accuracy model is still used for the main document retrieval pipeline on a cache miss.
  • Low-Latency Vector Store: The cache's vector database must be incredibly fast. While dedicated vector DBs work, an in-memory store like Redis with the RediSearch module is a superior choice for this use case. Its sub-millisecond P99 latency for HNSW index lookups is exactly what we need, and we can also leverage Redis's built-in TTL and eviction policies (see the configuration sketch after this list).
  • Tunable Similarity Threshold: A configurable cosine similarity score (e.g., 0.98) determines what constitutes a "hit". This is a critical parameter that balances the cache hit rate against the risk of returning a contextually incorrect answer.
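
    Regarding the eviction policies mentioned above, here is a minimal sketch (assuming a Redis instance dedicated to the cache and that you are allowed to change its configuration) of capping memory and enabling LRU eviction from redis-py:

    python
    import redis

    client = redis.Redis(host="localhost", port=6379)

    # Cap the cache's memory footprint and evict least-recently-used keys when full.
    # The 2gb limit is an illustrative value, not a recommendation.
    client.config_set("maxmemory", "2gb")
    client.config_set("maxmemory-policy", "allkeys-lru")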
    Deep Dive: Implementation with FastAPI, Redis, and Sentence Transformers

    Let's build a production-ready semantic cache as a middleware layer in a Python-based application using FastAPI. This example assumes you have Redis with the RediSearch module available.

    Prerequisites:

    bash
    pip install fastapi uvicorn redis sentence-transformers

    Step 1: Setting up the Redis Index

    First, we need to define the schema for our cache in Redis. We'll store the original query text, the response, and the vector embedding. We'll use an HNSW (Hierarchical Navigable Small World) index for the vector field, which provides excellent performance for low-latency Approximate Nearest Neighbor (ANN) search.

    Connect to your Redis instance via redis-cli and run this command:

    redis-cli
    FT.CREATE semantic_cache_idx ON HASH PREFIX 1 cache:query: SCHEMA query_text TEXT query_embedding VECTOR HNSW 6 TYPE FLOAT32 DIM 384 DISTANCE_METRIC COSINE response_text TEXT

    Let's break down this command:

    * FT.CREATE semantic_cache_idx: Creates a new search index named semantic_cache_idx.

    * ON HASH PREFIX 1 cache:query:: Specifies that we are indexing HASH data structures in Redis whose keys start with cache:query:.

    * SCHEMA ...: Defines the fields to be indexed.

    * query_embedding VECTOR HNSW 6 ...: This is the crucial part. It defines query_embedding as a vector field.

    * HNSW 6: Uses the HNSW algorithm. The 6 is the number of attribute arguments that follow (three name/value pairs: TYPE, DIM, and DISTANCE_METRIC). In production, you'd also tune parameters like M and EF_CONSTRUCTION.

    * TYPE FLOAT32 DIM 384: Specifies the data type and dimensionality, which must match our all-MiniLM-L6-v2 model.

    * DISTANCE_METRIC COSINE: We'll use cosine similarity to measure the distance between vectors. This is standard for normalized sentence embeddings.
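
    If you prefer to create the index from application code instead of redis-cli, here is a minimal redis-py sketch of the equivalent definition (the M and EF_CONSTRUCTION values are illustrative starting points, not tuned recommendations):

    python
    import redis
    from redis.commands.search.field import TextField, VectorField
    from redis.commands.search.indexDefinition import IndexDefinition, IndexType

    client = redis.Redis(host="localhost", port=6379)

    schema = (
        TextField("query_text"),
        TextField("response_text"),
        VectorField(
            "query_embedding",
            "HNSW",
            {
                "TYPE": "FLOAT32",
                "DIM": 384,                   # must match all-MiniLM-L6-v2
                "DISTANCE_METRIC": "COSINE",
                "M": 16,                      # illustrative HNSW graph degree
                "EF_CONSTRUCTION": 200,       # illustrative build-time accuracy/speed trade-off
            },
        ),
    )

    client.ft("semantic_cache_idx").create_index(
        schema,
        definition=IndexDefinition(prefix=["cache:query:"], index_type=IndexType.HASH),
    )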

    Step 2: Building the Caching Service

    Now, let's write the Python code. We'll create a SemanticCache class to encapsulate the logic and a FastAPI endpoint to expose it.

    python
    import os
    import uuid
    import time
    import numpy as np
    import redis
    from redis.commands.search.query import Query
    from fastapi import FastAPI, Request, HTTPException
    from pydantic import BaseModel
    from sentence_transformers import SentenceTransformer
    
    # --- Configuration ---
    REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
    REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
    CACHE_INDEX_NAME = "semantic_cache_idx"
    CACHE_SIMILARITY_THRESHOLD = 0.98  # High threshold for high confidence
    CACHE_VECTOR_DIMENSION = 384
    CACHE_TTL_SECONDS = 3600 # 1 hour
    
    # --- Models ---
    class QueryRequest(BaseModel):
        query: str
    
    class QueryResponse(BaseModel):
        source: str
        response: str
        latency_ms: float
    
    # --- Mock RAG Pipeline ---
    # In a real application, this would be a complex function call.
    async def execute_full_rag_pipeline(query: str) -> str:
        print(f"\033[91mCACHE MISS: Executing full RAG pipeline for query: '{query}'\033[0m")
        import asyncio
        # Simulate network latency and LLM generation time
        await asyncio.sleep(2.5)
        return f"This is a freshly generated response for the query: '{query}'"
    
    # --- Semantic Cache Service ---
    class SemanticCache:
        def __init__(self, redis_host, redis_port, index_name):
            self.client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
            self.index_name = index_name
            self.model = SentenceTransformer('all-MiniLM-L6-v2')
            print("SemanticCache initialized.")
    
        def get_embedding(self, text: str) -> np.ndarray:
            return self.model.encode(text, convert_to_tensor=False)
    
        def search_cache(self, query_embedding: np.ndarray):
            query_vector = query_embedding.astype(np.float32).tobytes()
            
            # K=1 because we only care about the single most similar cached entry
            # DIALECT 2 is required for parameterized vector (KNN) queries
            q = (
                Query("*=>[KNN 1 @query_embedding $query_vec AS vector_score]")
                .sort_by("vector_score")
                .return_fields("response_text", "vector_score")
                .dialect(2)
            )
            
            params = {"query_vec": query_vector}
            try:
                results = self.client.ft(self.index_name).search(q, query_params=params)
            except redis.exceptions.ResponseError as e:
                print(f"Error searching Redis index. It might not exist: {e}")
                # This can happen if the index wasn't created. We'll treat it as a cache miss.
                return None
    
            if results.docs:
                doc = results.docs[0]
                similarity = 1 - float(doc.vector_score) # Convert cosine distance to similarity
                print(f"\033[94mCACHE CHECK: Found similar query with similarity: {similarity:.4f}\033[0m")
                if similarity > CACHE_SIMILARITY_THRESHOLD:
                    return doc.response_text
            return None
    
        def add_to_cache(self, query: str, response: str, query_embedding: np.ndarray):
            key = f"cache:query:{uuid.uuid4()}"
            self.client.hset(key, mapping={
                "query_text": query,
                "query_embedding": query_embedding.astype(np.float32).tobytes(),
                "response_text": response
            })
            self.client.expire(key, CACHE_TTL_SECONDS)
            print(f"\033[92mCACHE ADD: Added new entry to cache for query: '{query}'\033[0m")
    
    # --- FastAPI Application ---
    app = FastAPI()
    cache = SemanticCache(REDIS_HOST, REDIS_PORT, CACHE_INDEX_NAME)
    
    @app.post("/query", response_model=QueryResponse)
    async def handle_query(request: QueryRequest):
        start_time = time.time()
        
        # 1. Generate query embedding for cache lookup
        query_embedding = cache.get_embedding(request.query)
        
        # 2. Search the cache
        cached_response = cache.search_cache(query_embedding)
        
        if cached_response:
            end_time = time.time()
            return QueryResponse(
                source="semantic_cache",
                response=cached_response,
                latency_ms=(end_time - start_time) * 1000
            )
    
        # 3. Cache miss: execute full pipeline
        response_text = await execute_full_rag_pipeline(request.query)
        
        # 4. Add the new result to the cache (synchronous here;
        # in a real app, consider doing this in a background task)
        cache.add_to_cache(request.query, response_text, query_embedding)
        
        end_time = time.time()
        return QueryResponse(
            source="rag_pipeline",
            response=response_text,
            latency_ms=(end_time - start_time) * 1000
        )
    
    # To run: uvicorn your_script_name:app --reload

    Testing the Implementation:

    Run the app and use a tool like curl to test it.

    First Request (Cache Miss):

    bash
    curl -X POST "http://127.0.0.1:8000/query" -H "Content-Type: application/json" -d '{"query": "How much does the Pro Plan cost per month?"}'

    Expected Output:

    json
    {
      "source": "rag_pipeline",
      "response": "This is a freshly generated response for the query: 'How much does the Pro Plan cost per month?'",
      "latency_ms": 2580.123
    }

    In your server logs, you'll see the "CACHE MISS" and "CACHE ADD" messages.

    Second, Semantically Similar Request (Cache Hit):

    bash
    curl -X POST "http://127.0.0.1:8000/query" -H "Content-Type: application/json" -d '{"query": "What is the monthly price for the professional plan?"}'

    Expected Output:

    json
    {
      "source": "semantic_cache",
      "response": "This is a freshly generated response for the query: 'How much does the Pro Plan cost per month?'",
      "latency_ms": 35.456
    }

    Here, the latency_ms is dramatically lower. The server logs will show a "CACHE CHECK" message with a high similarity score, confirming a hit.

    Performance Benchmarks and Cost Analysis

    The performance gains are not just theoretical. Here’s a typical comparison for a production system:

    | Metric           | Cache Miss (Full RAG Pipeline) | Cache Hit (Semantic Cache) | Improvement Factor |
    |------------------|--------------------------------|----------------------------|--------------------|
    | P95 Latency      | 4500 ms                        | 45 ms                      | 100x               |
    | Embedding Cost   | $0.0001 (Ada-002)              | $0 (Local Model)           | Infinite           |
    | LLM Cost (GPT-4) | ~$0.04 (4k context, 1k output) | $0                         | Infinite           |
    | Compute          | High (LLM Inference)           | Low (Embedding + Redis)    | ~50-200x           |

    With even a modest 30% cache hit rate, you can reduce your operational costs by nearly 30% and significantly lower your average response latency.
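
    As a quick sanity check on that claim, here is a small back-of-the-envelope calculation (the per-request figures are the illustrative numbers from the table above, not measurements from a specific deployment):

    python
    # Illustrative blended cost and latency at a given cache hit rate
    hit_rate = 0.30
    miss_cost_usd = 0.04      # approximate LLM + embedding cost per uncached request
    hit_cost_usd = 0.0        # cache hits avoid all API costs
    miss_latency_ms = 4500
    hit_latency_ms = 45

    blended_cost = hit_rate * hit_cost_usd + (1 - hit_rate) * miss_cost_usd
    blended_latency = hit_rate * hit_latency_ms + (1 - hit_rate) * miss_latency_ms

    print(f"Blended cost per request: ${blended_cost:.4f} (vs ${miss_cost_usd:.4f} uncached)")
    print(f"Blended average latency: {blended_latency:.0f} ms (vs {miss_latency_ms} ms uncached)")
    # -> roughly a 30% cost reduction and ~3.2 s average latency at a 30% hit rate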

    Advanced Edge Cases and Production Patterns

    A simple implementation is a good start, but production systems require handling complex edge cases.

    1. The Hard Problem: Cache Invalidation

    What happens if the source document for a cached answer is updated? The cache now holds stale, incorrect information. This is the most challenging aspect of any caching system.

    Strategy: Event-Driven Invalidation

    This is the most robust solution. It requires coupling your caching system with your data ingestion pipeline.

  • Store Document IDs with Cache Entries: When you generate a response in the full RAG pipeline, you know which source document chunks were used. Modify the cache entry to store these document IDs (a sketch of the modified add_to_cache appears after the invalidation endpoint below).

        redis-cli
        # New schema with a 'source_docs' field
        FT.CREATE ... SCHEMA ... source_docs TAG

    When adding to the cache, you'd store a comma-separated list of IDs: HSET cache:query:xyz ... source_docs "doc_123,doc_456"

  • Create an Invalidation Webhook: Your data ingestion pipeline (e.g., the system that updates your main vector DB) must call a new endpoint on your application whenever a document is updated, say /invalidate-cache.
  • Implement the Invalidation Logic: This endpoint will perform a query against the cache to find and delete all entries that relied on the updated document.
        python
        # In your FastAPI app
        class InvalidationRequest(BaseModel):
            document_id: str
    
        @app.post("/invalidate-cache")
        async def invalidate(request: InvalidationRequest):
            # Use RediSearch to find all cache entries tagged with this document ID
            # The query syntax for a TAG field is @field:{value}
            query = Query(f"@source_docs:{{{request.document_id}}}")  # Query imported from redis.commands.search.query
            results = cache.client.ft(CACHE_INDEX_NAME).search(query)
            
            deleted_count = 0
            if results.docs:
                keys_to_delete = [doc.id for doc in results.docs]
                if keys_to_delete:
                    cache.client.delete(*keys_to_delete)
                    deleted_count = len(keys_to_delete)
            
            print(f"Invalidated {deleted_count} cache entries for document_id: {request.document_id}")
            return {"status": "success", "invalidated_count": deleted_count}

    This creates a highly responsive caching system that actively purges stale data.
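
    For completeness, here is a hedged sketch of how the earlier add_to_cache method could be extended to record source document IDs, assuming the full RAG pipeline is modified to return the IDs of the chunks it used:

    python
    # Inside the SemanticCache class from Step 2
    def add_to_cache(self, query: str, response: str, query_embedding: np.ndarray,
                     source_doc_ids: list[str]):
        key = f"cache:query:{uuid.uuid4()}"
        self.client.hset(key, mapping={
            "query_text": query,
            "query_embedding": query_embedding.astype(np.float32).tobytes(),
            "response_text": response,
            # TAG field: comma-separated values, matching RediSearch's default TAG separator
            "source_docs": ",".join(source_doc_ids),
        })
        self.client.expire(key, CACHE_TTL_SECONDS)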

    2. Threshold Tuning and A/B Testing

    The CACHE_SIMILARITY_THRESHOLD is a magic number. Setting it is a trade-off:

    * Too high (e.g., 0.995): Low cache hit rate. You miss many opportunities to serve from the cache.

    * Too low (e.g., 0.90): High cache hit rate, but you risk returning answers to subtly different questions, which erodes user trust.

    In production, this value should be determined empirically. Implement a shadow-mode A/B testing framework:

    • Set a production threshold (e.g., 0.98).
    • When a query comes in and its similarity falls between a lower bound (e.g., 0.95) and the production threshold, serve from the full RAG pipeline.
    • However, log both the generated response and what would have been the cached response.
    • Manually, or with an evaluation LLM, compare these logged pairs to determine whether the cached response would have been acceptable. This data lets you adjust the threshold with confidence; a sketch of the shadow-mode logic follows.
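
    A minimal sketch of that shadow-mode branch, assuming a hypothetical search_cache_with_score variant that also returns the similarity and a hypothetical log_shadow_pair helper that writes the pair to your evaluation store, might look like this:

    python
    SHADOW_LOWER_BOUND = 0.95  # candidate threshold under evaluation

    async def handle_query_with_shadow_mode(request: QueryRequest) -> QueryResponse:
        start_time = time.time()
        query_embedding = cache.get_embedding(request.query)

        # Hypothetical variant of search_cache that returns (response, similarity)
        cached_response, similarity = cache.search_cache_with_score(query_embedding)

        if cached_response and similarity >= CACHE_SIMILARITY_THRESHOLD:
            return QueryResponse(source="semantic_cache", response=cached_response,
                                 latency_ms=(time.time() - start_time) * 1000)

        response_text = await execute_full_rag_pipeline(request.query)

        # Shadow mode: the cached entry was *almost* similar enough, so log both
        # answers for offline comparison instead of serving the cached one.
        if cached_response and SHADOW_LOWER_BOUND <= similarity < CACHE_SIMILARITY_THRESHOLD:
            log_shadow_pair(request.query, generated=response_text, cached=cached_response)

        cache.add_to_cache(request.query, response_text, query_embedding)
        return QueryResponse(source="rag_pipeline", response=response_text,
                             latency_ms=(time.time() - start_time) * 1000)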

    3. Negative Caching

    If your RAG system frequently receives queries about topics outside its knowledge base, it will repeatedly perform expensive searches only to return "I don't have information about that." This is a prime candidate for caching.

    Modify your add_to_cache logic to store these negative results. This prevents the system from wasting resources on repeatedly searching for information it knows doesn't exist.
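
    One hedged way to do this, assuming the pipeline signals an unanswerable query with a known fallback string (the NO_ANSWER_RESPONSE value here is illustrative), is to cache negative results with a shorter TTL so that newly ingested documents can still surface answers later:

    python
    NO_ANSWER_RESPONSE = "I don't have information about that."  # illustrative fallback text
    NEGATIVE_CACHE_TTL_SECONDS = 600  # shorter TTL than regular entries

    def add_to_cache_with_negative_results(cache: SemanticCache, query: str,
                                           response: str, query_embedding: np.ndarray):
        key = f"cache:query:{uuid.uuid4()}"
        is_negative = response.strip() == NO_ANSWER_RESPONSE
        cache.client.hset(key, mapping={
            "query_text": query,
            "query_embedding": query_embedding.astype(np.float32).tobytes(),
            "response_text": response,
        })
        # Negative results expire sooner, regular entries keep the normal TTL
        cache.client.expire(key, NEGATIVE_CACHE_TTL_SECONDS if is_negative else CACHE_TTL_SECONDS)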

    Conclusion: Moving Beyond Naive RAG

    While foundational RAG is powerful, it's not a production-ready solution without significant performance and cost optimization. A semantic cache is not a mere enhancement; it is a critical architectural component for any RAG system intended for interactive, real-world use.

    By implementing a low-latency vector cache with a fast embedding model, you can transform the user experience from sluggish to instantaneous for a significant portion of user queries. The real challenge for senior engineers lies not in the initial implementation, but in building robust, production-grade systems around it, particularly in managing the cache lifecycle through sophisticated invalidation strategies and continuous performance tuning. This approach is what separates proof-of-concept RAG demos from scalable, cost-effective, and truly useful AI products.
