Advanced RAG: Semantic Caching for Sub-50ms LLM Responses
The Unavoidable Bottleneck in Production RAG
In any production-scale Retrieval-Augmented Generation (RAG) system, the end-to-end latency is a critical user experience metric, and the operational cost is a primary business concern. The canonical RAG flow is computationally and financially expensive:
* Query embedding: embed the user query with text-embedding-ada-002 or a powerful open-source alternative. (Latency: ~100-300ms, Cost: API call)
* Retrieval: run a vector similarity search against a store such as pgvector. (Latency: ~50-500ms, Cost: Database Ops)
* Generation: feed the retrieved context to an LLM to produce the answer, which typically dominates the total. (Latency: often several seconds, Cost: LLM API call)

Total latency frequently falls in the 3-12 second range, which is often unacceptable for interactive applications. Furthermore, repeated queries for semantically identical concepts lead to redundant processing and wasted expenditure. A naive exact-match cache (cache['how much is product X?']) is brittle and ineffective, failing as soon as a user asks, "what is the price of product X?".
The solution is to cache based on semantic meaning, not lexical representation. This post details the architecture and implementation of a high-performance semantic cache layer that can serve responses in under 50ms, dramatically improving UX and reducing costs.
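To make this concrete, here is a minimal sketch, using the same all-MiniLM-L6-v2 model adopted later in this post, that contrasts an exact-match dictionary cache with a semantic comparison. The example strings and the cached answer are illustrative.

from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")

cached_query = "how much is product X?"
new_query = "what is the price of product X?"

# Naive exact-match cache: any paraphrase is a guaranteed miss.
exact_cache = {cached_query: "Product X costs $49/month."}
print(exact_cache.get(new_query))  # None -> cache miss despite identical intent

# Semantic comparison: the cosine similarity of the two embeddings is high,
# so a similarity-based cache can recognize the paraphrase.
emb_cached, emb_new = model.encode([cached_query, new_query])
similarity = util.cos_sim(emb_cached, emb_new).item()
print(f"cosine similarity: {similarity:.3f}")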
Semantic Caching: Architecture and Core Components
A semantic cache intercepts user queries before they hit the expensive RAG pipeline. It quickly determines if a semantically similar query has been answered recently and, if so, returns the cached response.
Here’s the high-level data flow:
graph TD
A[User Query] --> B[Generate Low-Dim Embedding];
B --> C{"Search Semantic Cache (Vector DB)"};
C -->|"Similarity >= Threshold"| D[Cache Hit];
C -->|"Similarity < Threshold"| E[Cache Miss];
D --> F[Return Cached Response];
E --> G[Execute Full RAG Pipeline];
G --> H[Store Query Embedding + Response in Cache];
H --> I[Return Generated Response];
F --> J[End];
I --> J;
The key components enabling this architecture are:
* A fast, lightweight embedding model: sentence-transformers/all-MiniLM-L6-v2 is ideal. It's optimized for speed and produces a lower-dimensional embedding (384 dimensions) that is perfect for low-latency similarity search. The primary, high-accuracy model is still used for the main document retrieval pipeline on a cache miss.
* A low-latency vector store for the cache itself: the implementation below uses Redis with the RediSearch module to run approximate nearest-neighbor search over the cached query embeddings.
* A similarity threshold: a configurable value (e.g., 0.98) determines what constitutes a "hit". This is a critical parameter that balances the cache hit rate against the risk of returning a contextually incorrect answer.

Deep Dive: Implementation with FastAPI, Redis, and Sentence Transformers
Let's build a production-ready semantic cache as a middleware layer in a Python-based application using FastAPI. This example assumes you have Redis with the RediSearch module available.
Prerequisites:
pip install fastapi uvicorn redis sentence-transformers
Step 1: Setting up the Redis Index
First, we need to define the schema for our cache in Redis. We'll store the original query text, the response, and the vector embedding. We'll use an HNSW (Hierarchical Navigable Small World) index for the vector field, which provides excellent performance for low-latency Approximate Nearest Neighbor (ANN) search.
Connect to your Redis instance via redis-cli and run this command:
FT.CREATE semantic_cache_idx ON HASH PREFIX 1 cache:query: SCHEMA query_text TEXT query_embedding VECTOR HNSW 6 TYPE FLOAT32 DIM 384 DISTANCE_METRIC COSINE response_text TEXT
Let's break down this command:
* FT.CREATE semantic_cache_idx: Creates a new search index named semantic_cache_idx.
* ON HASH PREFIX 1 cache:query:: Specifies that we are indexing HASH data structures in Redis whose keys start with cache:query:.
* SCHEMA ...: Defines the fields to be indexed.
* query_embedding VECTOR HNSW 6 ...: This is the crucial part. It defines query_embedding as a vector field.
* HNSW 6: Uses the HNSW algorithm. The 6 is the number of attribute arguments that follow (the three TYPE/DIM/DISTANCE_METRIC name-value pairs); we're sticking to the algorithm's defaults for simplicity. In production, you'd also tune M and EF_CONSTRUCTION (see the sketch after this list).
* TYPE FLOAT32 DIM 384: Specifies the data type and dimensionality, which must match our all-MiniLM-L6-v2 model.
* DISTANCE_METRIC COSINE: We'll use cosine similarity to measure the distance between vectors. This is standard for normalized sentence embeddings.
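If you'd rather create the index from application code than from redis-cli, here is a sketch using redis-py's search helpers (assuming redis-py 4.x+ with RediSearch available on the server). It also shows where M and EF_CONSTRUCTION would be set; the values are illustrative, not tuned recommendations.

import redis
from redis.commands.search.field import TextField, VectorField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

client = redis.Redis(host="localhost", port=6379)

schema = (
    TextField("query_text"),
    VectorField(
        "query_embedding",
        "HNSW",
        {
            "TYPE": "FLOAT32",
            "DIM": 384,
            "DISTANCE_METRIC": "COSINE",
            "M": 16,                 # illustrative: graph connectivity per node
            "EF_CONSTRUCTION": 200,  # illustrative: build-time accuracy/speed trade-off
        },
    ),
    TextField("response_text"),
)

# Index HASH keys that start with the cache prefix used throughout this post.
client.ft("semantic_cache_idx").create_index(
    schema,
    definition=IndexDefinition(prefix=["cache:query:"], index_type=IndexType.HASH),
)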
Step 2: Building the Caching Service
Now, let's write the Python code. We'll create a SemanticCache class to encapsulate the logic and a FastAPI endpoint to expose it.
import asyncio
import os
import time
import uuid

import numpy as np
import redis
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from redis.commands.search.query import Query
from sentence_transformers import SentenceTransformer
# --- Configuration ---
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
CACHE_INDEX_NAME = "semantic_cache_idx"
CACHE_SIMILARITY_THRESHOLD = 0.98 # High threshold for high confidence
CACHE_VECTOR_DIMENSION = 384
CACHE_TTL_SECONDS = 3600 # 1 hour
# --- Models ---
class QueryRequest(BaseModel):
    query: str

class QueryResponse(BaseModel):
    source: str
    response: str
    latency_ms: float
# --- Mock RAG Pipeline ---
# In a real application, this would be a complex function call.
async def execute_full_rag_pipeline(query: str) -> str:
    print(f"\033[91mCACHE MISS: Executing full RAG pipeline for query: '{query}'\033[0m")
    # Simulate network latency and LLM generation time
    await asyncio.sleep(2.5)
    return f"This is a freshly generated response for the query: '{query}'"
# --- Semantic Cache Service ---
class SemanticCache:
    def __init__(self, redis_host, redis_port, index_name):
        self.client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.index_name = index_name
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        print("SemanticCache initialized.")

    def get_embedding(self, text: str) -> np.ndarray:
        return self.model.encode(text, convert_to_tensor=False)

    def search_cache(self, query_embedding: np.ndarray):
        query_vector = query_embedding.astype(np.float32).tobytes()
        # K=1 because we only care about the single most similar cached entry.
        # DIALECT 2 is required for parameterized (KNN) vector queries.
        q = (
            Query("*=>[KNN 1 @query_embedding $query_vec AS vector_score]")
            .sort_by("vector_score")
            .return_fields("response_text", "vector_score")
            .dialect(2)
        )
        params = {"query_vec": query_vector}
        try:
            results = self.client.ft(self.index_name).search(q, query_params=params)
        except redis.exceptions.ResponseError as e:
            print(f"Error searching Redis index. It might not exist: {e}")
            # This can happen if the index wasn't created. We'll treat it as a cache miss.
            return None

        if results.docs:
            doc = results.docs[0]
            similarity = 1 - float(doc.vector_score)  # Convert cosine distance to similarity
            print(f"\033[94mCACHE CHECK: Found similar query with similarity: {similarity:.4f}\033[0m")
            if similarity > CACHE_SIMILARITY_THRESHOLD:
                return doc.response_text
        return None

    def add_to_cache(self, query: str, response: str, query_embedding: np.ndarray):
        key = f"cache:query:{uuid.uuid4()}"
        self.client.hset(key, mapping={
            "query_text": query,
            "query_embedding": query_embedding.astype(np.float32).tobytes(),
            "response_text": response
        })
        self.client.expire(key, CACHE_TTL_SECONDS)
        print(f"\033[92mCACHE ADD: Added new entry to cache for query: '{query}'\033[0m")
# --- FastAPI Application ---
app = FastAPI()
cache = SemanticCache(REDIS_HOST, REDIS_PORT, CACHE_INDEX_NAME)
@app.post("/query", response_model=QueryResponse)
async def handle_query(request: QueryRequest):
    start_time = time.time()

    # 1. Generate query embedding for cache lookup
    query_embedding = cache.get_embedding(request.query)

    # 2. Search the cache
    cached_response = cache.search_cache(query_embedding)
    if cached_response:
        end_time = time.time()
        return QueryResponse(
            source="semantic_cache",
            response=cached_response,
            latency_ms=(end_time - start_time) * 1000
        )

    # 3. Cache miss: execute full pipeline
    response_text = await execute_full_rag_pipeline(request.query)

    # 4. Add the new result to the cache
    # Note: this call is synchronous; in a real app, you might offload it to a background task
    cache.add_to_cache(request.query, response_text, query_embedding)

    end_time = time.time()
    return QueryResponse(
        source="rag_pipeline",
        response=response_text,
        latency_ms=(end_time - start_time) * 1000
    )
# To run: uvicorn your_script_name:app --reload
Testing the Implementation:
Run the app and use a tool like curl to test it.
First Request (Cache Miss):
curl -X POST "http://127.0.0.1:8000/query" -H "Content-Type: application/json" -d '{"query": "How much does the Pro Plan cost per month?"}'
Expected Output:
{
  "source": "rag_pipeline",
  "response": "This is a freshly generated response for the query: 'How much does the Pro Plan cost per month?'",
  "latency_ms": 2580.123
}
In your server logs, you'll see the "CACHE MISS" and "CACHE ADD" messages.
Second, Semantically Similar Request (Cache Hit):
curl -X POST "http://127.0.0.1:8000/query" -H "Content-Type: application/json" -d '{"query": "What is the monthly price for the professional plan?"}'
Expected Output:
{
  "source": "semantic_cache",
  "response": "This is a freshly generated response for the query: 'How much does the Pro Plan cost per month?'",
  "latency_ms": 35.456
}
Here, the latency_ms is dramatically lower. The server logs will show a "CACHE CHECK" message with a high similarity score, confirming a hit.
Performance Benchmarks and Cost Analysis
The performance gains are not just theoretical. Here’s an illustrative comparison based on representative numbers for a production system:
| Metric | Cache Miss (Full RAG Pipeline) | Cache Hit (Semantic Cache) | Improvement Factor |
|---|---|---|---|
| P95 Latency | 4500 ms | 45 ms | 100x |
| Embedding Cost | ~$0.0001 / query (Ada-002) | $0 (Local Model) | Eliminated |
| LLM Cost (GPT-4) | ~$0.04 (4k context, 1k output) | $0 | Eliminated |
| Compute | High (LLM Inference) | Low (Embedding + Redis) | ~50-200x |
With even a modest 30% cache hit rate, you can reduce your operational costs by nearly 30% and significantly lower your average response latency.
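As a quick back-of-the-envelope check using the illustrative numbers from the table above, expected per-query latency and LLM spend scale linearly with the hit rate:

hit_rate = 0.30
latency_miss_ms, latency_hit_ms = 4500, 45
cost_miss_usd, cost_hit_usd = 0.04, 0.0

expected_latency = hit_rate * latency_hit_ms + (1 - hit_rate) * latency_miss_ms
expected_cost = hit_rate * cost_hit_usd + (1 - hit_rate) * cost_miss_usd

print(f"expected latency: {expected_latency:.0f} ms")    # ~3164 ms vs. a 4500 ms baseline
print(f"expected LLM cost: ${expected_cost:.3f}/query")  # ~$0.028 vs. $0.04 baseline

Note that this mixes a P95 latency with an expected value, so treat the result as a rough bound rather than a forecast.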
Advanced Edge Cases and Production Patterns
A simple implementation is a good start, but production systems require handling complex edge cases.
1. The Hard Problem: Cache Invalidation
What happens if the source document for a cached answer is updated? The cache now holds stale, incorrect information. This is the most challenging aspect of any caching system.
Strategy: Event-Driven Invalidation
This is the most robust solution. It requires coupling your caching system with your data ingestion pipeline.
First, tag each cache entry with the IDs of the source documents that produced its answer by extending the schema:

# New schema with a 'source_docs' field
FT.CREATE ... SCHEMA ... source_docs TAG
When adding to the cache, you'd store a comma-separated list of IDs: HSET cache:query:xyz ... source_docs "doc_123,doc_456"
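Here is a minimal sketch of how the add_to_cache method above could be extended to record that provenance. The source_doc_ids parameter is hypothetical and would be populated from the retrieval step of your RAG pipeline.

def add_to_cache(self, query: str, response: str, query_embedding: np.ndarray,
                 source_doc_ids=None):
    key = f"cache:query:{uuid.uuid4()}"
    mapping = {
        "query_text": query,
        "query_embedding": query_embedding.astype(np.float32).tobytes(),
        "response_text": response,
    }
    if source_doc_ids:
        # RediSearch TAG fields use a separator (comma by default) between values.
        mapping["source_docs"] = ",".join(source_doc_ids)
    self.client.hset(key, mapping=mapping)
    self.client.expire(key, CACHE_TTL_SECONDS)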
Your data ingestion pipeline then calls an invalidation endpoint, e.g. POST /invalidate-cache, whenever a source document is updated or deleted.

# In your FastAPI app
class InvalidationRequest(BaseModel):
    document_id: str

@app.post("/invalidate-cache")
async def invalidate(request: InvalidationRequest):
    # Use RediSearch to find all cache entries tagged with this document ID.
    # The query syntax for a TAG field is @field:{value}
    query = Query(f"@source_docs:{{{request.document_id}}}")
    results = cache.client.ft(CACHE_INDEX_NAME).search(query)

    deleted_count = 0
    if results.docs:
        keys_to_delete = [doc.id for doc in results.docs]
        if keys_to_delete:
            cache.client.delete(*keys_to_delete)
            deleted_count = len(keys_to_delete)

    print(f"Invalidated {deleted_count} cache entries for document_id: {request.document_id}")
    return {"status": "success", "invalidated_count": deleted_count}
This creates a highly responsive caching system that actively purges stale data.
2. Threshold Tuning and A/B Testing
The CACHE_SIMILARITY_THRESHOLD is a magic number. Setting it is a trade-off:
* Too high (e.g., 0.995): Low cache hit rate. You miss many opportunities to serve from the cache.
* Too low (e.g., 0.90): High cache hit rate, but you risk returning answers to subtly different questions, which erodes user trust.
In production, this value should be determined empirically. Implement a shadow-mode A/B testing framework:
- Set a production threshold (e.g., 0.98).
- When a query comes in and the similarity of the best cached candidate falls between a lower bound (e.g., 0.95) and the production threshold, serve from the full RAG pipeline, but log the query, the cached candidate, and the fresh response for offline comparison (see the sketch after this list).
- Manually or with an evaluation LLM, compare these logged pairs to determine if the cached response would have been acceptable. This data allows you to confidently adjust the threshold.
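Here is a minimal sketch of the shadow-mode plumbing. It assumes search_cache is adjusted to also expose the best candidate's similarity even below the threshold; SHADOW_LOWER_BOUND, SHADOW_LOG_PATH, and log_shadow_pair are hypothetical names, not part of the implementation above.

import json
import time

SHADOW_LOWER_BOUND = 0.95        # hypothetical lower bound for shadow-mode logging
SHADOW_LOG_PATH = "shadow_pairs.jsonl"  # hypothetical log destination

def in_shadow_band(similarity: float) -> bool:
    # Near-misses fall between the shadow lower bound and the production threshold.
    return SHADOW_LOWER_BOUND <= similarity < CACHE_SIMILARITY_THRESHOLD

def log_shadow_pair(query: str, similarity: float,
                    cached_response: str, fresh_response: str) -> None:
    # Append the near-miss comparison to a JSONL file for offline review,
    # either manually or with an evaluation LLM.
    record = {
        "ts": time.time(),
        "query": query,
        "similarity": similarity,
        "cached_response": cached_response,
        "fresh_response": fresh_response,
    }
    with open(SHADOW_LOG_PATH, "a") as f:
        f.write(json.dumps(record) + "\n")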
3. Negative Caching
If your RAG system frequently receives queries about topics outside its knowledge base, it will repeatedly perform expensive searches only to return "I don't have information about that." This is a prime candidate for caching.
Modify your add_to_cache logic to store these negative results. This prevents the system from wasting resources on repeatedly searching for information it knows doesn't exist.
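One way to sketch this, reusing the SemanticCache class and constants from the main script: flag negative answers and give them a much shorter TTL so unknown topics are retried once new documents are ingested. The NEGATIVE_CACHE_TTL_SECONDS constant, the is_negative field, and the marker-string heuristic are all hypothetical.

NEGATIVE_CACHE_TTL_SECONDS = 300  # hypothetical: retry unknown topics after 5 minutes
NO_ANSWER_MARKERS = ("I don't have information",)  # hypothetical heuristic

def add_to_cache_with_negatives(cache, query, response, query_embedding):
    is_negative = any(marker in response for marker in NO_ANSWER_MARKERS)
    key = f"cache:query:{uuid.uuid4()}"
    cache.client.hset(key, mapping={
        "query_text": query,
        "query_embedding": query_embedding.astype(np.float32).tobytes(),
        "response_text": response,
        # Flagging the entry lets you monitor how often the cache absorbs
        # out-of-scope queries.
        "is_negative": int(is_negative),
    })
    # Expire negative entries sooner so newly ingested documents get a chance
    # to answer the question.
    cache.client.expire(key, NEGATIVE_CACHE_TTL_SECONDS if is_negative else CACHE_TTL_SECONDS)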
Conclusion: Moving Beyond Naive RAG
While foundational RAG is powerful, it's not a production-ready solution without significant performance and cost optimization. A semantic cache is not a mere enhancement; it is a critical architectural component for any RAG system intended for interactive, real-world use.
By implementing a low-latency vector cache with a fast embedding model, you can transform the user experience from sluggish to instantaneous for a significant portion of user queries. The real challenge for senior engineers lies not in the initial implementation, but in building robust, production-grade systems around it, particularly in managing the cache lifecycle through sophisticated invalidation strategies and continuous performance tuning. This approach is what separates proof-of-concept RAG demos from scalable, cost-effective, and truly useful AI products.