Optimizing RAG: Hybrid Search and Re-ranking in Production
The Production RAG Relevance Problem
A standard Retrieval-Augmented Generation (RAG) pipeline is conceptually simple: embed a user query, perform a vector similarity search against a document store, retrieve the top-K documents, and inject them as context into a Large Language Model (LLM) prompt. While effective for simple semantic queries, this approach reveals critical flaws in production environments:
* Queries frequently hinge on exact identifiers such as error codes or CVE numbers (e.g., GKE-1234, CVE-2023-4863). The semantic embedding may generalize these terms, failing to retrieve documents where they are explicitly mentioned.
* Raising the retrieval depth (e.g., K=20) to ensure the correct context is found increases the payload to the LLM, which in turn increases inference time and cost. A smaller K is faster but risks missing the critical context.
A production-ready RAG system must solve for precision. We need to retrieve a small, highly relevant set of documents. The solution is a multi-stage retrieval architecture that combines the strengths of different search paradigms: Hybrid Search for recall, followed by Re-ranking for precision.
This article details the implementation of this advanced pipeline.
Our Scenario: A Technical Documentation Q&A System
Imagine we're building a Q&A bot for a large software project. The documentation contains conceptual guides, API references, and troubleshooting pages with specific error codes. We'll use this scenario for our code examples.
We assume an indexed corpus of documents in two systems:
* Elasticsearch (or OpenSearch): For sparse, keyword-based retrieval using the BM25 algorithm.
* FAISS Index (or any Vector DB): For dense, semantic vector retrieval.
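For completeness, here is a minimal indexing sketch under these assumptions. The index name "docs" is arbitrary, and the Elasticsearch call uses the 8.x `document=` form; adjust for your client version.

from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
import faiss

es = Elasticsearch("http://localhost:9200")
embedder = SentenceTransformer("all-MiniLM-L6-v2")

def index_corpus(docs: list[dict]) -> faiss.IndexFlatL2:
    """Indexes the same documents into Elasticsearch (BM25) and FAISS (dense vectors)."""
    # Sparse side: Elasticsearch scores text fields with BM25 by default.
    for doc in docs:
        es.index(index="docs", id=doc["id"], document={"text": doc["text"]})
    # Dense side: embed every document and add the vectors to a flat FAISS index.
    embeddings = embedder.encode([d["text"] for d in docs])
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    return index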
Stage 1: Implementing Hybrid Search with Reciprocal Rank Fusion (RRF)
Hybrid search combines results from at least two different retrieval methods—typically sparse (keyword) and dense (vector). The challenge lies in merging the two disparate sets of ranked results into a single, coherent list.
The Retrievers: Sparse vs. Dense
First, let's define our two independent retrieval functions. In a real application, these would be asynchronous clients querying our search services.
import asyncio
import numpy as np
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
import faiss
# --- MOCK DATA AND MODELS (for a runnable example) ---
# 1. Mock Documents
docs = [
    {"id": "doc1", "text": "The GKE-1234 error is related to networking configuration in Google Kubernetes Engine."},
    {"id": "doc2", "text": "To optimize your cloud infrastructure, consider autoscaling your compute instances."},
    {"id": "doc3", "text": "A common network policy misconfiguration can lead to the GKE-1234 failure."},
    {"id": "doc4", "text": "Semantic search leverages deep learning models to understand query intent."},
    {"id": "doc5", "text": "Our system, codenamed 'Vanguard', uses a novel approach to data processing."}
]
# 2. Mock Elasticsearch (BM25 Retriever)
es_client = Elasticsearch([{'host': 'localhost', 'port': 9200, 'scheme': 'http'}]) # Assumes ES is running
# For this example, we'll simulate the ES client's response.
def mock_es_search(query: str, k: int):
    # A real implementation uses the BM25 algorithm in Elasticsearch.
    # Here, we simulate by simple keyword matching.
    results = []
    for doc in docs:
        if any(term.lower() in doc['text'].lower() for term in query.split()):
            results.append({'_id': doc['id'], '_score': np.random.rand()})  # Mock score
    results.sort(key=lambda x: x['_score'], reverse=True)
    return {'hits': {'hits': results[:k]}}
# 3. Mock FAISS (Vector Retriever)
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
doc_embeddings = embedding_model.encode([doc['text'] for doc in docs])
index = faiss.IndexFlatL2(doc_embeddings.shape[1])
index.add(doc_embeddings)
id_to_doc = {i: doc['id'] for i, doc in enumerate(docs)}
# --- PRODUCTION-STYLE RETRIEVER FUNCTIONS ---
async def retrieve_sparse(query: str, k: int = 10) -> list[tuple[str, float]]:
    """Retrieves documents using a keyword-based search (BM25)."""
    # In production, this would be an async HTTP call to Elasticsearch.
    print(f"Executing sparse search for: '{query}'")
    await asyncio.sleep(0.05)  # Simulate network latency
    response = mock_es_search(query, k)
    return [(hit['_id'], hit['_score']) for hit in response['hits']['hits']]
async def retrieve_dense(query: str, k: int = 10) -> list[tuple[str, float]]:
    """Retrieves documents using a semantic vector search."""
    # In production, this would query a vector database like Pinecone, Weaviate, or a self-hosted FAISS/ScaNN service.
    print(f"Executing dense search for: '{query}'")
    await asyncio.sleep(0.08)  # Simulate model inference and search latency
    query_embedding = embedding_model.encode([query])
    # Don't request more neighbors than the index holds
    distances, indices = index.search(query_embedding, min(k, index.ntotal))
    results = []
    for i in range(len(indices[0])):
        doc_index = int(indices[0][i])
        if doc_index == -1:  # FAISS pads missing results with -1
            continue
        # Crude distance-to-similarity conversion; RRF only uses the rank, not this value
        score = 1.0 - distances[0][i]
        doc_id = id_to_doc[doc_index]
        results.append((doc_id, score))
    return results
Notice the use of asyncio. Running these I/O-bound retrieval tasks concurrently is the first and most crucial optimization for keeping the P99 latency of your RAG API endpoint in check.
The Fusion Problem: Why You Can't Just Add Scores
The scores from BM25 and vector similarity search are not directly comparable. BM25 scores can be unbounded, while cosine similarity is typically in the [-1, 1] or [0, 1] range. A naive weighted sum `a * bm25_score + b * vector_score` requires constant, painful tuning and is not robust across different query types.
Solution: Reciprocal Rank Fusion (RRF)
RRF is a simple, powerful, and parameter-free method for combining ranked lists. It disregards the absolute scores and only uses the rank of each document in the result lists.
The RRF score for a document d is calculated as:
RRF_score(d) = Σ (1 / (k + rank_i(d)))
Where:
* rank_i(d) is the rank of document d in result set i.
* k is a constant (typically set to 60) that dampens the influence of lower-ranked items.
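For intuition: with k = 60, a document ranked 1st by the sparse retriever and 3rd by the dense retriever scores 1/(60+1) + 1/(60+3) ≈ 0.0164 + 0.0159 ≈ 0.0323, while a document that appears in only one list, even at rank 1, scores only ≈ 0.0164. Documents surfaced by both retrievers therefore rise to the top without any score normalization.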
Let's implement the fusion logic.
from collections import defaultdict

def reciprocal_rank_fusion(*results_lists: list[tuple[str, float]], k: int = 60) -> list[tuple[str, float]]:
    """Performs Reciprocal Rank Fusion on multiple ranked lists of documents."""
    rrf_scores = defaultdict(float)
    # Each results list is a list of (doc_id, score) tuples from a retriever
    for results in results_lists:
        for rank, (doc_id, _) in enumerate(results, 1):
            rrf_scores[doc_id] += 1 / (k + rank)
    # Sort documents by their combined RRF score in descending order
    sorted_docs = sorted(rrf_scores.items(), key=lambda item: item[1], reverse=True)
    return sorted_docs
async def hybrid_retrieval(query: str, sparse_k: int = 10, dense_k: int = 10):
    """Executes sparse and dense retrieval in parallel and fuses the results."""
    sparse_results_task = retrieve_sparse(query, k=sparse_k)
    dense_results_task = retrieve_dense(query, k=dense_k)
    # Await both tasks to run concurrently
    sparse_results, dense_results = await asyncio.gather(
        sparse_results_task,
        dense_results_task
    )
    fused_results = reciprocal_rank_fusion(sparse_results, dense_results)
    return fused_results
# --- Example Usage ---
async def main():
    # Query where keyword search is critical
    query_keyword = "GKE-1234 error"
    print(f"\n--- Running Hybrid Search for: '{query_keyword}' ---")
    fused_results_keyword = await hybrid_retrieval(query_keyword)
    print("Fused Results (Keyword Query):")
    for doc_id, score in fused_results_keyword:
        print(f"  ID: {doc_id}, RRF Score: {score:.4f}")

    # Query where semantic search is critical
    query_semantic = "how to improve cloud performance"
    print(f"\n--- Running Hybrid Search for: '{query_semantic}' ---")
    fused_results_semantic = await hybrid_retrieval(query_semantic)
    print("Fused Results (Semantic Query):")
    for doc_id, score in fused_results_semantic:
        print(f"  ID: {doc_id}, RRF Score: {score:.4f}")

# To run the async main function:
# asyncio.run(main())
When you run this, you'll see that for the query "GKE-1234 error", doc1 and doc3 will have very high RRF scores because they rank highly in the sparse search results. For "how to improve cloud performance", doc2 will rank highly from the dense retriever, and RRF will ensure it surfaces at the top of the final list.
This hybrid approach significantly improves recall—our ability to retrieve all potentially relevant documents.
Stage 2: Precision via Cross-Encoder Re-ranking
Hybrid search gives us a better-ordered list of, say, 20-50 documents. This is still too many to pass to an LLM. We need to narrow this down to the top 3-5 most relevant documents. This is where a re-ranker comes in.
Bi-Encoders vs. Cross-Encoders: A Critical Distinction
* Bi-Encoders: The SentenceTransformer we used for dense retrieval is a bi-encoder. It creates embeddings for the query and documents independently. The comparison is done later using a cheap distance metric (like cosine similarity). This is fast and scalable, making it perfect for first-stage retrieval over millions of documents.
* Cross-Encoders: A cross-encoder takes both the query and a document as a single input (query, document) and passes them through a powerful Transformer model (like BERT). The output is a single relevance score. This allows for deep, token-level interaction between the query and the document, making it far more accurate than a bi-encoder. However, it is computationally expensive and not feasible for searching over a large corpus.
This makes a cross-encoder the perfect tool for a re-ranking stage. We use the fast, scalable hybrid retriever to find the top 50 candidates, and then use the slow, accurate cross-encoder to re-rank only those 50.
Implementing the Re-ranking Stage
We'll use a compact, efficient cross-encoder model from the sentence-transformers library.
from sentence_transformers.cross_encoder import CrossEncoder
# Initialize a lightweight, fast cross-encoder model
# Models like 'ms-marco-MiniLM-L-6-v2' are excellent for this task
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
# Let's create a reverse mapping from ID to text for content lookup
id_to_text = {doc['id']: doc['text'] for doc in docs}
def rerank_documents(query: str, retrieved_docs: list[tuple[str, float]], top_n: int = 5) -> list[tuple[str, float]]:
    """Re-ranks a list of retrieved documents using a cross-encoder model."""
    # Prepare pairs of [query, document_text] for the model
    doc_ids = [doc_id for doc_id, _ in retrieved_docs]
    doc_texts = [id_to_text[doc_id] for doc_id in doc_ids]
    pairs = [[query, doc_text] for doc_text in doc_texts]
    # The cross-encoder model scores all pairs in a single batch for efficiency
    print(f"Re-ranking {len(pairs)} documents...")
    scores = cross_encoder.predict(pairs)
    # Combine doc IDs with their new scores
    reranked_results = list(zip(doc_ids, scores))
    # Sort by the new cross-encoder score in descending order
    reranked_results.sort(key=lambda x: x[1], reverse=True)
    return reranked_results[:top_n]
# --- Putting It All Together: The Full Pipeline ---
async def advanced_rag_pipeline(query: str):
    """The complete, production-grade RAG pipeline."""
    print(f"\n--- ADVANCED RAG PIPELINE FOR QUERY: '{query}' ---")

    # 1. Hybrid Retrieval (for high recall)
    # We retrieve more documents than we'll finally need, e.g., up to 50
    fused_results = await hybrid_retrieval(query, sparse_k=25, dense_k=25)
    print(f"Retrieved {len(fused_results)} documents after fusion.")

    # 2. Re-ranking (for high precision)
    # We re-rank the fused candidates and select the best 3
    reranked_docs = rerank_documents(query, fused_results, top_n=3)
    print("Top 3 documents after re-ranking:")
    for doc_id, score in reranked_docs:
        print(f"  ID: {doc_id}, Relevance Score: {score:.4f}")
        # print(f"  Text: {id_to_text[doc_id]}")  # Uncomment for debugging

    # 3. Augmentation and Generation (The final step)
    # The context is now a small set of highly relevant documents
    context = "\n\n".join([id_to_text[doc_id] for doc_id, _ in reranked_docs])
    # This context is then passed to the LLM
    # llm_prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
    # final_answer = llm.generate(llm_prompt)  # Pseudo-code for LLM call
    print("\nFinal context passed to LLM is highly relevant and concise.")
    return reranked_docs
# --- Example Usage of the Full Pipeline ---
# asyncio.run(advanced_rag_pipeline("What is the cause of the GKE-1234 issue?"))
# asyncio.run(advanced_rag_pipeline("Tell me about the Vanguard system"))
When running this full pipeline, the rerank_documents function will take the RRF-sorted list and apply a much higher level of scrutiny, ensuring that only the documents that truly answer the query make it into the final context. This helps mitigate the "lost in the middle" problem and reduces LLM inference costs.
Performance, Benchmarking, and Production Edge Cases
Deploying this system requires attention to performance and robustness.
Latency Breakdown and Optimization
The total latency is the sum of its parts. Let's analyze a hypothetical P99 latency budget:
* Hybrid Retrieval (Parallel): max(latency_sparse, latency_dense)
  * Sparse Search (Elasticsearch): ~50-100ms
  * Dense Search (Vector DB): ~80-150ms
  * Parallel Latency: ~150ms
* RRF Fusion: <1ms (in-memory computation)
* Re-ranker Inference: This is the new bottleneck. For 50 documents on a CPU, a MiniLM-sized cross-encoder might take 200-400ms. On a GPU (e.g., NVIDIA T4), this can be reduced to 30-60ms.
* LLM Generation: Highly variable (e.g., 500ms - 2000ms+ for a streamed response)
Total Latency (pre-LLM): ~350-550ms on CPU, ~180-210ms on GPU.
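To check a budget like this against reality, per-stage timings can be captured with something as simple as time.perf_counter. A minimal instrumentation sketch, reusing the functions defined above:

import time

async def timed_pipeline(query: str):
    """Logs wall-clock time for each pre-LLM stage (illustrative instrumentation only)."""
    t0 = time.perf_counter()
    fused = await hybrid_retrieval(query, sparse_k=25, dense_k=25)
    t1 = time.perf_counter()
    top_docs = rerank_documents(query, fused, top_n=3)
    t2 = time.perf_counter()
    print(f"hybrid retrieval: {(t1 - t0) * 1000:.1f} ms, re-ranking: {(t2 - t1) * 1000:.1f} ms")
    return top_docs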
Optimization Strategies:
* Run the sparse and dense retrievers concurrently (as we did with asyncio.gather) so their latencies overlap rather than add up.
* Serve the cross-encoder on dedicated, GPU-backed model-serving infrastructure instead of inside the API process.
* Cache fused retrieval and re-ranking results for frequent or repeated queries; a simple sketch follows below.
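To illustrate the caching point, here is a minimal in-process sketch that memoizes fused retrieval results by query string. A production system would more likely normalize the query and use an external cache such as Redis with a TTL.

# Minimal in-process cache for fused retrieval results (illustrative only).
_retrieval_cache: dict[str, list[tuple[str, float]]] = {}

async def cached_hybrid_retrieval(query: str, sparse_k: int = 25, dense_k: int = 25):
    key = f"{query}|{sparse_k}|{dense_k}"
    if key not in _retrieval_cache:
        _retrieval_cache[key] = await hybrid_retrieval(query, sparse_k=sparse_k, dense_k=dense_k)
    return _retrieval_cache[key]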
Edge Case: Handling Long Documents
Cross-encoders have a fixed input sequence length (e.g., 512 tokens). If your documents are longer, you cannot pass the entire text.
Problematic Solution: Truncate the document. You might cut off the most relevant passage.
Production-Grade Solution:
1. Split each long document into chunks that fit the cross-encoder's window, score every (query, chunk) pair, and pass only the best-scoring chunk to the LLM.
2. Score the chunks the same way, but take the highest chunk score as the document's relevance score and pass the full document (or the best chunk plus its surrounding context) to the LLM.
The second approach is often better as it provides the LLM with broader context while still being guided by the most relevant passage. A sketch of the chunk-level scoring follows below.
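A minimal sketch of that chunk-level scoring, assuming a naive whitespace chunker as a stand-in for a proper tokenizer-aware splitter, and reusing the cross_encoder and id_to_text objects from above:

def rerank_long_documents(query: str, retrieved_docs: list[tuple[str, float]],
                          chunk_words: int = 300, top_n: int = 5) -> list[tuple[str, float]]:
    """Scores long documents chunk-by-chunk and max-pools the chunk scores per document."""
    pairs, owners = [], []
    for doc_id, _ in retrieved_docs:
        words = id_to_text[doc_id].split()
        # Naive word-based chunking; a real system would split on token counts with overlap.
        chunks = [" ".join(words[i:i + chunk_words]) for i in range(0, len(words), chunk_words)]
        for chunk in chunks:
            pairs.append([query, chunk])
            owners.append(doc_id)
    scores = cross_encoder.predict(pairs)
    # The document's score is the score of its best chunk.
    best_per_doc: dict[str, float] = {}
    for doc_id, score in zip(owners, scores):
        best_per_doc[doc_id] = max(best_per_doc.get(doc_id, float("-inf")), float(score))
    return sorted(best_per_doc.items(), key=lambda x: x[1], reverse=True)[:top_n]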
Edge Case: Tuning the `k` in RRF
While RRF is often called "parameter-free," the k constant (defaulting to 60) can be tuned.
* A lower k (e.g., 10) gives more weight to the top-ranked items. If you have very high confidence in your retrievers, this can improve precision.
* A higher k (e.g., 100) smooths out the scores and gives more consideration to lower-ranked items, potentially improving diversity.
In practice, k=60 is a robust default established by the original paper, but it's worth experimenting with during offline evaluation.
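As a rough illustration of such an offline experiment, here is a sketch that sweeps several k values and reports recall of the fused list. It assumes a small hypothetical labeled set, labeled_queries, mapping each query to the IDs of its known-relevant documents.

async def sweep_rrf_k(labeled_queries: dict[str, set[str]], candidate_ks=(10, 30, 60, 100), top_n: int = 10):
    """Measures recall@top_n of the fused list for several RRF k values."""
    for k in candidate_ks:
        hits, total = 0, 0
        for query, relevant_ids in labeled_queries.items():
            sparse = await retrieve_sparse(query, k=25)
            dense = await retrieve_dense(query, k=25)
            fused = reciprocal_rank_fusion(sparse, dense, k=k)
            retrieved_ids = {doc_id for doc_id, _ in fused[:top_n]}
            hits += len(retrieved_ids & relevant_ids)
            total += len(relevant_ids)
        print(f"k={k}: recall@{top_n} = {hits / max(total, 1):.3f}")

# Example: asyncio.run(sweep_rrf_k({"GKE-1234 error": {"doc1", "doc3"}}))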
Building a Feedback Loop
To continuously improve the system, you need a feedback loop. When users interact with your RAG system, collect data:
* Which generated answers were helpful? (e.g., a thumbs up/down button)
* Which source documents were clicked on?
This data is invaluable. You can use it to create a dataset of (query, relevant_document, irrelevant_document) triplets. This dataset can then be used to fine-tune your cross-encoder re-ranker, teaching it your domain's specific definition of relevance. This is the final step in moving from a good RAG system to a great one.
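One way this fine-tuning could look, sketched with the classic CrossEncoder training interface from sentence-transformers (the exact fit API varies between library versions, and feedback_triplets is a hypothetical dataset built from the click/thumbs signals above):

from torch.utils.data import DataLoader
from sentence_transformers import InputExample
from sentence_transformers.cross_encoder import CrossEncoder

# Hypothetical feedback data: (query, helpful document text, unhelpful document text)
feedback_triplets = [
    ("GKE-1234 error",
     "A common network policy misconfiguration can lead to the GKE-1234 failure.",
     "Semantic search leverages deep learning models to understand query intent."),
]

# Turn each triplet into one positive and one negative (query, passage) pair.
train_samples = []
for query, relevant, irrelevant in feedback_triplets:
    train_samples.append(InputExample(texts=[query, relevant], label=1.0))
    train_samples.append(InputExample(texts=[query, irrelevant], label=0.0))

model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', num_labels=1)
train_dataloader = DataLoader(train_samples, shuffle=True, batch_size=16)

# Fine-tune briefly on domain-specific relevance judgments, then save the re-ranker.
model.fit(train_dataloader=train_dataloader, epochs=1, warmup_steps=100)
model.save('models/reranker-finetuned')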
Conclusion
Moving beyond naive vector search is non-negotiable for building a production RAG system that users can trust. By architecting a multi-stage pipeline that first optimizes for recall using parallelized hybrid search with RRF, and then ruthlessly prunes for precision using a fast cross-encoder re-ranker, you can solve the core relevance challenges.
This architecture directly addresses the keyword-search problem, mitigates the "lost in the middle" effect by creating a dense and relevant context, and provides a clear path for performance optimization via dedicated model serving and caching. While more complex than a basic RAG prototype, this investment in the retrieval pipeline is what separates a brittle demo from a robust, scalable, and accurate AI product.