Optimizing RAG: Hybrid Search & Re-ranking for Low-Latency Production

Goh Ling Yong

Beyond Naive RAG: The Recall-Precision Tradeoff at Production Scale

For senior engineers tasked with building robust, production-ready Retrieval-Augmented Generation (RAG) systems, the limitations of a simple vector search quickly become apparent. A single dense vector retriever, while excellent for capturing semantic similarity, often fails on queries requiring keyword precision, such as product codes, acronyms, or specific names. Conversely, a traditional sparse retriever like BM25 excels at keyword matching but misses semantic nuance. Relying on either one in isolation leads to a suboptimal retrieval set, polluting the LLM's context and resulting in inaccurate or incomplete answers.

The core challenge is a classic information retrieval problem: maximizing recall (finding all relevant documents) while maintaining high precision (ensuring the top results are the most relevant) under a strict latency budget. A production RAG system for a real-time chatbot or a Q&A service cannot afford a multi-second retrieval process.

This article details a battle-tested, two-stage architecture that addresses this challenge head-on. We will construct a retrieval funnel:

  • Stage 1: Maximize Recall with Hybrid Search: We'll implement a parallelized retrieval system that queries a sparse (BM25) and a dense (vector) index simultaneously. We'll then intelligently merge these disparate result sets using Reciprocal Rank Fusion (RRF), a parameter-free and highly effective technique.
  • Stage 2: Maximize Precision with Optimized Re-ranking: We'll take the top N candidates from the hybrid search and pass them to a more computationally expensive but highly accurate cross-encoder model. We will focus intensely on optimizing this potential bottleneck through model quantization and batching to ensure it doesn't violate our latency constraints.

This is not a theoretical overview. We will build a complete, runnable implementation in Python, demonstrating the patterns, edge cases, and performance considerations required to deploy this architecture in a high-throughput environment.


    Stage 1: Parallel Hybrid Retrieval and Reciprocal Rank Fusion

    The goal of our first stage is to cast a wide net and capture all potentially relevant documents. By running sparse and dense searches in parallel, we get the best of both worlds. We'll build on two components:

    * Elasticsearch (BM25): For our sparse retriever, providing robust, battle-tested keyword search.

    * FAISS (or any vector DB): For our dense retriever, providing semantic search capabilities. We'll use the sentence-transformers library to generate embeddings.

    The Implementation: A `HybridRetriever` Class

    Let's build a Python class to encapsulate this logic. It will require connections to both Elasticsearch and a vector index. We'll use asyncio to execute the queries concurrently, which is crucial for minimizing latency.

    python
    import asyncio
    import numpy as np
    from elasticsearch import AsyncElasticsearch
    from sentence_transformers import SentenceTransformer
    import faiss
    from typing import List, Dict, Tuple
    
    # Assume these are initialized elsewhere with proper data
    # es_client = AsyncElasticsearch(hosts=["http://localhost:9200"])
    # embedding_model = SentenceTransformer('msmarco-MiniLM-L-6-v3')
    # faiss_index = faiss.read_index("path/to/your.index")
    # id_to_text_mapping = { ... } # A mapping from FAISS index ID to your document text
    
    class HybridRetriever:
        def __init__(self, es_client, embedding_model, faiss_index, id_to_text_map, es_index_name="documents"):
            self.es = es_client
            self.model = embedding_model
            self.faiss_index = faiss_index
            self.id_map = id_to_text_map
            self.es_index = es_index_name
    
        async def _search_sparse(self, query_text: str, k: int) -> List[Tuple[str, float]]:
            """Performs a BM25 search on Elasticsearch."""
            response = await self.es.search(
                index=self.es_index,
                body={
                    "query": {
                        "match": {
                            "content": query_text
                        }
                    },
                    "size": k
                }
            )
            return [(hit['_id'], hit['_score']) for hit in response['hits']['hits']]
    
        async def _search_dense(self, query_text: str, k: int) -> List[Tuple[str, float]]:
            """Performs a dense vector search using FAISS.

            Embedding and FAISS search are blocking CPU work, so they are off-loaded to a
            worker thread; otherwise they would block the event loop and defeat the
            parallelism of asyncio.gather in search().
            """
            def _blocking_search():
                query_vector = self.model.encode([query_text], convert_to_numpy=True)
                return self.faiss_index.search(query_vector, k)

            distances, indices = await asyncio.to_thread(_blocking_search)

            results = []
            for i in range(len(indices[0])):
                doc_id = self.id_map.get(int(indices[0][i]))
                if doc_id:
                    # FAISS returns L2 distance; convert to a similarity score in (0, 1]
                    # (a simple inversion -- more sophisticated mappings exist)
                    similarity = 1 / (1 + distances[0][i])
                    results.append((doc_id, similarity))
            return results
    
        def _reciprocal_rank_fusion(self, search_results: List[List[Tuple[str, float]]], k_val: int = 60) -> Dict[str, float]:
            """Fuses results from multiple search methods using RRF."""
            fused_scores = {}
            for result_set in search_results:
                for i, (doc_id, _) in enumerate(result_set):
                    rank = i + 1
                    if doc_id not in fused_scores:
                        fused_scores[doc_id] = 0
                    fused_scores[doc_id] += 1 / (k_val + rank)
            
            return fused_scores
    
        async def search(self, query_text: str, k: int = 50) -> List[Tuple[str, float]]:
            """Executes parallel hybrid search and fuses the results."""
            sparse_task = self._search_sparse(query_text, k)
            dense_task = self._search_dense(query_text, k)
    
            # Run in parallel
            sparse_results, dense_results = await asyncio.gather(sparse_task, dense_task)
            
            # Fuse the results
            fused_scores = self._reciprocal_rank_fusion([sparse_results, dense_results])
            
            # Sort by fused score in descending order
            sorted_results = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)
            
            return sorted_results
    
    # Example Usage:
    # async def main():
    #     retriever = HybridRetriever(es_client, embedding_model, faiss_index, id_to_text_mapping)
    #     query = "What are the performance considerations for multi-stage Docker builds?"
    #     results = await retriever.search(query, k=50)
    #     print(f"Found {len(results)} fused results.")
    #     for doc_id, score in results[:10]:
    #         print(f"ID: {doc_id}, Score: {score:.4f}")
    
    # if __name__ == "__main__":
    #     asyncio.run(main())

    Why Reciprocal Rank Fusion (RRF)?

    We could try to normalize the scores from BM25 and vector similarity, but this is notoriously difficult and brittle. BM25 scores are unbounded, while vector similarities live on a different, bounded scale (cosine similarity falls in [-1, 1]; our distance-derived score falls in (0, 1]). Any normalization is a heuristic that might not generalize across queries.

    RRF elegantly sidesteps this problem. It's a rank-based fusion method. It doesn't care about the absolute scores, only the position of a document in each result list. The formula is simple: for each document, its RRF score is the sum of 1 / (k + rank) across all result lists it appears in. k is a constant (typically 60) that dampens the influence of lower-ranked documents.

    This makes RRF robust and requires no tuning. Documents that consistently rank high across different retrieval methods (e.g., a document that is both a keyword match and semantically similar) will receive a much higher fused score. This is exactly the behavior we want.
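
    As a quick sanity check of the formula, here is a minimal worked example with hypothetical ranks and the conventional k = 60: a document that appears in both result lists outscores one that ranks higher in a single list.

    python
    k = 60

    # Document ranked #1 by BM25 and #3 by the dense retriever
    rrf_both = 1 / (k + 1) + 1 / (k + 3)   # ~0.0164 + ~0.0159 = ~0.0323

    # Document ranked #2 by BM25 but absent from the dense top-k
    rrf_single = 1 / (k + 2)               # ~0.0161 -- roughly half the fused score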

    Performance Considerations for Stage 1

    * Parallelism is Key: The asyncio.gather is non-negotiable, but it only pays off if the blocking embedding and FAISS calls are off-loaded to a worker thread (as done in _search_dense above). Done correctly, the latency of this stage is max(latency_sparse, latency_dense), not the sum.

    * Index Optimization: For FAISS, tuning HNSW graph parameters (M, ef_construction, ef_search) is critical. A higher ef_search gives better recall at the cost of higher latency. You must benchmark this trade-off for your specific dataset and SLA (a minimal tuning sketch follows after this list).

    * Connection Pooling: In a real service (e.g., FastAPI), ensure your Elasticsearch client and any database connections are managed with a proper connection pool to avoid the overhead of establishing connections on every request.

    * Embedding Caching: The query embedding step self.model.encode() can be a minor bottleneck. For frequent queries, caching the resulting vector in a store like Redis can shave off a few milliseconds.
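
    To make the HNSW trade-off concrete, here is a minimal FAISS sketch; the dimension and parameter values are placeholders to benchmark against your own data, not recommendations:

    python
    import faiss
    import numpy as np

    d = 384   # embedding dimension (e.g., MiniLM-style models)
    M = 32    # graph connectivity: higher = better recall, more memory

    index = faiss.IndexHNSWFlat(d, M)
    index.hnsw.efConstruction = 200   # build-time quality/speed trade-off
    index.add(np.random.rand(10_000, d).astype('float32'))  # placeholder vectors

    # Query-time knob: raise efSearch for recall, lower it for latency
    index.hnsw.efSearch = 64
    distances, indices = index.search(np.random.rand(1, d).astype('float32'), 50)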


    Stage 2: High-Precision Re-ranking with Quantized Cross-Encoders

    The hybrid search gives us a high-recall set of, say, the top 50-100 candidates. However, the document most relevant to the user's query might be ranked at #7, not #1. The job of the re-ranker is to take this candidate set and re-order it with high precision.

    This is where cross-encoders shine. Unlike bi-encoders (used for retrieval) which create separate embeddings for the query and document, a cross-encoder takes both the query and a document as a single input and outputs a relevance score. This allows the model to perform deep attention across both texts simultaneously, making it far more accurate for relevance ranking.
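
    For reference, an unoptimized baseline with the sentence-transformers CrossEncoder API looks like the sketch below (the query and documents are illustrative). It is accurate but, as discussed next, too slow on its own for a tight latency budget.

    python
    from sentence_transformers.cross_encoder import CrossEncoder

    # Plain FP32 PyTorch cross-encoder -- accurate but slow on CPU
    baseline = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', max_length=512)

    query = "What are the performance considerations for multi-stage Docker builds?"
    docs = [
        "Multi-stage builds keep final images small by discarding build tooling.",
        "Docker Compose defines multi-container applications in a single file.",
    ]

    # predict() scores each (query, document) pair; higher means more relevant
    scores = baseline.predict([(query, doc) for doc in docs])
    ranked = sorted(zip(docs, scores), key=lambda x: x[1], reverse=True)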

    The Problem: Cross-encoders are slow. Passing 50 documents through a standard PyTorch-based cross-encoder one by one would destroy our latency budget.

    The Solution: We'll use a combination of techniques to make the re-ranker blazingly fast.

  • Choose a Lightweight Model: Start with a smaller, distilled model designed for performance, like ms-marco-MiniLM-L-6-v2 or similar models from the sentence-transformers library.
  • Model Quantization with ONNX: We'll convert the PyTorch model to the ONNX (Open Neural Network Exchange) format and apply dynamic quantization. This reduces the model's precision from FP32 to INT8, significantly speeding up CPU inference with minimal accuracy loss.
  • Batching: We'll process all candidate documents for a single query in one batch, maximizing computational efficiency.

    The Implementation: A `FastReranker` Class

    First, let's prepare our quantized model. This is a one-time, offline process.

    bash
    # You will need to install these libraries
    # pip install sentence-transformers onnx onnxruntime optimum
    python
    # --- one_time_model_export.py ---
    from pathlib import Path
    from transformers import AutoTokenizer
    from optimum.onnxruntime import ORTModelForSequenceClassification, ORTQuantizer
    from optimum.onnxruntime.configuration import AutoQuantizationConfig
    
    model_name = 'cross-encoder/ms-marco-MiniLM-L-6-v2'
    onnx_path = Path("onnx_models")
    onnx_path.mkdir(exist_ok=True)
    quantized_model_path = onnx_path / "quantized_model"
    
    # 1. Export the cross-encoder (a sequence-classification model) to ONNX
    ort_model = ORTModelForSequenceClassification.from_pretrained(model_name, export=True)
    
    # 2. Create a quantizer and define the quantization config
    # avx512_vnni targets modern Intel CPUs; use avx2 if your hardware lacks VNNI
    # Dynamic quantization (is_static=False) is fast and needs no calibration data
    qconfig = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=False)
    quantizer = ORTQuantizer.from_pretrained(ort_model)
    
    # 3. Quantize the model (saved as model_quantized.onnx by default)
    quantizer.quantize(save_dir=quantized_model_path, quantization_config=qconfig)
    
    # 4. Save the tokenizer alongside the quantized model
    AutoTokenizer.from_pretrained(model_name).save_pretrained(quantized_model_path)
    
    print(f"Quantized model saved to {quantized_model_path}")

    Now, we can build our fast re-ranker class that uses this quantized model with onnxruntime.

    python
    import onnxruntime as ort
    from transformers import AutoTokenizer
    import numpy as np
    from pathlib import Path
    from typing import List, Tuple
    
    class FastReranker:
        def __init__(self, model_dir: str):
            self.tokenizer = AutoTokenizer.from_pretrained(model_dir)
            
            # Use optimized session options
            sess_options = ort.SessionOptions()
            sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
            sess_options.intra_op_num_threads = 4 # Tune based on your CPU cores
            
            self.session = ort.InferenceSession(
                str(Path(model_dir) / "model_quantized.onnx"),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"] # Or ['CUDAExecutionProvider'] for GPU
            )
    
        def rerank(self, query: str, documents: List[str], top_k: int = 5) -> List[Tuple[int, float]]:
            """Reranks a list of documents for a given query."""
            if not documents:
                return []
    
            # Create pairs of [query, document]
            pairs = [[query, doc] for doc in documents]
            
            # Tokenize in a batch
            features = self.tokenizer(pairs, padding=True, truncation=True, return_tensors='np', max_length=512)
            
            # Run inference with ONNX Runtime, mapping inputs by name so that
            # token_type_ids (present for BERT-style models) is passed through too,
            # instead of relying on positional input order.
            ort_inputs = {inp.name: features[inp.name] for inp in self.session.get_inputs()}
            ort_outputs = self.session.run(None, ort_inputs)
            
            # Scores are the output of the sigmoid function on the model's logits
            scores = 1 / (1 + np.exp(-ort_outputs[0])) # Apply sigmoid
            scores = scores.flatten() # Flatten to a 1D array of scores
    
            # Combine original indices with scores
            indexed_scores = list(enumerate(scores))
            
            # Sort by score in descending order
            sorted_indices = sorted(indexed_scores, key=lambda x: x[1], reverse=True)
            
            return sorted_indices[:top_k]
    
    # Example Usage:
    # Assume `retriever_results` is a list of document IDs from Stage 1
    # and `doc_id_to_content` is a dict mapping IDs to full text.
    #
    # reranker = FastReranker("onnx_models/quantized_model")
    # candidate_ids = [res[0] for res in retriever_results[:50]]
    # candidate_docs = [doc_id_to_content[doc_id] for doc_id in candidate_ids]
    #
    # query = "What are the performance considerations for multi-stage Docker builds?"
    # reranked_results = reranker.rerank(query, candidate_docs, top_k=5)
    #
    # print("Top 5 reranked documents:")
    # for doc_idx, score in reranked_results:
    #     original_doc_id = candidate_ids[doc_idx]
    #     print(f"ID: {original_doc_id}, Score: {score:.4f}")

    Performance Benchmarking: The Impact of Quantization

    Let's run a quick benchmark to see the impact. We'll re-rank 50 documents for a single query.

    * Hardware: Standard CPU (e.g., Intel Core i7)

    * PyTorch (FP32): ~120-150ms

    * ONNX Quantized (INT8): ~30-40ms

    This is a 3-4x speedup, moving the re-ranking step from a major bottleneck to a manageable part of our latency budget. On a server with AVX-512 VNNI instructions, the gains can be even more substantial.
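
    These numbers are illustrative; a small harness along these lines (assuming the FastReranker class above and a quantized model already exported to disk) lets you reproduce the comparison on your own hardware:

    python
    import time
    from sentence_transformers.cross_encoder import CrossEncoder

    query = "What are the performance considerations for multi-stage Docker builds?"
    documents = [f"Candidate document number {i} about Docker builds." for i in range(50)]

    # Baseline: FP32 PyTorch cross-encoder, batched
    baseline = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
    baseline.predict([(query, documents[0])])  # warm-up so timing excludes lazy init
    start = time.perf_counter()
    baseline.predict([(query, doc) for doc in documents])
    print(f"PyTorch FP32: {(time.perf_counter() - start) * 1000:.1f} ms")

    # Quantized ONNX path (FastReranker defined earlier)
    reranker = FastReranker("onnx_models/quantized_model")
    reranker.rerank(query, documents[:1])      # warm-up
    start = time.perf_counter()
    reranker.rerank(query, documents, top_k=5)
    print(f"ONNX INT8:    {(time.perf_counter() - start) * 1000:.1f} ms")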

    Edge Cases and Further Optimizations

    * Empty Document List: The code handles this gracefully, but in a production system, you should log this event. It might indicate a problem with your retrieval stage.

    * Re-ranking Budget: Don't re-rank all 100+ documents from the hybrid search. This is the law of diminishing returns. Re-ranking the top 25-50 candidates is usually sufficient. Profile this to find the sweet spot for your application.

    * Negative Caching: If a query-document pair is scored low by the re-ranker, you could cache this negative result in Redis for a short TTL. If the same document appears for a similar query soon after, you could potentially skip re-ranking it. This is an advanced pattern that adds complexity but can be useful in high-traffic systems with repetitive query patterns.
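
    A minimal sketch of that negative-caching idea, assuming redis-py and a hypothetical neg:<query_hash>:<doc_id> key scheme; the score threshold and TTL are placeholders to tune. Note that hashing the raw query only catches exact repeats -- handling "similar" queries would require query normalization or embedding-based keys.

    python
    import hashlib
    import redis
    from typing import List, Tuple

    r = redis.Redis(host="localhost", port=6379, db=0)

    NEG_SCORE_THRESHOLD = 0.1   # placeholder: below this, treat the pair as irrelevant
    NEG_TTL_SECONDS = 300       # placeholder: short TTL keeps the cache from going stale

    def _neg_key(query: str, doc_id: str) -> str:
        query_hash = hashlib.sha1(query.encode("utf-8")).hexdigest()[:16]
        return f"neg:{query_hash}:{doc_id}"

    def filter_cached_negatives(query: str, doc_ids: List[str]) -> List[str]:
        """Drop candidates that a recent re-rank already scored as irrelevant."""
        return [doc_id for doc_id in doc_ids if not r.exists(_neg_key(query, doc_id))]

    def cache_negatives(query: str, scored: List[Tuple[str, float]]) -> None:
        """After re-ranking, remember clearly irrelevant pairs for a short while."""
        for doc_id, score in scored:
            if score < NEG_SCORE_THRESHOLD:
                r.setex(_neg_key(query, doc_id), NEG_TTL_SECONDS, 1)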


    Tying It All Together: A Production-Ready RAG Service

    Now let's integrate these two stages into a cohesive service using FastAPI. This example will show the full, orchestrated flow.

    python
    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel
    from sentence_transformers import SentenceTransformer
    from typing import Dict, List
    import numpy as np
    import time
    
    # Assume HybridRetriever and FastReranker classes are defined as above
    # and all models/clients are initialized globally (or with dependency injection)
    
    app = FastAPI()
    
    # --- MOCK DATA and CLIENTS (replace with your actual initializations) ---
    class MockESClient:
        async def search(self, index, body): return {'hits': {'hits': [{'_id': f'doc_{i}', '_score': 10-i} for i in range(10)]}}
    
    class MockFaissIndex:
        def search(self, vector, k): return (np.random.rand(1, k).astype('float32'), np.random.randint(0, 100, size=(1, k)))
    
    # Assume a global document store for content retrieval
    doc_id_to_content = {f'doc_{i}': f'Content of document {i}.' for i in range(100)}
    
    es_client = MockESClient()
    embedding_model = SentenceTransformer('msmarco-MiniLM-L-6-v3')
    faiss_index = MockFaissIndex()
    id_map = {i: f'doc_{i}' for i in range(100)}
    
    retriever = HybridRetriever(es_client, embedding_model, faiss_index, id_map)
    reranker = FastReranker("onnx_models/quantized_model")
    # --- END MOCK DATA ---
    
    class QueryRequest(BaseModel):
        query: str
        top_k: int = 5
    
    class Document(BaseModel):
        id: str
        content: str
        score: float
    
    class QueryResponse(BaseModel):
        results: List[Document]
        timings: Dict[str, float]
    
    @app.post("/query", response_model=QueryResponse)
    async def query_rag_pipeline(request: QueryRequest):
        timings = {}
    
        # Stage 1: Hybrid Retrieval
        start_time = time.perf_counter()
        # We retrieve more documents than we need to give the re-ranker a good selection.
        retrieval_candidates = 50
        fused_results = await retriever.search(request.query, k=retrieval_candidates)
        timings['retrieval_fusion'] = (time.perf_counter() - start_time) * 1000
    
        if not fused_results:
            return QueryResponse(results=[], timings=timings)
    
        # Prepare for re-ranking (cap the candidate set -- RRF can return up to 2*k IDs)
        candidate_ids = [res[0] for res in fused_results[:retrieval_candidates]]
        try:
            candidate_docs = [doc_id_to_content[doc_id] for doc_id in candidate_ids]
        except KeyError as e:
            raise HTTPException(status_code=500, detail=f"Document content not found for ID: {e}")
    
        # Stage 2: Re-ranking
        start_time = time.perf_counter()
        reranked_indices = reranker.rerank(request.query, candidate_docs, top_k=request.top_k)
        timings['reranking'] = (time.perf_counter() - start_time) * 1000
    
        # Assemble final results
        final_results = []
        for doc_idx, score in reranked_indices:
            original_doc_id = candidate_ids[doc_idx]
            final_results.append(Document(
                id=original_doc_id,
                content=doc_id_to_content[original_doc_id],
                score=float(score)  # cast from numpy float for clean serialization
            ))
        
        # This would be where you pass the context to an LLM
        # llm_context = " ".join([doc.content for doc in final_results])
        # llm_response = call_llm(request.query, llm_context)
    
        return QueryResponse(results=final_results, timings=timings)
    

    Architectural Considerations for Production

    * Multi-tenancy: In a multi-tenant system, filtering must be applied at every stage. In Elasticsearch, this is a bool filter. In the vector search, this is metadata filtering (supported by most vector DBs). Crucially, you must ensure the documents passed to the re-ranker have already been filtered for the correct tenant to prevent data leakage (a sketch follows after this list).

    * Decoupled Indexing: The pipeline for ingesting documents and updating the Elasticsearch and FAISS indexes should be a separate, asynchronous process. Use a message queue like RabbitMQ or Kafka to decouple your application from the indexing pipeline. This ensures that document updates don't block or slow down the query path.

    * Observability is King: You must instrument this pipeline. Key metrics to track:

    * End-to-end p50, p90, p99 latency.

    * Latency of each stage: retrieval_fusion and reranking.

    * The number of documents returned by the retrieval stage.

    * The quality of results, which requires an offline evaluation framework using a labeled dataset (e.g., calculating nDCG or MRR for the final ranked list).
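
    As a sketch of the multi-tenancy point above (the tenant_id field and the doc-to-tenant mapping are hypothetical): the sparse side gets a bool filter, while the dense side is shown here as a simple post-filter, since plain FAISS has no built-in metadata filtering (managed vector DBs handle this natively).

    python
    from typing import Dict, List, Tuple

    def tenant_scoped_es_body(query_text: str, tenant_id: str, k: int) -> Dict:
        """BM25 query wrapped in a bool filter on a hypothetical tenant_id field."""
        return {
            "query": {
                "bool": {
                    "must": [{"match": {"content": query_text}}],
                    "filter": [{"term": {"tenant_id": tenant_id}}],
                }
            },
            "size": k,
        }

    def filter_dense_hits(hits: List[Tuple[str, float]],
                          doc_tenant_map: Dict[str, str],
                          tenant_id: str) -> List[Tuple[str, float]]:
        """Drop dense hits that belong to another tenant before re-ranking."""
        return [(doc_id, score) for doc_id, score in hits
                if doc_tenant_map.get(doc_id) == tenant_id]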

    Conclusion: From Prototype to Production RAG

    Moving a RAG system from a simple notebook prototype to a low-latency production service requires a shift in thinking from a single retrieval step to a multi-stage funnel. By combining parallelized hybrid search with an aggressively optimized re-ranking stage, we can build systems that are both highly accurate and responsive enough for real-time applications.

    The key takeaways for senior engineers are:

  • Don't rely on a single retriever: Hybrid search (sparse + dense) is essential for robust recall across diverse query types.
  • Use rank-based fusion: Reciprocal Rank Fusion is a simple, effective, and parameter-free method for merging results from different systems.
  • Treat the re-ranker as a performance-critical component: Use lightweight models, quantization (ONNX), and batching to tame the latency of cross-encoders.
  • Architect for performance and scale: Use asyncio for I/O-bound tasks, design a decoupled indexing pipeline, and implement comprehensive monitoring.

    This architecture provides a solid foundation for building sophisticated, high-performance RAG applications that can handle the complexity and performance demands of real-world use cases.
