Production RAG: Hybrid Search with Cross-Encoder Re-ranking

18 min read
Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

Beyond Naive RAG: The Case for a Multi-Stage Retrieval Architecture

For senior engineers building applications on Large Language Models (LLMs), Retrieval-Augmented Generation (RAG) has become a foundational pattern. However, the ubiquitous "introductory" RAG pipeline—embedding a query and performing a simple vector similarity search—exhibits critical failures in production environments. These systems often struggle with queries containing specific keywords, error codes, product SKUs, or acronyms that dense embedding models, trained on semantic similarity, fail to represent accurately. The result is a context that is thematically related but factually incorrect, leading to confident hallucinations.

A production-ready RAG system cannot rely on a single retrieval method. It requires a sophisticated, multi-stage retrieval architecture designed for both recall (finding all potentially relevant documents) and precision (ranking the most relevant documents at the top). This article details the implementation of such an architecture, focusing on two key advancements:

  • Hybrid Search: Combining the semantic power of dense vector search with the keyword precision of sparse retrieval (like BM25) using a fusion algorithm like Reciprocal Rank Fusion (RRF).
  • Cross-Encoder Re-ranking: Adding a final, high-precision ranking stage that re-evaluates the top candidates from the hybrid search, ensuring the most relevant context is fed to the LLM.

We will move directly into implementation, assuming a working knowledge of basic RAG concepts, vector databases, and Python. We will build a system that addresses the shortcomings of naive vector search and provides the robustness required for enterprise-grade AI applications.


    Part 1: The Failure Point of Pure Vector Search

    Before architecting the solution, it's crucial to understand the precise failure mechanism. Dense vector embeddings, generated by models like Sentence-Transformers or OpenAI's Ada, map text to a high-dimensional space where semantic proximity corresponds to geometric proximity (e.g., low cosine distance).

    This is powerful for queries like "How do I improve database performance?" It will correctly retrieve documents about indexing, query optimization, and connection pooling.

    However, consider these production queries:

    "Troubleshoot error 0x80070005 in Windows Update.*"

    "What is the inventory level for SKU XG-55-2A-PROD?*"

    "What does the Kubelet do in our EKS cluster?*"

    An embedding model may not have a precise vector representation for the alphanumeric identifier 0x80070005. It might retrieve documents about general Windows errors, but miss the specific knowledge base article that mentions this exact code. The semantic meaning is lost because the salience is in the lexical token itself. This is where traditional keyword search excels.
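
    To make this concrete, you can inspect how the embedding model's tokenizer handles such an identifier. The snippet below is a minimal illustrative sketch; it assumes the all-MiniLM-L6-v2 checkpoint used later in this article:

    python
    from transformers import AutoTokenizer

    # Tokenizer behind the all-MiniLM-L6-v2 embedding model used in Part 2
    tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

    # The error code is split into several sub-word pieces, so the exact
    # identifier never exists as a single, matchable unit in the dense index.
    print(tokenizer.tokenize("Troubleshoot error 0x80070005 in Windows Update"))

    A BM25 index, by contrast, stores the literal term 0x80070005 and can match it exactly.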

    Our goal is to create a system that gets the best of both worlds: the semantic understanding of dense vectors and the lexical precision of sparse vectors.


    Part 2: Architecting a Hybrid Search Retriever

    Hybrid search combines results from at least two different retrieval systems. Our implementation will use a vector database for dense search and an Elasticsearch index for sparse, BM25-based search.

    Component 1: The Dense Retriever (Vector Search)

    This is the standard component in most RAG systems. We'll use the sentence-transformers library for embeddings and a generic vector database client interface. For this example, we'll simulate the client, but in production, this would be pinecone-client, weaviate-client, or similar.

    python
    import numpy as np
    from sentence_transformers import SentenceTransformer
    
    # --- Simulated Vector DB Client ---
    class VectorDBClient:
        def __init__(self, dimension: int):
            self.dimension = dimension
            self.doc_ids: list[str] = []
            self.vectors = None

        def upsert(self, documents: list[dict]):
            # Replace the stored corpus with the supplied documents
            self.doc_ids = [doc['id'] for doc in documents]
            self.vectors = np.array([doc['vector'] for doc in documents])

        def query(self, vector: np.ndarray, top_k: int) -> list[dict]:
            if self.vectors is None:
                return []
            # Cosine similarity between the query vector and every stored vector
            sims = np.dot(self.vectors, vector) / (
                np.linalg.norm(self.vectors, axis=1) * np.linalg.norm(vector)
            )
            # Indices of the top_k most similar vectors, best first
            top_k_indices = np.argsort(sims)[-top_k:][::-1]
            return [{'id': self.doc_ids[i], 'score': float(sims[i])} for i in top_k_indices]
    
    # --- Dense Retriever Implementation ---
    class DenseRetriever:
        def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
            self.model = SentenceTransformer(model_name)
            self.dimension = self.model.get_sentence_embedding_dimension()
            self.db_client = VectorDBClient(dimension=self.dimension)
    
        def build_index(self, documents: dict[str, str]):
            doc_ids = list(documents.keys())
            doc_texts = list(documents.values())
            
            print("Building dense index...")
            vectors = self.model.encode(doc_texts, convert_to_numpy=True, show_progress_bar=True)
            
            db_payload = [{'id': doc_id, 'vector': vector} for doc_id, vector in zip(doc_ids, vectors)]
            self.db_client.upsert(db_payload)
            print("Dense index built.")
    
        def retrieve(self, query: str, top_k: int) -> list[dict]:
            query_vector = self.model.encode(query, convert_to_numpy=True)
            return self.db_client.query(query_vector, top_k)
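
    As a quick smoke test, you can index a couple of documents and query the dense retriever directly (the sample texts below are hypothetical):

    python
    dense = DenseRetriever()
    dense.build_index({
        "doc_a": "Add indexes to frequently queried columns to speed up lookups.",
        "doc_b": "Connection pooling reduces the overhead of opening database connections.",
    })
    # Both documents are semantically related to the query and should be returned,
    # ranked by cosine similarity.
    print(dense.retrieve("How do I improve database performance?", top_k=2))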
    

    Component 2: The Sparse Retriever (BM25)

    For sparse retrieval, we'll use Elasticsearch, the industry standard for full-text search, which uses BM25 as its default scoring algorithm. We will use the official Python client.

    python
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    
    # --- Sparse Retriever Implementation ---
    class SparseRetriever:
        def __init__(self, index_name: str = "sparse_docs_index"):
            self.es_client = Elasticsearch([{'host': 'localhost', 'port': 9200, 'scheme': 'http'}])
            self.index_name = index_name
    
        def build_index(self, documents: dict[str, str]):
            if self.es_client.indices.exists(index=self.index_name):
                print(f"Deleting existing index: {self.index_name}")
                self.es_client.indices.delete(index=self.index_name)
            
            print(f"Creating index: {self.index_name}")
            self.es_client.indices.create(index=self.index_name)
    
            actions = [
                {
                    "_index": self.index_name,
                    "_id": doc_id,
                    "_source": {"text": text}
                }
                for doc_id, text in documents.items()
            ]
            
            print("Bulk indexing documents for sparse search...")
            bulk(self.es_client, actions)
            print("Sparse index built.")
    
        def retrieve(self, query: str, top_k: int) -> list[dict]:
            response = self.es_client.search(
                index=self.index_name,
                query={
                    "match": {
                        "text": query
                    }
                },
                size=top_k
            )
            return [{'id': hit['_id'], 'score': hit['_score']} for hit in response['hits']['hits']]
    

    Note: You need a running Elasticsearch instance for the code above to work.
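
    If you want to verify connectivity before building the index, a minimal check might look like this (it assumes a local, unsecured instance on port 9200; adjust the URL and authentication for your cluster):

    python
    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")
    if not es.ping():
        raise RuntimeError("Elasticsearch is not reachable at http://localhost:9200")
    # Print the server version to confirm which client/server combination you are on
    print(es.info()["version"]["number"])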

    Component 3: Fusing the Results with Reciprocal Rank Fusion (RRF)

    Now we have two lists of ranked documents, each with a different, incompatible scoring system (cosine similarity vs. BM25 score). We cannot simply add or average these scores. A robust, parameter-free method for combining ranked lists is Reciprocal Rank Fusion (RRF).

    RRF calculates a new score for each document based on its rank in each retrieval list. The formula is:

    RRF_Score(d) = Σ (1 / (k + rank_i(d)))

    Where rank_i(d) is the rank of document d in result list i, and k is a constant (commonly set to 60) that diminishes the impact of lower-ranked items.

    Here's the implementation:

    python
    def reciprocal_rank_fusion(retrieval_results: list[list[dict]], k: int = 60) -> dict[str, float]:
        fused_scores = {}
        
        for result_list in retrieval_results:
            for rank, doc in enumerate(result_list):
                doc_id = doc['id']
                if doc_id not in fused_scores:
                    fused_scores[doc_id] = 0
                # enumerate() is 0-based, so add 1 to get the 1-based rank used in the RRF formula
                fused_scores[doc_id] += 1 / (k + rank + 1)
                
        return fused_scores
    
    class HybridRetriever:
        def __init__(self, dense_retriever: DenseRetriever, sparse_retriever: SparseRetriever):
            self.dense_retriever = dense_retriever
            self.sparse_retriever = sparse_retriever
    
        def retrieve(self, query: str, top_k: int) -> list[dict]:
            dense_results = self.dense_retriever.retrieve(query, top_k)
            sparse_results = self.sparse_retriever.retrieve(query, top_k)
            
            fused_scores = reciprocal_rank_fusion([dense_results, sparse_results])
            
            # Sort documents by their fused score in descending order
            sorted_docs = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)
            
            return [{'id': doc_id, 'score': score} for doc_id, score in sorted_docs[:top_k]]
    

    This HybridRetriever now provides a single, unified retrieval method that leverages both semantic and lexical signals, dramatically improving recall over a wide range of query types.
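
    A quick sanity check of the fusion logic (with hypothetical IDs and scores) shows how rank positions, not raw scores, drive the final ordering:

    python
    # "docA" is ranked 1st by dense search and 2nd by sparse search;
    # "docB" and "docC" each appear in only one list.
    dense_results = [{'id': 'docA', 'score': 0.91}, {'id': 'docC', 'score': 0.72}]
    sparse_results = [{'id': 'docB', 'score': 11.2}, {'id': 'docA', 'score': 9.8}]

    fused = reciprocal_rank_fusion([dense_results, sparse_results])
    # docA: 1/61 + 1/62 ≈ 0.0325, docB: 1/61 ≈ 0.0164, docC: 1/62 ≈ 0.0161
    print(sorted(fused.items(), key=lambda kv: kv[1], reverse=True))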


    Part 3: The 'Last Mile' Problem: Precision with Cross-Encoder Re-ranking

    Hybrid search is excellent for recall, but it can still surface documents that are only tangentially related. The top N results might contain the answer, but not necessarily at rank #1. Since LLMs have a limited context window and are sensitive to the order of information, ensuring the most relevant document is first is critical. This is the 'last mile' problem of retrieval.

    We solve this with a re-ranker. While our initial retrieval used bi-encoders (which create document and query embeddings independently), a re-ranker uses a cross-encoder.

    * Bi-Encoder: Computes query and document embeddings independently. Fast, suitable for searching over millions of documents. score = f(embed(query), embed(doc))

    * Cross-Encoder: Takes the query and a document together as input to produce a relevance score. Much slower, but far more accurate because it models the interactions between query and document tokens directly. score = g(query, doc)

    The pattern is to use the fast bi-encoder/BM25 hybrid search to retrieve a candidate set (e.g., top 50 documents) and then use the slow, accurate cross-encoder to re-rank only this small set.

    python
    from sentence_transformers.cross_encoder import CrossEncoder
    
    class ReRanker:
        def __init__(self, model_name: str = 'cross-encoder/ms-marco-MiniLM-L-6-v2'):
            # This model is small and fast, but very effective.
            self.model = CrossEncoder(model_name)
    
        def rerank(self, query: str, documents: dict[str, str], top_k: int) -> list[dict]:
            # The model expects pairs of [query, document_text]
            doc_ids = list(documents.keys())
            doc_texts = list(documents.values())
            
            pairs = [[query, doc_text] for doc_text in doc_texts]
            
            print(f"Re-ranking {len(pairs)} documents...")
            scores = self.model.predict(pairs, show_progress_bar=False)
            
            # Combine doc_ids with scores and sort
            reranked_results = [{'id': doc_id, 'score': float(score)} for doc_id, score in zip(doc_ids, scores)]
            reranked_results.sort(key=lambda x: x['score'], reverse=True)
            
            return reranked_results[:top_k]
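
    A standalone check (with hypothetical documents) illustrates the behaviour: the cross-encoder should score the document that actually answers the query well above the unrelated one.

    python
    reranker = ReRanker()
    candidates = {
        "kb-1": "To fix error 0x80070005, run the Windows Update Troubleshooter.",
        "kb-2": "Our EKS cluster runs Kubernetes 1.28 with Karpenter for autoscaling.",
    }
    # Expect "kb-1" first, since it directly addresses the query.
    print(reranker.rerank("fix error 0x80070005", candidates, top_k=2))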
    

    Now we can assemble the final, end-to-end pipeline.


    Part 4: Production Implementation and Performance Considerations

    Let's integrate all components into a single, production-ready class.

    python
    class ProductionRAGPipeline:
        def __init__(self, documents: dict[str, str]):
            self.documents = documents
            print("Initializing retrievers...")
            self.dense_retriever = DenseRetriever()
            self.sparse_retriever = SparseRetriever()
            self.hybrid_retriever = HybridRetriever(self.dense_retriever, self.sparse_retriever)
            self.reranker = ReRanker()
    
            print("Building indices...")
            self.dense_retriever.build_index(self.documents)
            self.sparse_retriever.build_index(self.documents)
            print("Pipeline ready.")
    
        def retrieve_and_rerank(self, query: str, hybrid_top_k: int = 50, rerank_top_k: int = 5):
            print(f"\nExecuting query: '{query}'")
            # 1. Hybrid Retrieval (for high recall)
            hybrid_results = self.hybrid_retriever.retrieve(query, top_k=hybrid_top_k)
            hybrid_retrieved_ids = [doc['id'] for doc in hybrid_results]
            
            # Prepare documents for the re-ranker
            docs_for_reranking = {doc_id: self.documents[doc_id] for doc_id in hybrid_retrieved_ids}
            
            # 2. Re-ranking (for high precision)
            final_results = self.reranker.rerank(query, docs_for_reranking, top_k=rerank_top_k)
            
            return final_results
    
    # --- Example Usage ---
    
    # Sample documents
    documents = {
        "doc1": "The Kubelet is the primary node agent that runs on each node. It registers the node with the apiserver.",
        "doc2": "Our EKS cluster is running on Kubernetes version 1.28. We use Karpenter for node autoscaling.",
        "doc3": "A common Windows Update error is 0x80070005, which indicates an access denied issue, often related to permissions.",
        "doc4": "To fix error 0x80070005, you should run the Windows Update Troubleshooter or check file system permissions.",
        "doc5": "The product with SKU XG-55-2A-PROD is a high-performance GPU for machine learning workloads.",
        "doc6": "General database optimization includes adding indexes to frequently queried columns."
    }
    
    pipeline = ProductionRAGPipeline(documents)
    
    # Query 1: A semantic query
    semantic_query = "How do I make my database faster?"
    results1 = pipeline.retrieve_and_rerank(semantic_query)
    print("Results for semantic query:", [res['id'] for res in results1])
    
    # Query 2: A keyword-specific query
    keyword_query = "fix error 0x80070005"
    results2 = pipeline.retrieve_and_rerank(keyword_query)
    print("Results for keyword query:", [res['id'] for res in results2])
    
    # Query 3: An acronym/jargon query
    jargon_query = "what is the kubelet in EKS?"
    results3 = pipeline.retrieve_and_rerank(jargon_query)
    print("Results for jargon query:", [res['id'] for res in results3])
    

    Latency Analysis and Optimization

    The primary performance bottleneck in this architecture is the cross-encoder re-ranking step. While the parallel queries to the vector DB and Elasticsearch are fast (typically <100ms), the re-ranker performs a full model inference for each of the hybrid_top_k documents.

    Latency Breakdown (Hypothetical):

    * Dense Query (Vector DB): ~50ms

    * Sparse Query (Elasticsearch): ~30ms

    * RRF Fusion: <5ms

    * Re-ranking (50 docs on CPU): 400-800ms

    * Total Latency: ~500-900ms
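
    These numbers vary widely with hardware, model size, and candidate-set size. To obtain a breakdown for your own deployment, it is worth timing each stage explicitly; the sketch below uses only the standard library (the helper name is illustrative):

    python
    import time

    def timed(label: str, fn, *args, **kwargs):
        # Run one pipeline stage and report its wall-clock latency in milliseconds
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        print(f"{label}: {(time.perf_counter() - start) * 1000:.1f} ms")
        return result

    # Example (assumes the pipeline from Part 4 has been built):
    # candidates = timed("hybrid retrieval", pipeline.hybrid_retriever.retrieve, query, 50)
    # final = timed("re-ranking", pipeline.reranker.rerank, query, docs_for_reranking, 5)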

    This latency might be unacceptable for real-time applications. Here are advanced optimization strategies:

  • Model Quantization: The cross-encoder can be exported to an optimized format such as ONNX and quantized to INT8. This can shrink the model and accelerate inference by roughly 2-4x with a negligible drop in accuracy. Tools like Hugging Face's optimum library can facilitate this.

    python
    # Example using optimum for ONNX quantization
    # from optimum.onnxruntime import ORTQuantizer
    # from optimum.onnxruntime.configuration import AutoQuantizationConfig
    # ... (code to load and quantize the model)

  • Dedicated Inference Infrastructure: Do not run the cross-encoder on the same CPU as your application server. Deploy it to a dedicated inference endpoint with GPU support. Options include:

    * AWS SageMaker Endpoints: Provides a managed environment for deploying models with auto-scaling.

    * Self-hosted Triton Inference Server on a GPU instance (e.g., EC2 g5): Offers high throughput and batching capabilities.

    * Serverless GPU providers (e.g., Banana.dev, Replicate): Easy to set up but can have cold start issues.

  • Intelligent Caching: Implement a two-level cache (e.g., using Redis). A minimal sketch of the first level follows this list.

    * Cache Level 1: Cache the final re-ranked document IDs for identical queries. CACHE_KEY = HASH(query).

    * Cache Level 2: Cache the re-ranking scores for specific (query, document_id) pairs. This is useful when different queries retrieve overlapping document sets, avoiding re-computation for documents that have already been scored against that query.
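
    The Level 1 cache could look like the following sketch using redis-py; the key scheme, TTL, and helper name are illustrative assumptions, and it relies on the re-ranker returning plain-float scores so the results are JSON-serializable:

    python
    import hashlib
    import json

    import redis

    # Assumes a local Redis instance; point this at your cache cluster in production.
    cache = redis.Redis(host="localhost", port=6379, decode_responses=True)

    def cached_retrieve_and_rerank(pipeline, query: str, ttl_seconds: int = 3600):
        # Level 1: identical queries reuse the final re-ranked results
        key = "rag:final:" + hashlib.sha256(query.encode("utf-8")).hexdigest()
        hit = cache.get(key)
        if hit is not None:
            return json.loads(hit)
        results = pipeline.retrieve_and_rerank(query)
        cache.set(key, json.dumps(results), ex=ttl_seconds)
        return results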

    Edge Case Handling

    A production system must be resilient.

    * Retriever Failure: What if Elasticsearch is down? The HybridRetriever should be wrapped in a try...except block to fall back to dense-only results if the sparse retriever fails, and vice-versa. Log the failure prominently.

    * Empty Results: If one retriever returns no results, RRF gracefully handles this. If both return empty, the pipeline should return an empty list immediately, short-circuiting the re-ranker.

    * Re-ranker Timeout: The re-ranking step should have a strict timeout. If it exceeds the timeout, the system should fall back to returning the un-reranked results from the hybrid search stage. This keeps the system responsive, albeit with potentially lower precision. A minimal sketch of these fallbacks follows the list.
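
    The sketch below illustrates both fallbacks under simple assumptions (function names, the one-second timeout, and print-based logging are all illustrative; use your real logger in production):

    python
    from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

    def safe_hybrid_retrieve(dense, sparse, query: str, top_k: int) -> list[dict]:
        # Degrade gracefully: if one retriever fails, continue with the other.
        dense_results, sparse_results = [], []
        try:
            dense_results = dense.retrieve(query, top_k)
        except Exception as exc:
            print(f"Dense retriever failed: {exc}")
        try:
            sparse_results = sparse.retrieve(query, top_k)
        except Exception as exc:
            print(f"Sparse retriever failed: {exc}")
        if not dense_results and not sparse_results:
            return []  # short-circuit: nothing to re-rank
        fused = reciprocal_rank_fusion([dense_results, sparse_results])
        ranked = sorted(fused.items(), key=lambda kv: kv[1], reverse=True)
        return [{'id': doc_id, 'score': score} for doc_id, score in ranked[:top_k]]

    _rerank_pool = ThreadPoolExecutor(max_workers=1)

    def rerank_with_timeout(reranker, query: str, docs: dict[str, str], top_k: int, timeout_s: float = 1.0):
        # If the cross-encoder is too slow, fall back to the hybrid ordering.
        future = _rerank_pool.submit(reranker.rerank, query, docs, top_k)
        try:
            return future.result(timeout=timeout_s)
        except FuturesTimeout:
            # The worker thread keeps running; we just stop waiting for it.
            return [{'id': doc_id, 'score': 0.0} for doc_id in list(docs)[:top_k]]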


    Part 5: Evaluating the Advanced Pipeline

    To justify the added complexity, you must measure its impact. Simple accuracy is insufficient for ranking systems. Use rank-aware metrics:

    * Mean Reciprocal Rank (MRR): Measures the average reciprocal of the rank of the first correct answer. Excellent for question-answering tasks where finding one good document is key.

    * Normalized Discounted Cumulative Gain (nDCG): Evaluates the quality of the entire ranked list, giving higher weight to correct documents ranked higher. Ideal for general-purpose search.
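
    For reference, minimal implementations of both metrics with binary relevance labels might look like this (function names and signatures are illustrative):

    python
    import math

    def mrr_at_k(ranked_ids: list[str], relevant_ids: set[str], k: int = 10) -> float:
        # Reciprocal rank of the first relevant document within the top k (0 if none)
        for rank, doc_id in enumerate(ranked_ids[:k], start=1):
            if doc_id in relevant_ids:
                return 1.0 / rank
        return 0.0

    def ndcg_at_k(ranked_ids: list[str], relevant_ids: set[str], k: int = 10) -> float:
        # DCG with binary gains, normalized by the best possible ordering
        dcg = sum(
            1.0 / math.log2(rank + 1)
            for rank, doc_id in enumerate(ranked_ids[:k], start=1)
            if doc_id in relevant_ids
        )
        ideal = sum(1.0 / math.log2(rank + 1) for rank in range(1, min(len(relevant_ids), k) + 1))
        return dcg / ideal if ideal > 0 else 0.0

    # Average these per-query values across your evaluation set to report MRR@10 and nDCG@10.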

    Hypothetical Benchmark on a Technical Q&A Dataset:

    | Pipeline Configuration            | MRR@10 | nDCG@10 |
    |-----------------------------------|--------|---------|
    | Naive RAG (Vector Search Only)    | 0.65   | 0.72    |
    | Hybrid Search (No Re-ranking)     | 0.78   | 0.81    |
    | Advanced RAG (Hybrid + Re-ranker) | 0.89   | 0.92    |

    These metrics provide quantitative proof that the multi-stage architecture delivers substantial improvements in retrieval quality, which directly translates to more accurate and reliable LLM responses.


    Conclusion

    Moving from a simplistic RAG prototype to a production-grade system requires a fundamental shift in retrieval architecture. By embracing a multi-stage process of hybrid retrieval followed by cross-encoder re-ranking, we build systems that are resilient to the diverse nature of user queries. This architecture effectively combines the semantic recall of dense vectors with the lexical precision of sparse search, while the re-ranking stage ensures that the most contextually relevant information is prioritized for the LLM.

    While this approach introduces complexity and latency challenges, the optimization techniques discussed—quantization, dedicated hardware, and caching—provide a clear path to mitigating them. For any serious RAG application, the investment in this advanced retrieval pipeline is not just an optimization; it is a prerequisite for achieving the accuracy, reliability, and user trust required in a production environment.
