Advanced RAG: Hybrid Search & Reranking for Production Systems
Beyond Naive Vector Search: The Case for a Multi-Stage RAG Pipeline
As senior engineers, we've moved past the initial excitement of Retrieval-Augmented Generation (RAG). We know the basic pattern: embed a user's query, perform a vector similarity search against a corpus of chunked documents, stuff the top-k results into a Large Language Model (LLM) context, and generate an answer. While effective for simple semantic queries, this naive approach quickly breaks down in production environments, leading to a frustrating user experience characterized by plausible but incorrect answers, an inability to find specific information, and a general lack of reliability.
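To anchor the discussion, here is a minimal sketch of that naive pattern; the vector_index, chunks, and call_llm arguments are hypothetical placeholders rather than a specific stack:
# naive_rag_sketch.py -- single-stage RAG, for reference only
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')

def naive_rag_answer(query, vector_index, chunks, call_llm, k=5):
    # 1. Embed the query with the same bi-encoder used to index the chunks
    query_embedding = model.encode([query])
    # 2. Vector similarity search for the top-k chunks
    _, indices = vector_index.search(query_embedding, k)
    context = "\n\n".join(chunks[i] for i in indices[0])
    # 3. Stuff the retrieved chunks into the prompt and generate
    prompt = f"Answer using only this context:\n{context}\n\nQuestion: {query}"
    return call_llm(prompt)
The rest of this post is about replacing that single retrieval step with a multi-stage pipeline.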
The core limitation of a vector-only retrieval system is its reliance on semantic similarity, which often fails to capture lexical or keyword-based relevance. A user searching for a specific product SKU like XG-48-T2B or a project codename like "Project Titan" Q3 Financials is not performing a semantic search; they are performing a keyword search. A dense vector representation might place documents containing these terms close to each other, but it offers no guarantee they will be ranked highest, especially if the surrounding text is semantically different from the query.
This is where production-grade RAG diverges from tutorials. We must evolve from a single-step retrieval process to a sophisticated, multi-stage pipeline designed to maximize both recall (finding all potentially relevant documents) and precision (ranking the most relevant document first).
This post details the architecture and implementation of such a pipeline. We will build a system that:
* Retrieves candidates with dense vector search and BM25 keyword search in parallel.
* Fuses the two ranked lists with Reciprocal Rank Fusion (RRF) to maximize recall.
* Reranks the fused candidates with a cross-encoder to maximize precision before the LLM ever sees them.
This retrieve-fuse-rerank pattern is the blueprint for building RAG systems that are not just demos, but reliable tools for enterprise knowledge retrieval.
The Failure Mode of Vector-Only Search
Let's establish a concrete baseline to demonstrate the problem. We'll use a small corpus of documents about fictional internal company projects. Notice the mix of semantic descriptions and specific, keyword-like identifiers.
Corpus:
# documents.py
documents = [
{
"id": "doc1",
"text": "Project Apollo achieved a 15% increase in user engagement by overhauling the user interface. The final report is available under file ID A-2023-Q4."
},
{
"id": "doc2",
"text": "The marketing team's 'Starlight' campaign resulted in a 5% market share growth. Key metrics are detailed in the Q3 review."
},
{
"id": "doc3",
"text": "Project Titan's Q3 financials show a net profit of $1.2M. The project is on track. The primary identifier for this project is T-FIN-2023-Q3."
},
{
"id": "doc4",
"text": "A critical security vulnerability was patched in the authentication service. The ticket number is SEC-991. All systems are now secure."
},
{
"id": "doc5",
"text": "Our cloud infrastructure costs were analyzed in the 'CloudSpend Q3' report. We identified savings opportunities related to underutilized instances."
}
]
Now, let's set up a basic vector search pipeline using sentence-transformers for embeddings and a simple in-memory faiss index.
Baseline Vector Search Implementation:
# baseline_vector_search.py
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from documents import documents
class VectorSearch:
    def __init__(self, documents, model_name='all-MiniLM-L6-v2'):
        self.documents = documents
        self.model = SentenceTransformer(model_name)
        self.index = self._build_index()

    def _build_index(self):
        print("Building vector index...")
        # Encode the corpus with the bi-encoder; SentenceTransformer returns a float32 numpy array
        embeddings = self.model.encode([doc['text'] for doc in self.documents], convert_to_tensor=False)
        embeddings = np.asarray(embeddings, dtype='float32')
        # Exact (brute-force) L2 index; fine for a small corpus
        index = faiss.IndexFlatL2(embeddings.shape[1])
        index.add(embeddings)
        return index

    def search(self, query, k=3):
        query_embedding = np.asarray(self.model.encode([query]), dtype='float32')
        distances, indices = self.index.search(query_embedding, k)
        # Convert L2 distance into a rough "higher is better" score for display
        return [{'id': self.documents[i]['id'], 'text': self.documents[i]['text'], 'score': 1 - d}
                for i, d in zip(indices[0], distances[0])]
# --- Demonstration ---
if __name__ == '__main__':
vector_search_engine = VectorSearch(documents)
# Query 1: A semantic query
semantic_query = "What are the financial results of our projects?"
semantic_results = vector_search_engine.search(semantic_query)
print(f"\n--- Results for: '{semantic_query}' ---")
for res in semantic_results:
print(f"ID: {res['id']}, Score: {res['score']:.4f}, Text: {res['text'][:80]}...")
# Query 2: A keyword-specific query
keyword_query = "Project Titan Q3 financials"
keyword_results = vector_search_engine.search(keyword_query)
print(f"\n--- Results for: '{keyword_query}' ---")
for res in keyword_results:
print(f"ID: {res['id']}, Score: {res['score']:.4f}, Text: {res['text'][:80]}...")
# Query 3: A specific identifier query
id_query = "T-FIN-2023-Q3"
id_results = vector_search_engine.search(id_query)
print(f"\n--- Results for: '{id_query}' ---")
for res in id_results:
print(f"ID: {res['id']}, Score: {res['score']:.4f}, Text: {res['text'][:80]}...")
Expected Output:
--- Results for: 'What are the financial results of our projects?' ---
ID: doc3, Score: 0.6587, Text: Project Titan's Q3 financials show a net profit of $1.2M. The project is on tr...
ID: doc5, Score: 0.5912, Text: Our cloud infrastructure costs were analyzed in the 'CloudSpend Q3' report. We ...
ID: doc2, Score: 0.5701, Text: The marketing team's 'Starlight' campaign resulted in a 5% market share growt...
--- Results for: 'Project Titan Q3 financials' ---
ID: doc3, Score: 0.9999, Text: Project Titan's Q3 financials show a net profit of $1.2M. The project is on tr...
ID: doc5, Score: 0.6011, Text: Our cloud infrastructure costs were analyzed in the 'CloudSpend Q3' report. We ...
ID: doc2, Score: 0.5899, Text: The marketing team's 'Starlight' campaign resulted in a 5% market share growt...
--- Results for: 'T-FIN-2023-Q3' ---
ID: doc1, Score: 0.4503, Text: Project Apollo achieved a 15% increase in user engagement by overhauling the u...
ID: doc3, Score: 0.4489, Text: Project Titan's Q3 financials show a net profit of $1.2M. The project is on tr...
ID: doc4, Score: 0.3921, Text: A critical security vulnerability was patched in the authentication service. Th...
The results are revealing:
* Semantic Query: Works perfectly. The query about "financial results" correctly retrieves doc3 (Titan financials) and doc5 (CloudSpend report).
* Keyword Query: Works well because the keywords are prominent in the document.
* Identifier Query: This is the critical failure. The query for the specific ID T-FIN-2023-Q3 fails to rank doc3 as the top result. The embedding model has no specific understanding of this token string and instead finds spurious semantic relationships with other documents. The correct document is present, but it's not ranked first. In a production RAG system retrieving 10 documents for an LLM from a much larger corpus, this document might be buried too low to be useful, or missed entirely.
This is the problem we must solve. The system needs to be excellent at both semantic and lexical retrieval.
Stage 1 & 2: Hybrid Search and Reciprocal Rank Fusion
To address the shortcomings of vector-only search, we introduce a parallel retrieval process: a classic, battle-tested keyword search engine. For this, we'll use an in-memory implementation of BM25 (Okapi BM25), the algorithm powering systems like Elasticsearch and OpenSearch.
Our architecture will now look like this:
User Query -> [Vector Search] & [Keyword Search] -> [Fusion] -> Candidate List
Implementing the Keyword Search Component
We'll use the rank_bm25 library for a simple but effective BM25 implementation.
# keyword_search.py
import re

import numpy as np
from rank_bm25 import BM25Okapi

from documents import documents

class KeywordSearch:
    def __init__(self, documents):
        self.documents = documents
        self.doc_map = {doc['id']: doc['text'] for doc in documents}
        tokenized_corpus = [self._tokenize(doc['text']) for doc in documents]
        self.bm25 = BM25Okapi(tokenized_corpus)

    @staticmethod
    def _tokenize(text):
        # Lowercase and strip punctuation so an identifier like "T-FIN-2023-Q3."
        # (note the trailing period in doc3) still matches the query token.
        return re.findall(r"[a-z0-9\-]+", text.lower())

    def search(self, query, k=3):
        tokenized_query = self._tokenize(query)
        doc_scores = self.bm25.get_scores(tokenized_query)
        # Indices of the top-k highest-scoring documents
        top_n_indices = np.argsort(doc_scores)[::-1][:k]
        return [{'id': self.documents[i]['id'], 'text': self.documents[i]['text'], 'score': doc_scores[i]}
                for i in top_n_indices if doc_scores[i] > 0]
# --- Demonstration ---
if __name__ == '__main__':
keyword_search_engine = KeywordSearch(documents)
id_query = "T-FIN-2023-Q3"
results = keyword_search_engine.search(id_query)
print(f"\n--- Results for: '{id_query}' ---")
for res in results:
print(f"ID: {res['id']}, Score: {res['score']:.4f}, Text: {res['text'][:80]}...")
Running this gives us the exact result we need for the identifier query:
--- Results for: 'T-FIN-2023-Q3' ---
ID: doc3, Score: 0.9373, Text: Project Titan's Q3 financials show a net profit of $1.2M. The project is on tr...
BM25 excels here because it's based on exact token matches. The token T-FIN-2023-Q3 is unique to doc3, giving it a very high score.
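For intuition, here is the standard Okapi BM25 scoring function (rank_bm25's BM25Okapi is close to this formulation, though its exact IDF variant differs slightly):
BM25(D, Q) = Σ IDF(q_i) * ( tf(q_i, D) * (k1 + 1) ) / ( tf(q_i, D) + k1 * (1 - b + b * |D| / avgdl) )
Where tf(q_i, D) is the frequency of query term q_i in document D, |D| is the document's length in tokens, avgdl is the average document length in the corpus, and k1 ≈ 1.5, b ≈ 0.75 are common defaults. IDF(q_i) grows as a term becomes rarer across the corpus, so a token like T-FIN-2023-Q3 that occurs in only one of our five documents contributes a large, highly targeted boost to that document's score.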
Fusing the Results with Reciprocal Rank Fusion (RRF)
Now we have two ranked lists of results, each with different scoring systems and strengths. How do we combine them into a single, superior list? A naive approach might be to normalize scores and add them, but this is brittle and requires constant tuning. A far more robust method is Reciprocal Rank Fusion (RRF).
RRF is elegant in its simplicity. It disregards the actual scores and focuses only on the rank of each document in the result lists. For each document, its RRF score is calculated as the sum of the reciprocals of its ranks across the different lists.
RRF_Score(doc) = Σ (1 / (k + rank_i))
Where rank_i is the rank of the document in result list i, and k is a smoothing constant that prevents documents at the very top of a single list from dominating the fused score (a common value is k=60). This formula heavily favors documents that appear in the top positions of any list, making it resilient to the varying score scales of different retrieval systems.
Let's implement the fusion logic.
# fusion.py
from collections import defaultdict
def reciprocal_rank_fusion(search_results_lists, k=60):
"""
Performs Reciprocal Rank Fusion on a list of search results lists.
:param search_results_lists: A list of lists, where each inner list contains dicts with 'id' and 'score'.
:param k: The constant used in the RRF formula.
:return: A single, fused, and sorted list of document IDs and their RRF scores.
"""
fused_scores = defaultdict(float)
# Each search_results_list is a list of dicts from one retriever
for results in search_results_lists:
for rank, doc in enumerate(results):
doc_id = doc['id']
fused_scores[doc_id] += 1 / (k + rank + 1) # rank is 0-indexed
# Sort by the fused score in descending order
reranked_results = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
return reranked_results
# --- Putting it all together ---
if __name__ == '__main__':
from baseline_vector_search import VectorSearch
from keyword_search import KeywordSearch
from documents import documents
# Initialize engines
vector_search_engine = VectorSearch(documents)
keyword_search_engine = KeywordSearch(documents)
# Run a query where vector search fails
query = "T-FIN-2023-Q3"
print(f"\n--- Fusing results for query: '{query}' ---")
vector_results = vector_search_engine.search(query, k=5)
keyword_results = keyword_search_engine.search(query, k=5)
print("\nVector Search Results (Top 5):")
for r in vector_results: print(f" {r['id']}")
print("\nKeyword Search Results (Top 5):")
for r in keyword_results: print(f" {r['id']}")
fused_results = reciprocal_rank_fusion([vector_results, keyword_results])
print("\nFused and Reranked Results (RRF):")
for doc_id, score in fused_results:
print(f" ID: {doc_id}, Score: {score:.6f}")
Output of the Fusion Logic:
--- Fusing results for query: 'T-FIN-2023-Q3' ---
Vector Search Results (Top 5):
doc1
doc3
doc4
doc2
doc5
Keyword Search Results (Top 5):
doc3
Fused and Reranked Results (RRF):
ID: doc3, Score: 0.032522
ID: doc1, Score: 0.016393
ID: doc4, Score: 0.015873
ID: doc2, Score: 0.015625
ID: doc5, Score: 0.015385
Success! Even though vector search ranked doc3 second, its #1 position in the keyword search results gave it a much higher RRF score, propelling it to the top of the final fused list: 1/(60+1) + 1/(60+2) ≈ 0.01639 + 0.01613 = 0.03252, roughly double the score of any document that appears in only one of the two lists. We have successfully combined the strengths of both systems to improve recall for a difficult query.
Stage 3: Precision Enhancement with Cross-Encoder Reranking
Hybrid search with RRF is a massive improvement for recall. We are now much more likely to have the correct document within our top N candidates (e.g., top 50). However, the ranking within that list is still based on the relatively simple signals from our retrievers. The final step to achieve state-of-the-art precision is to add a reranking layer.
Bi-Encoders vs. Cross-Encoders: A Critical Distinction
The SentenceTransformer model we used for retrieval is a bi-encoder. It creates numerical representations (embeddings) for the query and documents independently. The search process is fast because it's just a mathematical distance calculation in vector space. However, this independence means the model never sees the query and document together, limiting its ability to capture fine-grained relevance.
A cross-encoder, on the other hand, takes both the query and a document as a single input ([CLS] query [SEP] document [SEP]) and outputs a single relevance score, where higher means more relevant. (For the MS MARCO cross-encoder used below, this score is an unbounded logit rather than a probability between 0 and 1, which is why you will see negative scores in the output later.) This allows the model to perform full self-attention across both the query and document tokens, giving it a much deeper understanding of their relationship.
The trade-off is speed. A cross-encoder is orders of magnitude slower than a bi-encoder search. We cannot use it to score our entire corpus. But it is perfectly suited for reranking a small set of promising candidates returned by our hybrid search stage.
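To make the distinction concrete, here is a minimal sketch that scores a single query-passage pair both ways, using the same models as the rest of this post. The printed values are illustrative; the point is the two very different scoring paths:
# encoder_comparison.py -- bi-encoder vs. cross-encoder on one pair
from sentence_transformers import SentenceTransformer, CrossEncoder, util

query = "Project Titan Q3 financials"
passage = "Project Titan's Q3 financials show a net profit of $1.2M."

# Bi-encoder: embed query and passage independently, then compare in vector space
bi_encoder = SentenceTransformer('all-MiniLM-L6-v2')
query_emb, passage_emb = bi_encoder.encode([query, passage])
print("Bi-encoder cosine similarity:", util.cos_sim(query_emb, passage_emb).item())

# Cross-encoder: the pair is scored jointly, with attention across both texts
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
print("Cross-encoder relevance score:", cross_encoder.predict([[query, passage]])[0])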
Implementation of the Reranking Layer
We will use a model from the sentence-transformers library that has been specifically trained for this task, such as ms-marco-MiniLM-L-6-v2 or the more powerful BAAI/bge-reranker-large.
# reranker.py
from sentence_transformers.cross_encoder import CrossEncoder
class Reranker:
def __init__(self, model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'):
# For higher accuracy, consider 'BAAI/bge-reranker-large'
self.model = CrossEncoder(model_name)
def rerank(self, query, documents):
"""
Reranks a list of documents based on a query.
:param query: The user query string.
:param documents: A list of documents, each a dict with 'id' and 'text'.
:return: A sorted list of documents with an added 'rerank_score'.
"""
# The model expects a list of [query, passage] pairs.
pairs = [[query, doc['text']] for doc in documents]
scores = self.model.predict(pairs)
# Combine scores with original documents
for i in range(len(documents)):
documents[i]['rerank_score'] = scores[i]
# Sort documents by the new rerank score in descending order
return sorted(documents, key=lambda x: x['rerank_score'], reverse=True)
The Complete Production Pipeline
Now, let's assemble all three stages into a single, cohesive pipeline. This class will encapsulate the entire retrieve-fuse-rerank logic.
# full_pipeline.py
import time
from documents import documents
from baseline_vector_search import VectorSearch
from keyword_search import KeywordSearch
from fusion import reciprocal_rank_fusion
from reranker import Reranker
class AdvancedRAGPipeline:
def __init__(self, documents):
print("Initializing Advanced RAG Pipeline...")
self.documents = documents
self.doc_map = {doc['id']: doc for doc in documents}
# Stage 1: Retrievers
self.vector_search = VectorSearch(documents)
self.keyword_search = KeywordSearch(documents)
# Stage 3: Reranker
self.reranker = Reranker()
print("Pipeline initialized.")
def query(self, query, retrieve_k=10, rerank_k=3):
print(f"\nExecuting query: '{query}'")
# 1. Retrieve
start_time = time.time()
vector_results = self.vector_search.search(query, k=retrieve_k)
keyword_results = self.keyword_search.search(query, k=retrieve_k)
retrieval_time = time.time() - start_time
print(f" [1] Retrieval took {retrieval_time:.4f}s")
# 2. Fuse
start_time = time.time()
fused_results = reciprocal_rank_fusion([vector_results, keyword_results])
# Get the full document objects for the top candidates
candidate_ids = [doc_id for doc_id, score in fused_results[:retrieve_k]]
candidate_docs = [self.doc_map[doc_id] for doc_id in candidate_ids]
fusion_time = time.time() - start_time
print(f" [2] Fusion took {fusion_time:.4f}s")
# 3. Rerank
start_time = time.time()
reranked_results = self.reranker.rerank(query, candidate_docs)
rerank_time = time.time() - start_time
print(f" [3] Reranking took {rerank_time:.4f}s")
return reranked_results[:rerank_k]
# --- Demonstration ---
if __name__ == '__main__':
pipeline = AdvancedRAGPipeline(documents)
# Test with a nuanced, semantic query that could be ambiguous
query = "Tell me about security and money from Titan"
final_results = pipeline.query(query, retrieve_k=5, rerank_k=3)
print("\n--- Final Reranked Results ---")
for res in final_results:
print(f"ID: {res['id']}, Rerank Score: {res['rerank_score']:.4f}, Text: {res['text'][:100]}...")
Output of the Full Pipeline:
Initializing Advanced RAG Pipeline...
Building vector index...
Pipeline initialized.
Executing query: 'Tell me about security and money from Titan'
[1] Retrieval took 0.0210s
[2] Fusion took 0.0001s
[3] Reranking took 0.0450s
--- Final Reranked Results ---
ID: doc3, Rerank Score: 0.8912, Text: Project Titan's Q3 financials show a net profit of $1.2M. The project is on track. The primary identi...
ID: doc4, Rerank Score: 0.0015, Text: A critical security vulnerability was patched in the authentication service. The ticket number is SEC-...
ID: doc1, Rerank Score: -2.4531, Text: Project Apollo achieved a 15% increase in user engagement by overhauling the user interface. The fi...
This result is fantastic. The query is intentionally ambiguous, containing keywords related to doc3 ("Titan", "money") and doc4 ("security"). Our hybrid retrieval phase would have pulled up both. However, the cross-encoder reranker correctly identified, by a massive margin, that the query's main intent relates to doc3, assigning it a very high score while heavily penalizing the others. This is the level of precision required for a production system.
Performance, Edge Cases, and Production Considerations
Building this pipeline is only half the battle. Operating it efficiently and robustly in production requires addressing several critical engineering challenges.
Latency Breakdown and Optimization
The reranking step is the primary latency bottleneck. While our local example is fast, reranking 50 documents with a large model on a CPU can take hundreds of milliseconds.
* Asynchronous Retrieval: The vector and keyword searches are independent. In a production service (e.g., using FastAPI or aiohttp), they should be executed concurrently using asyncio.gather to reduce retrieval latency to max(t_vector, t_keyword) instead of sum(t_vector, t_keyword); see the sketch after this list.
* Hardware Acceleration: Cross-encoder models benefit massively from GPUs. For a high-throughput service, deploying the reranker model on a GPU-enabled instance (e.g., using a dedicated Triton Inference Server or a simple FastAPI service with a GPU-backed PyTorch) is essential.
* Model Quantization/Distillation: If GPU costs are prohibitive, consider using quantized models (e.g., via ONNX runtime) or smaller, distilled versions of the reranker. This is a direct trade-off between latency/cost and accuracy.
* Smart k Selection: The number of documents you pass to the reranker (retrieve_k in our code) is a critical lever. Reranking the top 20 is much faster than the top 100. Analyze your retrieval performance offline: if your hybrid search consistently places the correct answer in the top 25, there's no need to rerank 100 documents.
Edge Case: Handling Long Documents
Our examples use short documents, but real-world data involves long PDFs, Word documents, or transcripts. The standard approach is to chunk these documents before embedding.
The Problem: Chunking can sever context. If a user query is best answered by a paragraph at the end of chunk #3 and a sentence at the beginning of chunk #4, retrieving only one chunk will lead to an incomplete answer. Furthermore, the final LLM context is much richer if it receives the full document, not just an isolated chunk.
The Advanced Solution:
* At ingestion time, store a parent_document_id with each chunk.
* After reranking, take the top N chunks, extract their parent_document_ids, and de-duplicate them.
* Fetch the full parent documents (or a window of neighboring chunks) from your document store and pass those to the LLM.
Here is a conceptual implementation of the re-expansion logic:
# pseudo-code for long document handling
def process_long_docs(reranked_chunks, num_docs_for_llm=3):
# Assumes each chunk dict has a 'parent_document_id' key
parent_doc_ids_ordered = []
seen_parent_ids = set()
for chunk in reranked_chunks:
parent_id = chunk['parent_document_id']
if parent_id not in seen_parent_ids:
seen_parent_ids.add(parent_id)
parent_doc_ids_ordered.append(parent_id)
if len(parent_doc_ids_ordered) >= num_docs_for_llm:
break
# Now, fetch the full text of these parent documents from a database
# full_docs = database.fetch_docs_by_ids(parent_doc_ids_ordered)
# return full_docs
return parent_doc_ids_ordered # returning IDs for demonstration
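A quick illustration of the expected input and output, with hypothetical chunk and parent IDs invented purely for the example:
# Hypothetical reranked chunks, annotated at ingestion time with their parent document
reranked_chunks = [
    {'id': 'doc7-chunk3', 'parent_document_id': 'doc7', 'rerank_score': 4.1},
    {'id': 'doc7-chunk4', 'parent_document_id': 'doc7', 'rerank_score': 3.8},
    {'id': 'doc2-chunk1', 'parent_document_id': 'doc2', 'rerank_score': 1.2},
]

print(process_long_docs(reranked_chunks, num_docs_for_llm=2))
# ['doc7', 'doc2'] -- duplicates collapsed, rerank order preserved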
Conclusion: RAG as a Systems Engineering Discipline
Moving a RAG system from a prototype to a production-ready service is a significant leap in complexity. It requires a shift in thinking from simply finding semantically similar text to engineering a robust, multi-stage information retrieval system.
The retrieve-fuse-rerank architecture presented here provides a powerful and adaptable blueprint. By combining the lexical strength of keyword search with the semantic power of vector search, we maximize our chances of recalling all relevant information. By adding a sophisticated cross-encoder reranker, we ensure that the most pertinent documents are placed at the very top, giving the final LLM the highest quality context possible.
This approach directly translates to a better user experience: fewer incorrect or "hallucinated" answers, the ability to find specific information reliably, and increased trust in the system. For senior engineers tasked with building AI-powered knowledge systems, mastering these advanced RAG patterns is no longer optional—it's a fundamental requirement for success.