Optimizing RAG: Hybrid Search & Cross-Encoder Re-ranking

22 min read
Goh Ling Yong
Technology enthusiast and software architect specializing in AI-driven development tools and modern software engineering practices. Passionate about the intersection of artificial intelligence and human creativity in building tomorrow's digital solutions.

The Fragility of Pure Vector Search in Production RAG

For engineers architecting and deploying Retrieval-Augmented Generation (RAG) systems, the initial proof-of-concept often relies on a straightforward vector search pipeline. We embed a corpus of documents into a high-dimensional space and, at query time, embed the user's question to find the k-nearest neighbors. This approach is powerful for capturing semantic similarity and is the cornerstone of modern RAG. However, in production environments with diverse and specific user queries, the limitations of this naive approach become a critical failure point.

The core issue is the semantic-lexical gap. Dense vector embeddings, generated by bi-encoder models like all-MiniLM-L6-v2, are trained to understand meaning and context. They excel at mapping "how to manage project budgets" to documents discussing "financial planning for software development". But they often fail spectacularly at lexical, or keyword-based, matching.

Consider these failure modes:

  • Identifier-based Queries: A user searching for a specific product SKU like "XG-500-A" or a specific error code "ERR_CONN_RESET". These identifiers may have low semantic weight or be treated as out-of-vocabulary tokens by the embedding model, causing them to be lost in the vector representation.
  • Acronyms and Jargon: Enterprise documents are rife with acronyms (GDPR, SOC2, QBR). While a fine-tuned model might understand common ones, a general-purpose model may not map "SOC2 compliance report" effectively to the source documents if the embedding doesn't capture that specific token's importance.
  • Proper Nouns and Names: A query for a specific person, "Dr. Evelyn Reed's research", might not be resolved correctly if the name is not semantically central to the document's overall meaning, even if it's mentioned frequently.

A production-grade RAG system cannot afford these inconsistencies. The solution is not to abandon vector search, but to augment it. We need a system that leverages the best of both worlds: the semantic power of dense vectors and the keyword precision of sparse retrieval algorithms like BM25. This article details the implementation of a sophisticated, multi-stage retrieval pipeline that achieves this balance through hybrid search and cross-encoder re-ranking.
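
To make the gap concrete, here is a minimal, illustrative sketch (separate from the pipeline built below) that checks how a bi-encoder scores an identifier-heavy query against two of the sample documents used later in this article. The exact numbers depend on the model and its tokenizer; the point is that nothing in the dense representation guarantees an exact-token match the way a lexical retriever does.

python
# semantic_gap_demo.py -- illustrative only, not part of the pipeline
from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer('all-MiniLM-L6-v2')

query = "XG-500-A firmware"
docs = [
    "Firmware update XG-500-A addresses the critical vulnerability CVE-2023-12345.",
    "Our financial planning guide for software development projects outlines key budget management strategies.",
]

# Cosine similarity between the query embedding and each document embedding
scores = util.cos_sim(model.encode(query), model.encode(docs))[0]
for doc, score in zip(docs, scores):
    print(f"{float(score):.3f}  {doc[:60]}...")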


    Stage 1: Implementing a Hybrid Search Retriever

    Hybrid search combines results from two distinct retrieval methods: a sparse retriever (lexical) and a dense retriever (semantic). Our sparse retriever of choice is Okapi BM25, a battle-tested ranking function built on term frequency and inverse document frequency (TF-IDF) statistics. Our dense retriever will be a standard vector search over bi-encoder embeddings.

    Many modern vector databases (Pinecone, Weaviate) offer hybrid search as a built-in feature. However, to understand the mechanics, we will implement it using Elasticsearch, which has robust support for both BM25 and k-Nearest Neighbor (kNN) vector search. This gives us granular control over the entire process.

    Setting Up the Environment and Index

    First, ensure you have a running Elasticsearch instance (a local Docker container is perfect for this) and the necessary Python libraries.

    bash
    # Start a single-node Elasticsearch 8.x container for local development
    # (security disabled for simplicity -- do not do this in production)
    docker run -d --name es-rag -p 9200:9200 \
      -e "discovery.type=single-node" \
      -e "xpack.security.enabled=false" \
      docker.elastic.co/elasticsearch/elasticsearch:8.11.1

    pip install elasticsearch sentence-transformers torch

    Our first task is to create an Elasticsearch index that can store both the raw text for BM25 and the dense vector for kNN search.

    python
    # 1_setup_hybrid_index.py
    
    from elasticsearch import Elasticsearch
    from sentence_transformers import SentenceTransformer
    import json
    
    # --- Configuration ---
    INDEX_NAME = "hybrid_rag_index"
    EMBEDDING_MODEL = 'all-MiniLM-L6-v2' 
    
    # --- Connect to Elasticsearch ---
    es_client = Elasticsearch(
        "http://localhost:9200",
        # In a real application, use API keys or other auth methods
        # api_key="YOUR_API_KEY",
    )
    # The client is lazy: instantiation does not open a connection,
    # so verify connectivity explicitly before proceeding.
    if not es_client.ping():
        print("Connection to Elasticsearch failed.")
        exit(1)
    print("Connected to Elasticsearch successfully!")
    
    # --- Load Sentence Transformer Model ---
    print(f"Loading embedding model: {EMBEDDING_MODEL}...")
    model = SentenceTransformer(EMBEDDING_MODEL)
    embedding_dim = model.get_sentence_embedding_dimension()
    print(f"Model loaded. Embedding dimension: {embedding_dim}")
    
    def create_index():
        """Creates an Elasticsearch index with mappings for text and dense vectors."""
        if es_client.indices.exists(index=INDEX_NAME):
            print(f"Index '{INDEX_NAME}' already exists. Deleting...")
            es_client.indices.delete(index=INDEX_NAME)
        
        index_mapping = {
            "properties": {
                "text_content": {
                    "type": "text",
                    "analyzer": "standard"
                },
                "document_vector": {
                    "type": "dense_vector",
                    "dims": embedding_dim,
                    "index": True,
                    "similarity": "cosine" # or 'l2_norm' for Euclidean distance
                }
            }
        }
        
        print(f"Creating index '{INDEX_NAME}'...")
        es_client.indices.create(
            index=INDEX_NAME, 
            mappings=index_mapping
        )
        print("Index created successfully.")
    
    def index_documents(documents):
        """Indexes a list of documents, creating embeddings for each."""
        operations = []
        doc_texts = [doc['text_content'] for doc in documents]
        
        print(f"Generating embeddings for {len(doc_texts)} documents...")
        embeddings = model.encode(doc_texts, show_progress_bar=True)
        
        for i, doc in enumerate(documents):
            operations.append({"index": {"_index": INDEX_NAME, "_id": doc['id']}})
            operations.append({
                "text_content": doc['text_content'],
                "document_vector": embeddings[i].tolist()
            })
        
        print("Performing bulk indexing...")
        response = es_client.bulk(index=INDEX_NAME, operations=operations)
        if response['errors']:
            print("Bulk indexing had errors.")
            # Basic error logging
            for item in response['items']:
                if 'error' in item['index']:
                    print(json.dumps(item['index']['error'], indent=2))
        else:
            print("Bulk indexing completed successfully.")
    
    if __name__ == "__main__":
        # Sample documents demonstrating potential failure points for pure vector search
        sample_docs = [
            {'id': 'doc1', 'text_content': 'The SOC2 Type II compliance report for Q3 2023 is now available on the internal portal.'},
            {'id': 'doc2', 'text_content': 'Firmware update XG-500-A addresses the critical vulnerability CVE-2023-12345.'},
            {'id': 'doc3', 'text_content': 'Our financial planning guide for software development projects outlines key budget management strategies.'},
            {'id': 'doc4', 'text_content': 'According to Dr. Evelyn Reed\'s latest research, quantum entanglement can be stabilized at room temperature.'},
            {'id': 'doc5', 'text_content': 'General Data Protection Regulation (GDPR) policies were updated last month.'}
        ]
        
        create_index()
        index_documents(sample_docs)
    

    This script sets up an index with two key fields: text_content for BM25 and document_vector for kNN search. It then populates the index with sample documents, generating the vector embeddings on the fly.
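
    Before querying, it is worth confirming that the mapping and documents landed as expected. A small verification sketch (index and field names match the setup script above):

    python
    # verify_index.py -- quick sanity check after indexing
    from elasticsearch import Elasticsearch

    INDEX_NAME = "hybrid_rag_index"
    es_client = Elasticsearch("http://localhost:9200")

    # Make newly indexed documents visible to search immediately
    es_client.indices.refresh(index=INDEX_NAME)

    # Confirm both fields exist with the expected types
    mapping = es_client.indices.get_mapping(index=INDEX_NAME)
    print(mapping[INDEX_NAME]["mappings"]["properties"].keys())

    # Confirm the document count matches what we indexed
    print("Document count:", es_client.count(index=INDEX_NAME)["count"])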

    Constructing the Hybrid Query

    Now we can perform a hybrid search. The query will have two parts: a match query for BM25 and a knn query for vector search. Elasticsearch can run these in parallel and return a combined set of results. Each result will have a score from its respective query type.

    python
    # 2_hybrid_search.py
    
    from elasticsearch import Elasticsearch
    from sentence_transformers import SentenceTransformer
    import json
    
    # --- Configuration (should match setup script) ---
    INDEX_NAME = "hybrid_rag_index"
    EMBEDDING_MODEL = 'all-MiniLM-L6-v2'
    
    # --- Initialize clients ---
    es_client = Elasticsearch("http://localhost:9200")
    model = SentenceTransformer(EMBEDDING_MODEL)
    
    def run_hybrid_search(query_text, k=5):
        """Performs a hybrid search combining BM25 and kNN vector search."""
        print(f"\n--- Running Hybrid Search for query: '{query_text}' ---")
        
        # 1. Generate query vector
        query_vector = model.encode(query_text).tolist()
        
        # 2. Construct the BM25 (sparse) query
        sparse_query = {
            "match": {
                "text_content": {
                    "query": query_text
                }
            }
        }
        
        # 3. Construct the kNN (dense) query
        dense_query = {
            "field": "document_vector",
            "query_vector": query_vector,
            "k": k,
            "num_candidates": 10 # Increase for better accuracy, at a performance cost
        }
        
        # 4. Execute the search
        # Note: Elasticsearch combines scores, but we'll re-rank later.
        # For direct ES ranking, you can use rank/rrf features in newer versions.
        # Here we fetch results from both and fuse them manually for clarity.
    
        # Fetch BM25 results
        bm25_response = es_client.search(
            index=INDEX_NAME,
            query=sparse_query,
            size=k
        )
        bm25_results = {hit['_id']: hit['_score'] for hit in bm25_response['hits']['hits']}
        print(f"BM25 found {len(bm25_results)} results.")
    
        # Fetch kNN results
        knn_response = es_client.search(
            index=INDEX_NAME,
            knn=dense_query,
            size=k,
            _source=False # We only need IDs and scores
        )
        knn_results = {hit['_id']: hit['_score'] for hit in knn_response['hits']['hits']}
        print(f"kNN found {len(knn_results)} results.")
    
        return bm25_results, knn_results
    
    if __name__ == "__main__":
        # Query where semantic search might be better
        query_semantic = "managing money for software projects"
        bm25_res, knn_res = run_hybrid_search(query_semantic)
        print("BM25 results (ID: Score):", bm25_res)
        print("kNN results (ID: Score):", knn_res)
        
        # Query where keyword search is essential
        query_keyword = "report on SOC2 compliance"
        bm25_res, knn_res = run_hybrid_search(query_keyword)
        print("BM25 results (ID: Score):", bm25_res)
        print("kNN results (ID: Score):", knn_res)
    
        # Query with a specific identifier
        query_identifier = "XG-500-A firmware"
        bm25_res, knn_res = run_hybrid_search(query_identifier)
        print("BM25 results (ID: Score):", bm25_res)
        print("kNN results (ID: Score):", knn_res)

    Running this script highlights the dichotomy. For "managing money...", kNN finds doc3 with a high score. For "SOC2 compliance" and "XG-500-A", BM25 correctly identifies doc1 and doc2 respectively, while kNN might struggle or return them with lower confidence. We now have two distinct, ranked lists of document IDs. The next challenge is to merge them intelligently.


    Stage 2: Fusing Results with Reciprocal Rank Fusion (RRF)

    Simply adding normalized scores from BM25 and kNN is a common but flawed approach: the two score distributions are entirely different (BM25 scores are unbounded, while vector similarity scores fall within a fixed range), so normalization is fragile. A more robust, score-agnostic method is Reciprocal Rank Fusion (RRF).

    RRF's logic is simple and effective: it values documents that appear high up in any of the result lists, regardless of their absolute scores. The formula for a document's RRF score is:

    RRF_Score(d) = Σ (1 / (k + rank_i(d)))

    Where:

  • rank_i(d) is the rank of document d in result list i (1 for the top hit).
  • k is a constant (commonly set to 60) that dampens the influence of lower-ranked documents.

    For example, with k = 60, a document ranked #1 by BM25 and #4 by kNN scores 1/61 + 1/64 ≈ 0.032, beating a document that appears only once at #3 (1/63 ≈ 0.016). Let's implement an RRF function to fuse our two result sets.

    python
    # 3_rrf_fusion.py
    
    # [Include run_hybrid_search function from previous step]
    # ... (omitted for brevity)
    
    def reciprocal_rank_fusion(list_of_results, k=60):
        """Performs RRF on a list of result dictionaries (doc_id -> score)."""
        fused_scores = {}
        
        print("\n--- Fusing results with RRF ---")
        
        # Convert results to ranked lists of doc IDs
        ranked_lists = []
        for results_dict in list_of_results:
            # Sort by score in descending order
            sorted_docs = sorted(results_dict.items(), key=lambda item: item[1], reverse=True)
            ranked_lists.append([doc_id for doc_id, _ in sorted_docs])
    
        # Calculate RRF scores
        for ranked_list in ranked_lists:
            for rank, doc_id in enumerate(ranked_list, 1):
                if doc_id not in fused_scores:
                    fused_scores[doc_id] = 0.0
                fused_scores[doc_id] += 1.0 / (k + rank)
                
        # Sort documents by their fused score in descending order
        reranked_results = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)
        
        return reranked_results
    
    if __name__ == "__main__":
        # Re-run the identifier query which is a classic hybrid case
        query_identifier = "XG-500-A firmware"
        bm25_res, knn_res = run_hybrid_search(query_identifier)
        
        # For this query, BM25 will likely rank 'doc2' first.
        # kNN might rank it lower or not at all.
        print("\nBM25 results:", bm25_res)
        print("kNN results:", knn_res)
        
        fused_results = reciprocal_rank_fusion([bm25_res, knn_res])
        
        print("\nFused and re-ranked results (ID, RRF Score):")
        for doc_id, score in fused_results:
            print(f"  {doc_id}: {score:.6f}")
    
        # Let's see another example
        query_semantic = "managing money for software projects"
        bm25_res, knn_res = run_hybrid_search(query_semantic)
        print("\nBM25 results:", bm25_res)
        print("kNN results:", knn_res)
        fused_results = reciprocal_rank_fusion([bm25_res, knn_res])
        print("\nFused and re-ranked results (ID, RRF Score):")
        for doc_id, score in fused_results:
            print(f"  {doc_id}: {score:.6f}")

    With RRF, we now have a single, robustly ranked list of candidate documents. For the "XG-500-A" query, doc2 will receive a high RRF score because it was ranked #1 by BM25, even if kNN ranked it poorly. For the "managing money..." query, doc3 will be at the top. We've successfully created a retriever that is resilient to the weaknesses of any single method.
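
    To see the arithmetic in isolation, the same reciprocal_rank_fusion function can be applied to two hand-made result sets. The scores below are arbitrary placeholders; RRF only looks at rank order:

    python
    # Toy demonstration of reciprocal_rank_fusion (scores are placeholders; only ranks matter)
    bm25_toy = {'doc2': 9.1, 'doc1': 4.2, 'doc5': 1.3}    # doc2 ranked 1st, doc1 2nd, doc5 3rd
    knn_toy  = {'doc3': 0.82, 'doc2': 0.41, 'doc4': 0.39}  # doc3 ranked 1st, doc2 2nd, doc4 3rd

    for doc_id, score in reciprocal_rank_fusion([bm25_toy, knn_toy]):
        print(f"{doc_id}: {score:.6f}")

    # doc2 wins because it appears in both lists: 1/(60+1) + 1/(60+2) ≈ 0.0325.
    # Every other document appears in only one list and scores at most 1/61 ≈ 0.0164.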


    Stage 3: Precision Boost with Cross-Encoder Re-ranking

    The hybrid search stage is about recall—finding all potentially relevant documents. This next stage is about precision—finding the most relevant document within that candidate set.

    Our initial embeddings were created with a bi-encoder. It encodes the query and documents independently into vectors. This is extremely fast, making it suitable for searching over millions of documents.

    A cross-encoder, on the other hand, takes both the query and a candidate document as a single input pair and outputs a relevance score. This allows the model to perform full self-attention across both texts, leading to a much more accurate and nuanced judgment of relevance. The trade-off is speed; it's computationally infeasible to use a cross-encoder on an entire corpus.

    The production pattern is therefore:

  • Retrieve: Use fast hybrid search to get a candidate set (e.g., top 20-50 documents).
  • Re-rank: Use a slow but powerful cross-encoder on this small candidate set to get the final, precise ranking.

    We'll use a pre-trained cross-encoder from the sentence-transformers library.

    python
    # 4_cross_encoder_reranking.py
    
    from sentence_transformers.cross_encoder import CrossEncoder
    
    # [Include run_hybrid_search and reciprocal_rank_fusion functions]
    # ...
    
    # --- Load Cross-Encoder Model ---
    print("Loading Cross-Encoder model...")
    # Models like 'ms-marco-MiniLM-L-6-v2' are trained for relevance ranking
    cross_encoder_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
    print("Cross-Encoder model loaded.")
    
    def fetch_documents_by_ids(doc_ids):
        """A helper to fetch document content from Elasticsearch."""
        if not doc_ids:
            return {}
        response = es_client.mget(index=INDEX_NAME, ids=doc_ids)
        return {hit['_id']: hit['_source']['text_content'] for hit in response['docs'] if hit['found']}
    
    def rerank_with_cross_encoder(query, fused_results):
        """Re-ranks a list of fused document IDs using a Cross-Encoder."""
        print("\n--- Re-ranking with Cross-Encoder ---")
        
        if not fused_results:
            return []
            
        # Prepare pairs for the cross-encoder: [ (query, doc_text), ... ]
        doc_ids = [doc_id for doc_id, _ in fused_results]
        documents_content = fetch_documents_by_ids(doc_ids)
        
        sentence_pairs = [(query, documents_content.get(doc_id, "")) for doc_id in doc_ids]
        
        # Predict scores
        print(f"Predicting relevance scores for {len(sentence_pairs)} pairs...")
        scores = cross_encoder_model.predict(sentence_pairs, show_progress_bar=False)
        
        # Combine IDs with new scores and sort
        reranked = list(zip(doc_ids, scores))
        reranked.sort(key=lambda x: x[1], reverse=True)
        
        return reranked
    
    if __name__ == "__main__":
        query = "GDPR update"
        
        # 1. Retrieve
        bm25_res, knn_res = run_hybrid_search(query)
        
        # 2. Fuse
        fused_results = reciprocal_rank_fusion([bm25_res, knn_res])
        print("\nFused RRF Results (Top 5):")
        for doc_id, score in fused_results[:5]:
            print(f"  {doc_id}: {score:.6f}")
    
        # 3. Re-rank
        final_results = rerank_with_cross_encoder(query, fused_results)
        
        print("\nFinal Cross-Encoder Re-ranked Results (ID, Relevance Score):")
        for doc_id, score in final_results:
            print(f"  {doc_id}: {score:.6f}")
    
        # Fetch content for the top result to verify
        top_doc_id = final_results[0][0]
        top_doc_content = fetch_documents_by_ids([top_doc_id])
        print(f"\nTop result content for '{top_doc_id}':\n  '{top_doc_content[top_doc_id]}'")

    Running this final script demonstrates the complete pipeline. For a query like "GDPR update", hybrid search might return doc5 (direct match) and doc1 (mentions compliance reports, semantically related). The cross-encoder will then analyze ("GDPR update", "...GDPR policies were updated...") versus ("GDPR update", "...SOC2 compliance report...") and assign a much higher score to doc5, ensuring the most precise context is passed to the LLM.


    End-to-End Production Pipeline & Performance Considerations

    Let's assemble the full, production-ready pipeline and discuss the critical performance trade-offs and edge cases.

    The Complete Pipeline

    python
    # 5_full_pipeline.py
    
    # [Combine all previous functions and imports here]
    # ...
    
    class AdvancedRAGPipeline:
        def __init__(self, es_host="http://localhost:9200", index_name="hybrid_rag_index"):
            self.es_client = Elasticsearch(es_host)
            self.index_name = index_name
            self.bi_encoder = SentenceTransformer('all-MiniLM-L6-v2')
            self.cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
            print("Advanced RAG Pipeline initialized.")
    
        def retrieve_and_rerank(self, query, top_k_retrieval=20, top_k_final=5):
            """Executes the full multi-stage retrieval and re-ranking process."""
            if not self.es_client.ping():
                raise ConnectionError("Could not connect to Elasticsearch")
    
            # 1. Hybrid Search (Retrieve)
            query_vector = self.bi_encoder.encode(query).tolist()
            
            sparse_query = {"match": {"text_content": {"query": query}}}
            dense_query = {"field": "document_vector", "query_vector": query_vector, "k": top_k_retrieval, "num_candidates": 50}
            
            bm25_response = self.es_client.search(index=self.index_name, query=sparse_query, size=top_k_retrieval)
            bm25_results = {hit['_id']: hit['_score'] for hit in bm25_response['hits']['hits']}
            
            knn_response = self.es_client.search(index=self.index_name, knn=dense_query, size=top_k_retrieval, _source=False)
            knn_results = {hit['_id']: hit['_score'] for hit in knn_response['hits']['hits']}
    
            # 2. Reciprocal Rank Fusion (Fuse)
            fused_results = self._reciprocal_rank_fusion([bm25_results, knn_results])
            
            # 3. Cross-Encoder (Re-rank)
            if not fused_results:
                return []
                
            doc_ids_to_rerank = [doc_id for doc_id, _ in fused_results[:top_k_retrieval]] # Rerank the top candidates
            docs_content = self._fetch_documents_by_ids(doc_ids_to_rerank)
            
            pairs = [(query, docs_content.get(doc_id, "")) for doc_id in doc_ids_to_rerank]
            scores = self.cross_encoder.predict(pairs)
            
            final_results_with_scores = list(zip(doc_ids_to_rerank, scores))
            final_results_with_scores.sort(key=lambda x: x[1], reverse=True)
    
            # 4. Prepare final context
            final_doc_ids = [doc_id for doc_id, _ in final_results_with_scores[:top_k_final]]
            final_docs_content = [docs_content.get(doc_id, "") for doc_id in final_doc_ids]
            
            return final_docs_content
    
        def _reciprocal_rank_fusion(self, list_of_results, k=60):
            # (Implementation from step 3)
            fused_scores = {}
            ranked_lists = [sorted(res.items(), key=lambda i: i[1], reverse=True) for res in list_of_results]
            for ranked_list in ranked_lists:
                for rank, (doc_id, _) in enumerate(ranked_list, 1):
                    if doc_id not in fused_scores: fused_scores[doc_id] = 0
                    fused_scores[doc_id] += 1.0 / (k + rank)
            return sorted(fused_scores.items(), key=lambda i: i[1], reverse=True)
    
        def _fetch_documents_by_ids(self, doc_ids):
            # (Implementation from step 4)
            if not doc_ids: return {}
            response = self.es_client.mget(index=self.index_name, ids=doc_ids)
            return {hit['_id']: hit['_source']['text_content'] for hit in response['docs'] if hit['found']}
    
    if __name__ == "__main__":
        pipeline = AdvancedRAGPipeline()
        
        query = "What were the findings of Dr. Reed's research?"
        final_context = pipeline.retrieve_and_rerank(query)
        
        print(f"\n--- Final Context for LLM (Query: '{query}') ---")
        for i, context in enumerate(final_context, 1):
            print(f"[{i}] {context[:100]}...")

    Performance and Latency Trade-offs

    This multi-stage pipeline introduces latency. A senior engineer must quantify and manage it.

  • Hybrid Search: Typically fast. Elasticsearch can execute a hybrid query in 50-200ms on a well-tuned cluster, depending on the index size and num_candidates for kNN.
  • RRF Fusion: Negligible. This is an in-memory computation, taking <1ms.
  • Cross-Encoder Re-ranking: This is the bottleneck. Re-ranking 20 candidates with a MiniLM-sized model on a CPU can take 200-500ms. On a GPU, this can be reduced to 50-100ms. Measure it on your own hardware; a timing sketch follows this list.
  • Total Latency: ~300-700ms before you can even query the LLM.
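
    A minimal timing harness for the re-ranking step (numbers vary with hardware, batch size, and document length; pairs is a list of (query, document) tuples as built in the scripts above):

    python
    # Rough latency measurement for cross-encoder re-ranking (illustrative).
    import time

    def time_rerank(cross_encoder, pairs, runs=5):
        # Warm-up call so model loading / first-call overhead is not measured
        cross_encoder.predict(pairs)
        timings = []
        for _ in range(runs):
            start = time.perf_counter()
            cross_encoder.predict(pairs)
            timings.append(time.perf_counter() - start)
        print(f"Re-ranked {len(pairs)} pairs: "
              f"median {sorted(timings)[len(timings) // 2] * 1000:.1f} ms")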

    Optimization Strategies:

  • Model Quantization: Quantize the cross-encoder model to INT8. This can significantly speed up inference on CPUs with a minor drop in accuracy (see the sketch after this list).
  • Smaller Models: Use a smaller, distilled cross-encoder if latency is critical. The trade-off is accuracy.
  • Hardware Acceleration: Offload cross-encoder inference to a GPU. This is the most effective solution for high-throughput systems.
  • Parameter Tuning: Reduce the number of candidates for re-ranking (e.g., from 20 to 10). This is a direct trade-off between recall and latency. Measure the impact on your evaluation set.
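
    A minimal sketch of dynamic INT8 quantization with PyTorch, assuming (as in current sentence-transformers releases) that the CrossEncoder exposes its underlying transformers model as .model; validate accuracy on your own evaluation set before shipping:

    python
    # Sketch: dynamic INT8 quantization of the cross-encoder for CPU inference.
    import torch
    from sentence_transformers.cross_encoder import CrossEncoder

    cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', device='cpu')

    # Quantize the linear layers to INT8; activations remain float (dynamic quantization)
    cross_encoder.model = torch.quantization.quantize_dynamic(
        cross_encoder.model, {torch.nn.Linear}, dtype=torch.qint8
    )

    # predict() works as before, now using the quantized weights
    scores = cross_encoder.predict([("GDPR update", "GDPR policies were updated last month.")])
    print(scores)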

    Edge Case Handling

  • No Results: What if one or both retrievers return nothing? The RRF function should be robust to empty result sets. The overall pipeline should handle an empty context gracefully, perhaps by returning a default message to the user instead of querying the LLM.
  • Mismatched IDs: Ensure consistent document IDs across all systems. A mismatch between your retriever and your document store will break the pipeline.
  • Context Window Management: The final context passed to the LLM must not exceed its token limit. Implement a robust truncation strategy. A simple approach is to concatenate documents until you are close to the limit, then truncate the last one. A more advanced strategy is to prioritize keeping the beginning of each document, as it often contains the most salient information. A minimal sketch of the simple approach follows this list.
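
    A minimal sketch of the simple truncation strategy, assuming a count_tokens function supplied by your LLM's tokenizer (a hypothetical placeholder here):

    python
    # Sketch: pack documents into a token budget, truncating the last one if needed.
    # `count_tokens` is a placeholder for your LLM tokenizer's token counter.
    def pack_context(docs, count_tokens, max_tokens=3000):
        packed, used = [], 0
        for doc in docs:
            doc_tokens = count_tokens(doc)
            if used + doc_tokens <= max_tokens:
                packed.append(doc)
                used += doc_tokens
            else:
                # Crude tail truncation: keep a proportional prefix of the document
                remaining = max_tokens - used
                if remaining > 0:
                    keep_chars = int(len(doc) * remaining / doc_tokens)
                    packed.append(doc[:keep_chars])
                break
        return packed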

    Conclusion: From Prototype to Production-Grade RAG

    Moving a RAG system from a simple prototype to a reliable production service requires moving beyond naive vector search. The multi-stage retrieve-fuse-rerank architecture presented here provides a robust framework for achieving state-of-the-art relevance.

    By combining the lexical precision of BM25 with the semantic power of dense vectors, we create a retriever that is resilient to a wide range of query types. By fusing results with RRF, we leverage the strengths of both systems in a principled, score-agnostic way. Finally, by applying a cross-encoder for a final re-ranking pass, we ensure that the context provided to the LLM is of the highest possible precision.

    While this architecture introduces complexity and latency, these are manageable engineering challenges. The payoff is a significant leap in retrieval quality, leading to more accurate, reliable, and trustworthy LLM-powered applications.
