Production RAG: Hybrid Search & Re-ranking in Elasticsearch

The Relevance Ceiling of Naive Vector Search

For senior engineers building Retrieval-Augmented Generation (RAG) systems, the initial proof-of-concept using a simple vector database often hits a hard relevance ceiling in production. While dense vector search is powerful for capturing semantic similarity, it frequently fails on queries containing domain-specific acronyms, product SKUs, exact error codes, or proper nouns that a general-purpose embedding model hasn't adequately captured. A query for error code E404-B2 might semantically map to general "not found" documents, completely missing the critical, specific document that explains this exact code.

This is the fundamental limitation of relying solely on bi-encoder models for retrieval. They map queries and documents into a shared vector space, but this process is inherently lossy. The nuance of lexical matching is lost. Production-grade RAG demands a more sophisticated approach: a multi-stage retrieval architecture that combines the best of both worlds—the semantic understanding of dense vectors and the keyword precision of sparse vectors (like BM25).

This article presents a battle-tested architecture for such a system using Elasticsearch. We will architect and implement a three-stage pipeline:

  • Parallel Retrieval: Execute a dense vector search (k-NN) and a sparse lexical search (BM25) simultaneously.
  • Fusion: Merge the results from both retrieval methods using a robust, rank-based algorithm: Reciprocal Rank Fusion (RRF).
  • Re-ranking: Pass the fused candidate set of documents to a more powerful, but computationally expensive, cross-encoder model to produce the final, highly relevant ranking.

We will move beyond theory and into concrete implementation, covering index design, complex query DSL, performance tuning, and critical edge cases you will encounter in a production environment.
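
For orientation, here is a minimal sketch of how the three stages compose end to end. The helper functions are the ones built in Sections 2 and 3 below, and the cut-off of 25 re-rank candidates is an illustrative choice, not a requirement.

python
# Minimal composition sketch; execute_hybrid_search and rerank_documents
# are implemented in Sections 2 and 3 of this article.
query = "What is Reciprocal Rank Fusion?"

# Stage 1 + 2: parallel BM25 / k-NN retrieval, fused with RRF
candidates = execute_hybrid_search(query, k=25, num_candidates=50)

# Stage 3: cross-encoder re-ranking of the fused candidates
final_results = rerank_documents(query, candidates[:25])

# The top re-ranked documents become the LLM context
for result in final_results[:5]:
    print(result["doc"]["title"])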


    1. Architecting the Elasticsearch Index for Hybrid Search

    The foundation of our system is an Elasticsearch index capable of efficiently handling both dense and sparse retrieval. This requires a carefully designed mapping.

    Our document structure will contain the original text content, which will be analyzed for BM25, and a dense_vector field to store the embedding generated by a bi-encoder model (e.g., all-mpnet-base-v2).

    Advanced Index Mapping

    Here is a production-ready index mapping. Note the specific configuration for the dense_vector field, specifying the vector dimension and the similarity metric. We also define a custom analyzer for our text field to handle specific tokenization needs.

    json
    PUT /production-rag-docs
    {
      "settings": {
        "analysis": {
          "analyzer": {
            "custom_english_analyzer": {
              "type": "custom",
              "tokenizer": "standard",
              "filter": [
                "english_possessive_stemmer",
                "lowercase",
                "english_stop",
                "english_stemmer"
              ]
            }
          },
          "filter": {
            "english_stop": {
              "type": "stop",
              "stopwords": "_english_"
            },
            "english_stemmer": {
              "type": "stemmer",
              "language": "english"
            },
            "english_possessive_stemmer": {
              "type": "stemmer",
              "language": "possessive_english"
            }
          }
        },
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "title": {
            "type": "text",
            "analyzer": "custom_english_analyzer"
          },
          "content": {
            "type": "text",
            "analyzer": "custom_english_analyzer"
          },
          "content_vector": {
            "type": "dense_vector",
            "dims": 768,
            "index": true,
            "similarity": "cosine"
          },
          "metadata": {
            "type": "object",
            "enabled": false
          }
        }
      }
    }

    Key Production Considerations:

    * dims: This must match the output dimension of your chosen embedding model (e.g., 768 for all-mpnet-base-v2). Mismatches are a common source of runtime errors.

    * similarity: cosine is generally the standard for normalized embeddings from sentence-transformer models. dot_product can also be used if vectors are normalized.

    * index: true: This enables the creation of an HNSW (Hierarchical Navigable Small World) graph for approximate nearest neighbor search, which is essential for performance on large datasets. Without this, Elasticsearch would perform a brute-force scan.

    * Custom Analyzer: Don't just use the standard analyzer. Tailor it to your domain. You might add synonym filters, stemmers, or stop words relevant to your corpus (see the synonym sketch after this list).

    * metadata: Setting enabled: false on the metadata object keeps it retrievable in _source while skipping parsing and indexing, which reduces index size and improves indexing speed for fields you never need to search.
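
    As an illustration of tailoring the analyzer, the snippet below sketches how a domain synonym filter could be slotted into the analysis chain from the mapping above. The filter name and synonym entries are placeholders; substitute vocabulary from your own corpus.

    json
    {
      "settings": {
        "analysis": {
          "filter": {
            "domain_synonyms": {
              "type": "synonym",
              "synonyms": [
                "config, configuration",
                "err, error"
              ]
            }
          },
          "analyzer": {
            "custom_english_analyzer": {
              "type": "custom",
              "tokenizer": "standard",
              "filter": [
                "english_possessive_stemmer",
                "lowercase",
                "domain_synonyms",
                "english_stop",
                "english_stemmer"
              ]
            }
          }
        }
      }
    }

    Note that the analyzer of an existing text field cannot be changed in place; it requires re-indexing, so decide on it up front.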

    Data Ingestion and Embedding Pipeline

    Ingesting data involves generating embeddings for each document and then bulk-indexing into Elasticsearch. Here is a Python script demonstrating this process using the sentence-transformers library and the official Elasticsearch client.

    python
    import json
    from sentence_transformers import SentenceTransformer
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    from tqdm import tqdm
    
    # --- Configuration ---
    ELASTIC_CLOUD_ID = "YOUR_CLOUD_ID"
    ELASTIC_API_KEY = "YOUR_API_KEY"
    INDEX_NAME = "production-rag-docs"
    MODEL_NAME = 'all-mpnet-base-v2'
    
    # --- Sample Documents ---
    documents = [
        {
            "id": "doc-01",
            "title": "Elasticsearch k-NN Search",
            "content": "k-nearest neighbor (k-NN) search finds the k nearest vectors to a query vector, as measured by a similarity metric. It is used to power applications like product recommendations and semantic search.",
            "metadata": {"source": "es_docs"}
        },
        {
            "id": "doc-02",
            "title": "BM25 Algorithm",
            "content": "Okapi BM25 is a ranking function used by search engines to rank matching documents according to their relevance to a given search query. It is a bag-of-words retrieval function that ranks a set of documents based on the query terms appearing in each document.",
            "metadata": {"source": "wiki"}
        },
        {
            "id": "doc-03",
            "title": "Reciprocal Rank Fusion (RRF)",
            "content": "RRF is a method for combining multiple result sets with different relevance scores into a single result set. It is simple, robust, and effective, especially when the scales of the scores from different systems are unknown.",
            "metadata": {"source": "research_paper"}
        }
        # ... add more documents
    ]
    
    # --- Implementation ---
    def main():
        print("Connecting to Elasticsearch...")
        es_client = Elasticsearch(
            cloud_id=ELASTIC_CLOUD_ID,
            api_key=ELASTIC_API_KEY
        )
        print(f"Connection successful: {es_client.info()['cluster_name']}")
    
        print(f"Loading embedding model: {MODEL_NAME}")
        model = SentenceTransformer(MODEL_NAME)
    
        # Check if index exists and delete if you want to re-index
        if es_client.indices.exists(index=INDEX_NAME):
            print(f"Deleting existing index '{INDEX_NAME}'...")
            es_client.indices.delete(index=INDEX_NAME)
        
        # Re-create index with the mapping from above
        # (Assuming the mapping JSON is saved in 'index_mapping.json')
        with open('index_mapping.json', 'r') as f:
            mapping = json.load(f)
        print(f"Creating index '{INDEX_NAME}'...")
        es_client.indices.create(index=INDEX_NAME, body=mapping)
    
        def generate_actions():
            """Generator function for bulk ingestion"""
            print("Generating embeddings and preparing for bulk indexing...")
            contents = [doc['content'] for doc in documents]
            embeddings = model.encode(contents, show_progress_bar=True)
            
            for i, doc in enumerate(documents):
                yield {
                    "_index": INDEX_NAME,
                    "_id": doc['id'],
                    "_source": {
                        "title": doc['title'],
                        "content": doc['content'],
                        "content_vector": embeddings[i].tolist(),
                        "metadata": doc['metadata']
                    }
                }
    
        print("Starting bulk ingestion...")
        success, errors = bulk(es_client, generate_actions())
        print(f"Bulk ingestion complete. Indexed: {success}, Errors: {len(errors)}")
    
    if __name__ == "__main__":
        main()

    2. Implementing Hybrid Search with Client-Side RRF

    Now that our data is indexed, we can perform the retrieval. A naive approach might be to try and combine BM25 and k-NN scores directly in a single Elasticsearch query. However, this is problematic because their scores are on different, incomparable scales. BM25 scores are unbounded, while cosine similarity is bounded [-1, 1]. A simple weighted sum is brittle and difficult to tune.

    A more robust solution is Reciprocal Rank Fusion (RRF). RRF disregards the raw scores and instead uses the rank of a document in each result set. The formula is simple: for each document, sum the reciprocal of its rank across all result sets. The final RRF score for a document d is:

    RRF_Score(d) = Σ (1 / (k + rank_i(d)))

    where rank_i(d) is the rank of document d in result set i, and k is a smoothing constant that dampens the influence of any single high-ranking result (a common value is 60; in the code below it is named rrf_k to avoid confusion with the retrieval size k).
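
    For example, with k = 60, a document ranked 1st by BM25 and 4th by k-NN receives 1/61 + 1/64 ≈ 0.032, while a document ranked 2nd by only one searcher receives 1/62 ≈ 0.016. Agreement between the two retrievers is rewarded, and no raw score is ever consulted.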

    To implement this, we'll perform two separate queries to Elasticsearch—one for BM25 and one for k-NN—and then fuse the results on the client side.

    The Hybrid Search Query Logic

    Here is the Python function that orchestrates the hybrid search.

    python
    from sentence_transformers import SentenceTransformer
    from elasticsearch import Elasticsearch
    
    # Assume es_client and model are initialized as before
    # ELASTIC_CLOUD_ID = "..."
    # ELASTIC_API_KEY = "..."
    # INDEX_NAME = "production-rag-docs"
    # MODEL_NAME = 'all-mpnet-base-v2'
    
    # es_client = Elasticsearch(cloud_id=ELASTIC_CLOUD_ID, api_key=ELASTIC_API_KEY)
    # model = SentenceTransformer(MODEL_NAME)
    
    def execute_hybrid_search(query_text: str, k: int = 50, num_candidates: int = 100, rrf_k: int = 60):
        """
        Executes a hybrid search query against Elasticsearch and fuses the results using RRF.
    
        :param query_text: The user's search query.
        :param k: The number of results to retrieve from each searcher (BM25 and k-NN).
        :param num_candidates: The number of candidates for the k-NN search.
        :param rrf_k: The ranking constant for RRF.
        :return: A list of fused and ranked documents.
        """
        
        # 1. Generate query embedding
        query_vector = model.encode(query_text).tolist()
    
        # 2. Perform k-NN search
        knn_query = {
            "field": "content_vector",
            "query_vector": query_vector,
            "k": k,
            "num_candidates": num_candidates
        }
        try:
            knn_response = es_client.search(
                index=INDEX_NAME,
                knn=knn_query,
                _source=["title", "content"]
            )
            knn_hits = knn_response["hits"]["hits"]
        except Exception as e:
            print(f"Error during k-NN search: {e}")
            knn_hits = []
    
        # 3. Perform BM25 search
        bm25_query = {
            "match": {
                "content": {
                    "query": query_text
                }
            }
        }
        try:
            bm25_response = es_client.search(
                index=INDEX_NAME,
                query=bm25_query,
                size=k,
                _source=["title", "content"]
            )
            bm25_hits = bm25_response["hits"]["hits"]
        except Exception as e:
            print(f"Error during BM25 search: {e}")
            bm25_hits = []
    
        # 4. Fuse results with RRF
        ranked_results = {}
    
        # Process k-NN results
        for rank, hit in enumerate(knn_hits):
            doc_id = hit["_id"]
            if doc_id not in ranked_results:
                ranked_results[doc_id] = {"score": 0, "doc": hit["_source"]}
            ranked_results[doc_id]["score"] += 1 / (rrf_k + rank + 1)
    
        # Process BM25 results
        for rank, hit in enumerate(bm25_hits):
            doc_id = hit["_id"]
            if doc_id not in ranked_results:
                ranked_results[doc_id] = {"score": 0, "doc": hit["_source"]}
            ranked_results[doc_id]["score"] += 1 / (rrf_k + rank + 1)
    
        # Sort by RRF score
        fused_results = sorted(ranked_results.values(), key=lambda x: x["score"], reverse=True)
    
        return fused_results
    
    # --- Example Usage ---
    if __name__ == "__main__":
        query = "What is RRF?"
        results = execute_hybrid_search(query)
        
        print(f"Hybrid Search Results for: '{query}'\n")
        for i, result in enumerate(results[:5]):
            print(f"Rank {i+1} (Score: {result['score']:.4f})")
            print(f"  Title: {result['doc']['title']}")
            print(f"  Content: {result['doc']['content'][:150]}...")
            print("---")
    

    Performance Considerations for k-NN:

    * k vs. num_candidates: This is a critical tuning parameter. num_candidates is the number of nearest-neighbor candidates to consider on each shard; it must be greater than or equal to k. Increasing num_candidates improves accuracy (recall) at the cost of latency. A good starting point is num_candidates = 2 * k, but this should be tuned based on performance testing.

    * Filtering: In a real application, you might need to apply filters (e.g., for multi-tenancy or date ranges). A filter supplied inside the knn clause is applied while candidates are gathered, so you still get up to k matching results, but a highly selective filter forces the HNSW traversal to examine far more candidates and increases latency. A query-level post_filter, by contrast, is applied after the approximate nearest neighbors are found and can return fewer than k results. Benchmark both against your filter selectivity; a filtered k-NN query is sketched below.
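
    As a sketch, here is what the k-NN portion of the search could look like with a tenant filter applied during candidate gathering. It continues the variables from execute_hybrid_search above (query_vector, es_client, INDEX_NAME), and the tenant_id keyword field is hypothetical: it is not part of the mapping in Section 1 and would need to be added (remember that the metadata object there is not indexed).

    python
    # Hypothetical example: restrict k-NN candidates to a single tenant.
    # Assumes a 'tenant_id' keyword field has been added to the mapping.
    knn_query_filtered = {
        "field": "content_vector",
        "query_vector": query_vector,
        "k": 50,
        "num_candidates": 100,
        "filter": {
            "term": {"tenant_id": "acme-corp"}
        }
    }

    response = es_client.search(
        index=INDEX_NAME,
        knn=knn_query_filtered,
        _source=["title", "content"]
    )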


    3. The Critical Re-ranking Stage with Cross-Encoders

    The hybrid search stage provides a set of highly relevant candidate documents. However, for SOTA performance, we can add a final re-ranking step. While our initial retrieval used a bi-encoder (which creates embeddings for query and documents independently), the re-ranking stage uses a cross-encoder.

    A cross-encoder takes both the query and a candidate document as a single input and outputs a relevance score. This allows the model to perform full self-attention across both texts, making it significantly more accurate but also orders of magnitude slower. It's infeasible for retrieving from millions of documents, but perfect for re-ranking a small set (e.g., the top 25-50) of candidates from our hybrid search.

    Implementing the Re-ranker

    We'll use a pre-trained cross-encoder from the sentence-transformers library. Models trained on the MS MARCO passage ranking dataset are excellent for this task.

    python
    from sentence_transformers.cross_encoder import CrossEncoder
    
    # This would be part of a larger class or application
    # Initialize the model once and reuse it
    cross_encoder_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
    
    def rerank_documents(query: str, documents: list):
        """
        Re-ranks a list of documents for a given query using a cross-encoder model.
    
        :param query: The user's search query.
        :param documents: A list of document dictionaries from the fusion stage.
        :return: A list of documents sorted by the cross-encoder's relevance score.
        """
        # The cross-encoder expects a list of [query, passage] pairs
        pairs = [[query, doc['doc']['content']] for doc in documents]
        
        # Predict scores
        scores = cross_encoder_model.predict(pairs, show_progress_bar=False)
        
        # Add scores to documents and sort
        for i, doc in enumerate(documents):
            doc['rerank_score'] = scores[i]
        
        reranked_results = sorted(documents, key=lambda x: x['rerank_score'], reverse=True)
        return reranked_results
    
    # --- Example Integration ---
    if __name__ == "__main__":
        # Assume execute_hybrid_search is defined and configured
        query = "What is Reciprocal Rank Fusion?"
        
        # 1. Retrieve and Fuse
        fused_results = execute_hybrid_search(query, k=25, num_candidates=50)
        print(f"Retrieved {len(fused_results)} candidates for re-ranking.")
    
        # 2. Re-rank the top N candidates
        rerank_candidates = fused_results[:25] # Only re-rank the top 25
        final_results = rerank_documents(query, rerank_candidates)
    
        print(f"\nFinal Re-ranked Results for: '{query}'\n")
        for i, result in enumerate(final_results[:5]):
            print(f"Rank {i+1} (Rerank Score: {result['rerank_score']:.4f}, RRF Score: {result['score']:.4f})")
            print(f"  Title: {result['doc']['title']}")
            print(f"  Content: {result['doc']['content'][:150]}...")
            print("---")
    

    This final list of documents is what you would pass into the context window of your Large Language Model (LLM). By using this three-stage process, you ensure the context is as dense with relevant information as possible, dramatically improving the quality of the generated response.
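
    How that context is serialized is a prompt-design decision; as one minimal, illustrative sketch (the separator, instruction wording, and top-5 cut-off are arbitrary choices):

    python
    def build_llm_context(query: str, reranked_docs: list, top_n: int = 5) -> str:
        """Concatenates the top re-ranked documents into a context block for the LLM prompt."""
        context_blocks = []
        for doc in reranked_docs[:top_n]:
            source = doc["doc"]
            context_blocks.append(f"Title: {source['title']}\n{source['content']}")
        context = "\n\n---\n\n".join(context_blocks)
        return (
            "Answer the question using only the context below.\n\n"
            f"Context:\n{context}\n\n"
            f"Question: {query}"
        )

    # prompt = build_llm_context(query, final_results)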


    4. Production Edge Cases and Advanced Considerations

    Deploying this system requires handling several complex scenarios.

    A. Embedding Model Versioning

    What happens when you want to upgrade your embedding model? You cannot simply start indexing new documents with the new model, as the new vectors will live in a different semantic space from the old ones. A query embedded with the v2 model is not meaningfully comparable to document vectors produced by the v1 model, even for identical text.

    The Solution: Blue-Green Index Deployment

  • Create a New Index: Create a new index, e.g., production-rag-docs-v2, with the same mapping (or an updated one).
  • Backfill and Re-index: Run a batch job to re-read all your source documents, generate embeddings with the new model, and index them into the v2 index (a minimal sketch follows this list).
  • Use an Alias: In Elasticsearch, use an alias (e.g., production-rag-docs-alias) that points to your live index. Your application should only query the alias.
  • Atomic Switchover: Once the v2 index is fully populated and warmed up, atomically switch the alias to point from the v1 index to the v2 index. This is a zero-downtime operation.
        json
        POST /_aliases
        {
          "actions": [
            { "remove": { "index": "production-rag-docs-v1", "alias": "production-rag-docs-alias" } },
            { "add":    { "index": "production-rag-docs-v2", "alias": "production-rag-docs-alias" } }
          ]
        }
  • Decommission: After monitoring the new index and confirming its stability, you can safely delete the old v1 index.
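
    A minimal backfill sketch, assuming your source documents can be re-read from the system of record (fetch_all_source_documents is a placeholder for that step, and the model name is illustrative), reusing the bulk-ingestion pattern from Section 1:

    python
    from sentence_transformers import SentenceTransformer
    from elasticsearch.helpers import bulk

    NEW_INDEX = "production-rag-docs-v2"
    new_model = SentenceTransformer("new-and-improved-model")  # placeholder model name

    def reindex_with_new_model(es_client, source_documents):
        """Re-embeds every source document with the v2 model and bulk-indexes into the v2 index."""
        contents = [doc["content"] for doc in source_documents]
        embeddings = new_model.encode(contents, show_progress_bar=True)

        def actions():
            for doc, vector in zip(source_documents, embeddings):
                yield {
                    "_index": NEW_INDEX,
                    "_id": doc["id"],
                    "_source": {
                        "title": doc["title"],
                        "content": doc["content"],
                        "content_vector": vector.tolist(),
                        "metadata": doc["metadata"],
                    },
                }

        return bulk(es_client, actions())

    # success, errors = reindex_with_new_model(es_client, fetch_all_source_documents())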

    B. Latency vs. Relevance Tuning

    This architecture introduces several tuning knobs that directly trade latency for relevance:

    * k (retrieval size): Larger k increases the chance of finding relevant documents for the re-ranker but adds latency to both Elasticsearch queries and the RRF step.

    * num_candidates: The most significant performance lever for k-NN. Higher values mean more exhaustive search on each shard, increasing accuracy at a steep latency cost. This must be benchmarked.

    * re-ranker candidate size: The number of documents passed to the cross-encoder. Re-ranking is often the slowest part of the pipeline. Re-ranking the top 25 might be fast enough, but re-ranking 100 could violate your latency SLOs. Deploying the cross-encoder on a GPU-accelerated instance is often necessary for production workloads.

    C. Observability and Monitoring

    How do you know if your system is working well? You need to instrument every stage.

    * Log Trace IDs: A single request should have a trace ID that is logged at each stage: initial query, k-NN results (with scores/ranks), BM25 results (with scores/ranks), RRF fused results, and final re-ranked results. This is invaluable for debugging why a specific query failed (a minimal instrumentation sketch follows the metrics list below).

    * Metrics to Track:

    * p95 latency for each stage (k-NN, BM25, re-ranking).

    * Hit Rate: For a set of evaluation queries with known-good documents, what percentage of the time is the correct document found by the hybrid search stage? What about after re-ranking?

    * Mean Reciprocal Rank (MRR): A standard metric for evaluating ranking systems.

    * Cache Hit Rate: If you implement caching for embeddings or re-ranker scores.
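
    As one illustrative way to instrument the pipeline, the sketch below times each stage and tags every log line with a trace ID. The logger configuration and stage names are assumptions, not a prescribed scheme.

    python
    import logging
    import time
    import uuid
    from contextlib import contextmanager

    logger = logging.getLogger("rag_pipeline")

    @contextmanager
    def timed_stage(trace_id: str, stage: str):
        """Logs the wall-clock latency of a pipeline stage, keyed by trace ID."""
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            logger.info("trace_id=%s stage=%s latency_ms=%.1f", trace_id, stage, elapsed_ms)

    # Usage inside the request handler:
    # trace_id = str(uuid.uuid4())
    # with timed_stage(trace_id, "hybrid_search"):
    #     candidates = execute_hybrid_search(query)
    # with timed_stage(trace_id, "rerank"):
    #     final_results = rerank_documents(query, candidates[:25])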

    D. Handling Disjoint Result Sets

    An interesting edge case is when the k-NN and BM25 searches return completely disjoint sets of documents. This is often a good thing, as it's precisely what hybrid search is designed to handle. RRF naturally manages this by simply adding documents from both sets into the fused list based on their respective ranks. The re-ranker then acts as the final arbiter of relevance. Your logging should flag these cases so you can analyze them; they often reveal queries where semantic and lexical signals are pointing in very different directions, providing valuable insight into your data and user intent.
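
    A lightweight way to flag such cases, as a sketch, is to compare the document IDs returned by each searcher inside execute_hybrid_search (the variable names match that function):

    python
    # Inside execute_hybrid_search, after both result sets are available:
    knn_ids = {hit["_id"] for hit in knn_hits}
    bm25_ids = {hit["_id"] for hit in bm25_hits}

    overlap = knn_ids & bm25_ids
    if knn_ids and bm25_ids and not overlap:
        # Disjoint result sets: semantic and lexical signals disagree entirely.
        print(f"DISJOINT RESULTS for query '{query_text}': "
              f"{len(knn_ids)} k-NN hits, {len(bm25_ids)} BM25 hits, 0 overlap")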
