Advanced RAG: Two-Stage Retrieval and Re-ranking Pipelines
The Precision Ceiling of Naive RAG
For any senior engineer who has moved a Retrieval-Augmented Generation (RAG) system from a Jupyter notebook to a staging environment, the limitations of naive vector search become painfully apparent. A simple top-k retrieval against a vector index works beautifully for straightforward queries but crumbles when faced with domain-specific jargon, semantic ambiguity, or queries requiring both keyword matching and conceptual understanding. This is especially true in high-stakes domains like legal, financial, or medical analysis.
The core problem is the inherent trade-off in single-stage retrieval. You are forced to choose between:
* Dense vector search (bi-encoders): These excel at conceptual, natural-language queries (what is the liability clause concerning data breaches?). However, they often fail on specific keywords, acronyms, or identifiers (find documents mentioning "GDPR Article 30"). They can also retrieve documents that are thematically related but factually irrelevant, a phenomenon that leads directly to model hallucination.
* Sparse keyword search (BM25): This excels at exact term matching (find "force majeure" clauses). It's fast and reliable for what it does, but it completely misses semantic nuance. A query for "acts of God" would fail to retrieve a document that only mentions "hurricanes, earthquakes, or floods."
This isn't a problem you can solve by simply tweaking the k value or using Maximal Marginal Relevance (MMR). These are superficial fixes to a fundamental architectural flaw. Production-grade RAG demands a more sophisticated approach that combines the strengths of both worlds. This is where a Two-Stage Retrieval Pipeline with a Cross-Encoder Re-ranker becomes not just an optimization, but a necessity.
In this post, we will architect and implement such a system from the ground up. We will not be using high-level abstractions from frameworks that hide the complexity. Instead, we'll build the components ourselves to understand the mechanics, trade-offs, and tuning parameters at each stage.
Our pipeline will look like this: the user query goes to Stage 1, a hybrid retriever that runs BM25 keyword search and dense kNN vector search in parallel and fuses the two rankings with Reciprocal Rank Fusion, returning a broad candidate set (top-N). Stage 2 then feeds each (query, candidate) pair through a cross-encoder re-ranker and keeps only the top-K highest-scoring documents, which become the context handed to the LLM.
This architecture directly addresses the recall-precision problem. Stage 1 ensures we don't miss relevant documents (high recall), and Stage 2 ensures the documents we finally select are the most relevant (high precision).
Architectural Deep Dive: Bi-Encoders vs. Cross-Encoders
Before we write any code, it's critical to understand the fundamental difference between the models used for retrieval and re-ranking.
Bi-Encoders (For Retrieval)
A bi-encoder, like the models used for generating sentence embeddings (all-MiniLM-L6-v2, bge-large-en-v1.5), processes the query and the documents independently. 
Query -> [Encoder] -> Query Vector
Doc 1 -> [Encoder] -> Doc 1 Vector
Doc 2 -> [Encoder] -> Doc 2 Vector
...
Doc N -> [Encoder] -> Doc N Vector
Search is then a nearest-neighbor lookup (e.g., cosine similarity) between the query vector and the pre-computed document vectors. This is incredibly fast and scalable because document vectors can be indexed in a vector database. However, since the model never sees the query and document together, it can only capture a general semantic relationship, not a fine-grained relevance score.
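To make the bi-encoder flow concrete, here is a minimal sketch using sentence-transformers; the two-document corpus and the query are placeholders for illustration.
# bi_encoder_sketch.py -- minimal illustration of bi-encoder retrieval
from sentence_transformers import SentenceTransformer, util
model = SentenceTransformer("all-MiniLM-L6-v2")
# Document vectors are computed once, offline, and can live in a vector index.
corpus = [
    "The agreement renews automatically unless notice is given.",
    "Force Majeure includes acts of God, floods, and strikes.",
]
corpus_embeddings = model.encode(corpus, convert_to_tensor=True, normalize_embeddings=True)
# At query time only the query is encoded; search is a cheap similarity lookup.
query_embedding = model.encode("what happens during a natural disaster?",
                               convert_to_tensor=True, normalize_embeddings=True)
hits = util.semantic_search(query_embedding, corpus_embeddings, top_k=2)[0]
for hit in hits:
    print(f"{hit['score']:.4f}  {corpus[hit['corpus_id']]}")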
Cross-Encoders (For Re-ranking)
A cross-encoder, by contrast, takes both the query and a document as a single input and passes them through a powerful Transformer model (like BERT) simultaneously.
[CLS] Query [SEP] Document 1 [SEP] -> [Transformer] -> Relevance Score (0-1)
[CLS] Query [SEP] Document 2 [SEP] -> [Transformer] -> Relevance Score (0-1)
...
This allows the model to perform deep attention between the query and document tokens. It can understand nuance, word order, and context in a way a bi-encoder simply cannot. The output is a single, highly accurate relevance score.
The drawback? It's computationally expensive. You cannot pre-compute anything per document offline: for every query, you must run the cross-encoder once per candidate document, so N candidates means N full Transformer forward passes. This is why it's completely infeasible for searching a corpus of millions of documents but perfectly suited for re-ranking a smaller candidate set of a few hundred.
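For contrast, a minimal sketch of cross-encoder scoring with the same library; the candidate texts are placeholders, and note that every (query, candidate) pair costs a full forward pass.
# cross_encoder_sketch.py -- minimal illustration of cross-encoder scoring
from sentence_transformers import CrossEncoder
model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", max_length=512)
query = "what happens during a natural disaster?"
candidates = [
    "Force Majeure includes acts of God, floods, and strikes.",
    "The agreement renews automatically unless notice is given.",
]
# One forward pass per (query, candidate) pair -- this is the expensive part.
scores = model.predict([(query, text) for text in candidates])
for text, score in sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True):
    print(f"{score:.4f}  {text}")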
This bi-encoder/cross-encoder distinction is the theoretical underpinning of our two-stage architecture. We use the fast, scalable bi-encoder (and BM25) for the first stage and the slow, powerful cross-encoder for the second.
Production Implementation: Legal Clause Analysis RAG
Let's build our system. We'll use a sample dataset of legal contract clauses. Our goal is to answer specific questions about these clauses.
Prerequisites:
* Python 3.9+
* Docker and Docker Compose
* An Elasticsearch instance (we'll run one via Docker)
*   Libraries: elasticsearch, sentence-transformers, torch, tqdm
First, set up your environment:
pip install elasticsearch sentence-transformers torch tqdm
Next, create a docker-compose.yml to run Elasticsearch:
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.1
    container_name: es01
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - 'ES_JAVA_OPTS=-Xms1g -Xmx1g'
    ports:
      - 9200:9200
    volumes:
      - esdata:/usr/share/elasticsearch/data
volumes:
  esdata:
    driver: local
Run docker-compose up -d to start the service.
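Optionally, a quick connectivity check from Python (assuming the default port mapping above) confirms the cluster is reachable before indexing anything:
# check_es.py -- optional sanity check against the Docker setup above
from elasticsearch import Elasticsearch
es = Elasticsearch("http://localhost:9200")
info = es.info()
print("Connected to Elasticsearch", info["version"]["number"])  # expect 8.11.1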
Step 1: Data Preparation and Hybrid Indexing
We need a dataset and a way to index it for both keyword and vector search.
Let's create a sample dataset of legal clauses.
# sample_data.py
clauses = [
    {
        "id": "clause-001",
        "contract_name": "MSA-ACME-2023",
        "text": "Confidential Information shall not include information that: (a) is or becomes publicly known and available through no fault of the receiving party; (b) is already in the possession of the receiving party without restriction on use or disclosure; or (c) is independently developed by the receiving party without use of or reference to the disclosing party's Confidential Information."
    },
    {
        "id": "clause-002",
        "contract_name": "MSA-ACME-2023",
        "text": "The term of this Agreement shall commence on the Effective Date and shall continue for a period of three (3) years, unless terminated earlier pursuant to the terms herein. The Agreement shall automatically renew for successive one (1) year periods unless either party provides written notice of non-renewal at least sixty (60) days prior to the end of the then-current term."
    },
    {
        "id": "clause-003",
        "contract_name": "SOW-Globex-2024",
        "text": "In the event of a Force Majeure Event, the party prevented from or delayed in performing its obligations under this Agreement shall be excused from performance for the duration of the event. A Force Majeure Event includes, but is not limited to, acts of God, war, terrorism, riots, embargoes, acts of civil or military authorities, fire, floods, accidents, or strikes."
    },
    {
        "id": "clause-004",
        "contract_name": "MSA-ACME-2023",
        "text": "Each party shall indemnify, defend, and hold harmless the other party... from and against any and all claims, losses, damages, liabilities, and expenses arising out of or related to any third-party claim alleging that the indemnifying party's services or materials infringe upon any patent, copyright, trademark, or trade secret of such third party."
    },
    {
        "id": "clause-005",
        "contract_name": "NDA-Initech-2022",
        "text": "Upon termination of this Agreement, the receiving party shall promptly return or, at the disclosing party's request, destroy all Confidential Information and any copies thereof. The receiving party shall provide written certification of its compliance with this section within ten (10) business days of the request."
    }
]
# ... add at least 50-100 more clauses for a realistic test
Now, let's create the Elasticsearch index and ingest the data. We'll use a dense_vector mapping for our embeddings.
# index_data.py
import os
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
from sample_data import clauses # Assuming you have a larger list here
# --- Constants ---
ES_HOST = "http://localhost:9200"
INDEX_NAME = "legal_clauses_hybrid"
BI_ENCODER_MODEL = 'all-MiniLM-L6-v2' # Fast and effective for retrieval
EMBEDDING_DIM = 384 # Based on the model
def create_es_client():
    return Elasticsearch(hosts=[ES_HOST])
def create_index(es_client):
    if es_client.indices.exists(index=INDEX_NAME):
        print(f"Index '{INDEX_NAME}' already exists. Deleting.")
        es_client.indices.delete(index=INDEX_NAME)
    index_body = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "mappings": {
            "properties": {
                "contract_name": {"type": "keyword"},
                "text": {"type": "text"},
                "embedding": {
                    "type": "dense_vector",
                    "dims": EMBEDDING_DIM,
                    "index": True,
                    "similarity": "cosine" # Use cosine similarity for sentence embeddings
                }
            }
        }
    }
    es_client.indices.create(index=INDEX_NAME, body=index_body)
    print(f"Index '{INDEX_NAME}' created.")
def main():
    es_client = create_es_client()
    create_index(es_client)
    print(f"Loading bi-encoder model: {BI_ENCODER_MODEL}")
    model = SentenceTransformer(BI_ENCODER_MODEL)
    print("Indexing documents...")
    for clause in tqdm(clauses):
        embedding = model.encode(clause['text']).tolist()
        doc = {
            "contract_name": clause['contract_name'],
            "text": clause['text'],
            "embedding": embedding
        }
        es_client.index(index=INDEX_NAME, id=clause['id'], document=doc)
    es_client.indices.refresh(index=INDEX_NAME)
    print("Indexing complete.")
if __name__ == "__main__":
    main()
Run this script (python index_data.py). It will create an index in Elasticsearch with a text field for BM25 search and an embedding field for dense vector search.
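The per-document loop above is fine for a few hundred clauses; for larger corpora, batching the embedding calls and using the Elasticsearch bulk helper is the usual approach. Here is a sketch under those assumptions, reusing the same index name and model as above:
# bulk_index_sketch.py -- faster ingestion for larger corpora (sketch)
from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer
from sample_data import clauses
es = Elasticsearch("http://localhost:9200")
model = SentenceTransformer("all-MiniLM-L6-v2")
# Encode all texts in batches instead of one encode() call per document.
texts = [c["text"] for c in clauses]
embeddings = model.encode(texts, batch_size=64, show_progress_bar=True)
actions = [
    {
        "_index": "legal_clauses_hybrid",
        "_id": c["id"],
        "_source": {
            "contract_name": c["contract_name"],
            "text": c["text"],
            "embedding": emb.tolist(),
        },
    }
    for c, emb in zip(clauses, embeddings)
]
helpers.bulk(es, actions)
es.indices.refresh(index="legal_clauses_hybrid")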
Step 2: Implementing the Stage 1 Retriever
Our first stage will perform a hybrid search. We'll combine the BM25 and k-Nearest Neighbor (kNN) result rankings using a technique called Reciprocal Rank Fusion (RRF) to get a balanced initial ranking. Recent Elasticsearch 8.x releases make this straightforward with the rank clause.
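RRF itself is simple: a document's fused score is the sum of 1 / (k + rank) over each ranking it appears in, with k conventionally set to 60. As a point of reference (and a fallback if your Elasticsearch version or license tier doesn't expose the rank clause), here is a minimal client-side sketch of the fusion; the ranked ID lists are hypothetical:
# rrf_sketch.py -- client-side Reciprocal Rank Fusion (illustrative fallback)
from collections import defaultdict
def rrf_fuse(ranked_lists, k=60):
    """Fuse several ranked lists of document IDs into one ranking."""
    scores = defaultdict(float)
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
# Example: IDs ranked by BM25 and by kNN (hypothetical results)
bm25_ranking = ["clause-002", "clause-005", "clause-001"]
knn_ranking = ["clause-005", "clause-003", "clause-002"]
print(rrf_fuse([bm25_ranking, knn_ranking]))
With that intuition in place, here is the Stage 1 retriever using Elasticsearch's native hybrid query: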
# stage1_retriever.py
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
# --- Constants from indexing script ---
ES_HOST = "http://localhost:9200"
INDEX_NAME = "legal_clauses_hybrid"
BI_ENCODER_MODEL = 'all-MiniLM-L6-v2'
class Stage1Retriever:
    def __init__(self):
        self.es_client = Elasticsearch(hosts=[ES_HOST])
        self.model = SentenceTransformer(BI_ENCODER_MODEL)
        print("Stage 1 Retriever initialized.")
    def retrieve(self, query: str, top_n: int = 100):
        # 1. Encode the query for vector search
        query_embedding = self.model.encode(query).tolist()
        # 2. Define the hybrid search query
        # This query combines a BM25 full-text search and a kNN vector search.
        # The 'rank' section uses RRF to combine the scores from both.
        search_query = {
            "size": top_n,
            "query": {
                "match": {
                    "text": {
                        "query": query
                    }
                }
            },
            "knn": {
                "field": "embedding",
                "query_vector": query_embedding,
                "k": top_n,
                "num_candidates": top_n + 50 # num_candidates should be > k
            },
            "rank": {
                "rrf": {}
            }
        }
        response = self.es_client.search(
            index=INDEX_NAME,
            body=search_query
        )
        results = []
        for hit in response['hits']['hits']:
            results.append({
                "id": hit['_id'],
                "text": hit['_source']['text'],
                "score": hit['_score']
            })
        
        return results
# --- Example Usage ---
if __name__ == "__main__":
    retriever = Stage1Retriever()
    test_query = "what are the conditions for non-renewal of a contract?"
    
    print(f"Executing Stage 1 retrieval for query: '{test_query}'")
    candidates = retriever.retrieve(test_query, top_n=10)
    
    print(f"\nRetrieved {len(candidates)} candidates:")
    for i, doc in enumerate(candidates):
        print(f"{i+1}. ID: {doc['id']}, Score: {doc['score']:.4f}")
        print(f"   Text: {doc['text'][:100]}...")
When you run this, you'll see a list of candidate documents. This list is our input for the next, more precise stage. The key here is setting top_n high enough (e.g., 50-100) to ensure the truly relevant documents are likely included, even if their initial scores aren't the highest.
Step 3: Implementing the Stage 2 Re-ranker
Now for the precision part. We'll use a pre-trained cross-encoder model from the sentence-transformers library. The ms-marco family used here is fine-tuned on a passage-ranking (relevance) task rather than Semantic Textual Similarity, which is exactly what re-ranking needs: a score for how well a passage answers a query.
# stage2_reranker.py
import torch
from sentence_transformers import CrossEncoder
# A good, lightweight cross-encoder model
CROSS_ENCODER_MODEL = 'cross-encoder/ms-marco-MiniLM-L-6-v2'
class Stage2ReRanker:
    def __init__(self):
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = CrossEncoder(CROSS_ENCODER_MODEL, max_length=512, device=self.device)
        print(f"Stage 2 Re-ranker initialized on device: {self.device}")
    def rerank(self, query: str, documents: list, top_k: int = 5):
        if not documents:
            return []
        # The cross-encoder expects pairs of (query, document_text)
        pairs = [(query, doc['text']) for doc in documents]
        
        # Predict scores for all pairs. This is the computationally expensive step.
        scores = self.model.predict(pairs, show_progress_bar=False)
        # Combine scores with original documents
        for i in range(len(scores)):
            documents[i]['rerank_score'] = scores[i]
        # Sort documents by the new rerank_score in descending order
        sorted_documents = sorted(documents, key=lambda x: x['rerank_score'], reverse=True)
        # Return the top_k results
        return sorted_documents[:top_k]
# --- Example Usage (requires Stage 1 output) ---
if __name__ == "__main__":
    # Mockup of Stage 1 output for standalone testing
    from stage1_retriever import Stage1Retriever
    # 1. Get candidates from Stage 1
    stage1 = Stage1Retriever()
    test_query = "what happens if there is a natural disaster like a flood?"
    candidates = stage1.retrieve(test_query, top_n=20)
    print(f"Retrieved {len(candidates)} candidates from Stage 1.")
    # 2. Re-rank with Stage 2
    stage2 = Stage2ReRanker()
    print("\nExecuting Stage 2 re-ranking...")
    final_results = stage2.rerank(test_query, candidates, top_k=3)
    print(f"\nFinal {len(final_results)} results after re-ranking:")
    for i, doc in enumerate(final_results):
        print(f"{i+1}. ID: {doc['id']}, Re-rank Score: {doc['rerank_score']:.4f}")
        print(f"   Text: {doc['text'][:150]}...")
Notice the difference in the query. The query "what happens if there is a natural disaster like a flood?" has no keyword overlap with the "Force Majeure" clause. A pure BM25 search would likely fail. A pure vector search might retrieve it, but it could also retrieve other unrelated clauses about liabilities or events. The cross-encoder, however, can understand the strong semantic link between "natural disaster like a flood" and the defined "Force Majeure Event" that includes "floods" and "acts of God", giving it a very high score.
Step 4: Building the End-to-End Pipeline
Now, let's combine everything into a single, cohesive pipeline class.
# full_pipeline.py
import time
from stage1_retriever import Stage1Retriever
from stage2_reranker import Stage2ReRanker
class AdvancedRAGPipeline:
    def __init__(self):
        self.stage1 = Stage1Retriever()
        self.stage2 = Stage2ReRanker()
    def query(self, query_text: str, stage1_top_n: int = 100, stage2_top_k: int = 5):
        print("--- Starting RAG Pipeline ---")
        
        # Stage 1: High-recall retrieval
        t0 = time.time()
        candidate_docs = self.stage1.retrieve(query_text, top_n=stage1_top_n)
        t1 = time.time()
        print(f"Stage 1: Retrieved {len(candidate_docs)} candidates in {t1-t0:.2f}s")
        if not candidate_docs:
            print("No candidates found in Stage 1. Aborting.")
            return [], ""
        # Stage 2: High-precision re-ranking
        t2 = time.time()
        final_docs = self.stage2.rerank(query_text, candidate_docs, top_k=stage2_top_k)
        t3 = time.time()
        print(f"Stage 2: Re-ranked to {len(final_docs)} documents in {t3-t2:.2f}s")
        # Construct the final context for the LLM
        context = "\n\n---\n\n".join([doc['text'] for doc in final_docs])
        
        total_time = time.time() - t0
        print(f"--- Pipeline finished in {total_time:.2f}s ---")
        
        return final_docs, context
# --- Example Usage ---
if __name__ == "__main__":
    pipeline = AdvancedRAGPipeline()
    
    # Query 1: A query that benefits from keyword and semantic search
    query1 = "What are the indemnification obligations regarding third-party copyright claims?"
    final_docs1, context1 = pipeline.query(query1)
    
    print(f"\n--- Results for Query 1 ---")
    for doc in final_docs1:
        print(f"ID: {doc['id']}, Score: {doc['rerank_score']:.4f}")
    # print("\nContext for LLM:\n", context1)
    print("\n================================================\n")
    # Query 2: A purely semantic query
    query2 = "What should be done with sensitive materials after a project ends?"
    final_docs2, context2 = pipeline.query(query2)
    print(f"\n--- Results for Query 2 ---")
    for doc in final_docs2:
        print(f"ID: {doc['id']}, Score: {doc['rerank_score']:.4f}")
    # print("\nContext for LLM:\n", context2)
This final script demonstrates the complete flow. Query 1 contains keywords like indemnification and copyright, which BM25 will excel at. Query 2 is more conceptual (sensitive materials, project ends), which the dense vector search and cross-encoder will handle. Our hybrid approach is robust to both.
Performance, Edge Cases, and Production Considerations
This architecture is powerful, but it introduces complexity and performance trade-offs that must be managed in a production environment.
1. Latency vs. Accuracy Tuning
The most critical tuning parameters are the number of documents passed between stages: stage1_top_n and stage2_top_k.
*   stage1_top_n (e.g., 100): This directly impacts the recall of your system and the latency of the re-ranker. A larger N increases the chance of finding the correct document but makes Stage 2 slower. A smaller N is faster but risks missing the needle in the haystack.
*   stage2_top_k (e.g., 5): This determines how much context you pass to the LLM. Too much context can lead to the "lost in the middle" problem where the LLM ignores information buried in a long context. Too little, and you might miss crucial details. This depends heavily on your LLM's context window and instruction-following capabilities.
Benchmarking is non-negotiable. You must create a small, golden dataset of (query, expected_document_id) pairs to evaluate how changes to these parameters affect your retrieval accuracy (e.g., using metrics like Mean Reciprocal Rank (MRR) or NDCG).
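A minimal sketch of such an evaluation loop, assuming a hand-maintained golden_set of (query, expected clause ID) pairs and the pipeline class from Step 4:
# evaluate_pipeline.py -- MRR / hit rate over a hand-built golden set (sketch)
from full_pipeline import AdvancedRAGPipeline
# Hypothetical golden set: each query paired with the clause ID it should surface.
golden_set = [
    ("what are the conditions for non-renewal of a contract?", "clause-002"),
    ("what happens if there is a natural disaster like a flood?", "clause-003"),
]
def evaluate(pipeline, golden_set, stage1_top_n=100, stage2_top_k=5):
    reciprocal_ranks, hits = [], 0
    for query, expected_id in golden_set:
        docs, _ = pipeline.query(query, stage1_top_n=stage1_top_n, stage2_top_k=stage2_top_k)
        ranked_ids = [d["id"] for d in docs]
        if expected_id in ranked_ids:
            hits += 1
            reciprocal_ranks.append(1.0 / (ranked_ids.index(expected_id) + 1))
        else:
            reciprocal_ranks.append(0.0)
    mrr = sum(reciprocal_ranks) / len(golden_set)
    hit_rate = hits / len(golden_set)
    print(f"MRR: {mrr:.3f}  Hit rate@{stage2_top_k}: {hit_rate:.3f}")
if __name__ == "__main__":
    evaluate(AdvancedRAGPipeline(), golden_set)
Re-run this evaluation whenever you change stage1_top_n, stage2_top_k, or either model, and track the numbers over time.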
2. Cross-Encoder Performance Optimization
Stage 2 is the bottleneck. Here are several strategies to mitigate its latency:
*   Model Choice: Use the smallest cross-encoder that meets your accuracy requirements. ms-marco-MiniLM-L-6-v2 is a good starting point; larger cross-encoders in the same family (for example, the electra-base variant) are more accurate but significantly slower.
* Hardware: Run the re-ranker on a GPU. The performance difference is night and day.
* Quantization & ONNX: For CPU inference, convert the model to a quantized ONNX (Open Neural Network Exchange) format. This can provide a 2-4x speedup with a minimal drop in accuracy; a sketch of the export follows this list.
*   Batching: The sentence-transformers library automatically batches predictions. Ensure your rerank function processes all candidate pairs in a single call to model.predict() to leverage this.
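As a hedged sketch of the ONNX route from the list above, using Hugging Face Optimum to export the cross-encoder (quantization can then be layered on with Optimum's quantization tooling; the speedup you actually see depends on your hardware):
# onnx_reranker_sketch.py -- export the cross-encoder to ONNX for CPU serving (sketch)
from transformers import AutoTokenizer
from optimum.onnxruntime import ORTModelForSequenceClassification
model_id = "cross-encoder/ms-marco-MiniLM-L-6-v2"
tokenizer = AutoTokenizer.from_pretrained(model_id)
# export=True converts the PyTorch checkpoint to ONNX on the fly.
ort_model = ORTModelForSequenceClassification.from_pretrained(model_id, export=True)
query = "what happens if there is a natural disaster like a flood?"
candidates = ["Force Majeure includes acts of God, floods, and strikes."]
inputs = tokenizer([query] * len(candidates), candidates,
                   padding=True, truncation=True, max_length=512, return_tensors="pt")
# Raw logits; only the relative ordering matters for re-ranking.
scores = ort_model(**inputs).logits.squeeze(-1).tolist()
print(scores)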
3. Edge Case: Long Documents
The cross-encoder has a maximum sequence length (e.g., 512 tokens). If a document from Stage 1 is longer than this, it will be truncated, potentially losing the relevant information.
Solution: Implement a chunking strategy at the re-ranking stage. For each long document retrieved in Stage 1, split it into overlapping chunks. Create (query, chunk) pairs and pass all of them to the cross-encoder. The final score for the document can be the maximum score of any of its chunks. This adds complexity but is essential for working with long-form text.
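A minimal sketch of this max-over-chunks scoring, using a crude whitespace-word chunker for illustration (a production system would chunk on tokenizer tokens or sentence boundaries):
# chunked_rerank_sketch.py -- max-over-chunks scoring for long documents (sketch)
from sentence_transformers import CrossEncoder
model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", max_length=512)
def chunk_text(text, chunk_size=200, overlap=50):
    """Split into overlapping word windows (crude but illustrative)."""
    words = text.split()
    step = chunk_size - overlap
    return [" ".join(words[i:i + chunk_size])
            for i in range(0, max(len(words) - overlap, 1), step)]
def score_long_document(query, text):
    chunks = chunk_text(text)
    scores = model.predict([(query, chunk) for chunk in chunks])
    # The document's relevance is the best score any of its chunks achieves.
    return max(scores)
Taking the maximum rather than the mean keeps a long document competitive even when only one section of it answers the query.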
4. System Observability
In a production system, you need to log everything:
* The scores from Stage 1 (both BM25 and vector scores).
* The final re-ranking scores from Stage 2.
* The latency of each stage.
* The final documents selected.
This data is invaluable for debugging. When a user reports a bad answer, you can trace the retrieval process back and see if the failure occurred in Stage 1 (poor recall) or Stage 2 (incorrect re-ranking).
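One lightweight way to capture all of this is a structured trace record per query, emitted at the end of the pipeline; a sketch with illustrative field names, not a standard schema:
# query_trace_sketch.py -- structured per-query trace logging (illustrative)
import json
import logging
import time
logger = logging.getLogger("rag.pipeline")
def log_query_trace(query, candidates, final_docs, stage1_seconds, stage2_seconds):
    record = {
        "timestamp": time.time(),
        "query": query,
        "stage1_latency_s": round(stage1_seconds, 3),
        "stage2_latency_s": round(stage2_seconds, 3),
        "stage1_candidates": [
            {"id": d["id"], "score": d.get("score")} for d in candidates
        ],
        # Cast to float: the re-rank scores are numpy types, which json can't serialize.
        "final_docs": [
            {"id": d["id"], "rerank_score": float(d["rerank_score"])} for d in final_docs
        ],
    }
    logger.info(json.dumps(record))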
Conclusion: Beyond Naive RAG
Moving from a single-stage vector search to a two-stage, re-ranking architecture is a significant step up in complexity, but it is often the deciding factor between a prototype and a production-ready RAG system. By strategically combining fast, recall-oriented methods like BM25 and bi-encoders with slow, precision-oriented cross-encoders, we build a system that is robust, accurate, and capable of handling the nuance of complex information domains.
The key takeaway for senior engineers is that retrieval is not a solved problem to be offloaded to a single library call. It is a tunable, multi-stage process where architectural choices have a direct and profound impact on the quality of the final generated output. The investment in building and tuning this more complex pipeline pays for itself by drastically reducing model hallucinations and delivering more trustworthy, relevant, and accurate answers to your users.