Optimizing RAG with Cohere Re-rank and PGVector IVFFlat Indexes
The Semantic Gap in Production RAG Systems
In any mature Retrieval-Augmented Generation (RAG) system, the initial euphoria of functional semantic search quickly gives way to the harsh reality of the "semantic gap." Standard vector search, typically based on cosine similarity or Euclidean distance, is exceptionally good at identifying documents that share statistical correlations in their embedding space. However, it often fails to capture the nuanced, contextual relevance required by a Large Language Model (LLM) to generate a precise and accurate response.
A query for "What were the financial implications of the Q3 server migration?" might retrieve documents about "Q3 financials" and "server migration planning" but miss the critical document titled "Post-Mortem: Q3 Infrastructure Overspend" because its embedding is marginally further away than the more keyword-heavy, yet less insightful, alternatives. This is the core problem: we are optimizing for vector proximity, not true relevance. This leads to a classic precision/recall trade-off. A simple vector search that retrieves a large number of candidates (top_k=100) might have high recall (the correct document is likely in the set) but suffers from abysmal precision (most of the 100 documents are noise).
To bridge this gap in a production environment, we must evolve from a single-stage retrieval process to a multi-stage pipeline that separates the concerns of broad candidate sourcing (recall) from precise relevance filtering (precision). This is the foundation of the Retrieve-and-Re-rank architecture.
The Two-Stage Retrieval Architecture: Retrieve and Re-rank
This pattern decomposes the retrieval problem into two distinct phases:
- L1 Retrieval (recall-focused): cast a wide net and pull back a generous set of candidate documents as quickly as possible. We will use pgvector and a specifically tuned IVFFlat index for this stage.
- L2 Re-ranking (precision-focused): score each (query, candidate) pair jointly and keep only the handful of genuinely relevant documents. We will use the Cohere Re-rank API for this stage.

This architecture allows us to use the right tool for each job: a scalable and fast Approximate Nearest Neighbor (ANN) index for the initial wide net, and a powerful, context-aware neural model for the final, precise selection.
graph TD
A[User Query] --> B{Embedding Model};
B --> C[L1 Retrieval: PGVector IVFFlat];
C -- Top-K Candidates (e.g., K=100) --> D{L2 Re-ranking: Cohere API};
A --> D;
D -- Top-N Relevant Docs (e.g., N=5) --> E{LLM for Generation};
A --> E;
E --> F[Final Answer];
subgraph Legend
direction LR
subgraph L1 - Recall
C
end
subgraph L2 - Precision
D
end
end
Deep Dive: Configuring PGVector with IVFFlat Indexes
While HNSW (Hierarchical Navigable Small World) is a popular choice for pgvector indexes due to its excellent query-time performance, IVFFlat (an inverted-file index over flat, i.e. uncompressed, vectors) offers compelling advantages for massive, largely static datasets. Its cluster centroids are computed at build time, so the index should be created after the bulk of the data is loaded and rebuilt if the distribution shifts significantly; in exchange, we get cheap builds and an explicit query-time knob for trading recall against latency.
IVFFlat works by partitioning the vector space into a predefined number of clusters, or lists. During a query, instead of searching the entire space, the system identifies the clusters closest to the query vector and searches only within them. This is controlled by the probes parameter.
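As a rough back-of-the-envelope sketch of what this buys us (assuming roughly balanced clusters, and using the lists and probes values we settle on later in this article):

```python
# Rough IVFFlat cost intuition, assuming roughly balanced clusters.
num_rows = 500_000   # documents in the table
lists = 707          # partitions (chosen below)
probes = 8           # partitions searched per query (tuned in the benchmark below)

centroid_comparisons = lists                  # the query is compared to every list centroid
vectors_scanned = num_rows * probes // lists  # ~probes/lists of the table: ~5,657 rows (~1.1%)
print(centroid_comparisons, vectors_scanned)
```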
Why IVFFlat over HNSW for this Architecture?
- Explicit recall/latency control: the probes parameter gives us explicit, granular control over the trade-off between speed and recall. For our L1 stage, we can choose a lower probes value, accepting slightly lower recall for significantly lower latency, knowing the L2 re-ranker will compensate.
- Predictable latency: query cost grows roughly in proportion to probes, making latency more predictable and easier to budget.
- Lower build cost: IVFFlat indexes build faster and use less memory than HNSW, which matters when you periodically reindex hundreds of thousands of vectors.
Implementation and Parameter Tuning
Let's assume the following table holds our documents. The embedding dimension is 768, matching a model like sentence-transformers/all-mpnet-base-v2.
-- Ensure the pgvector extension is enabled
CREATE EXTENSION IF NOT EXISTS vector;
-- Document table schema
CREATE TABLE documents (
id BIGSERIAL PRIMARY KEY,
content TEXT NOT NULL,
metadata JSONB,
embedding VECTOR(768)
);
-- Example of inserting a document
-- INSERT INTO documents (content, metadata, embedding) VALUES (...);
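For completeness, here is a minimal ingestion sketch showing how documents and their embeddings might be written to this table. It assumes psycopg2 and sentence-transformers are installed and that a DB_URL environment variable points at the database (as in the scripts later in this article); the insert_documents helper is illustrative, not a prescribed API.

```python
import os

import psycopg2
from psycopg2.extras import Json
from sentence_transformers import SentenceTransformer

# all-mpnet-base-v2 produces 768-dimensional embeddings, matching VECTOR(768).
model = SentenceTransformer("all-mpnet-base-v2")

def insert_documents(texts: list[str]) -> None:
    conn = psycopg2.connect(os.getenv("DB_URL"))
    embeddings = model.encode(texts)
    with conn, conn.cursor() as cur:
        for text, emb in zip(texts, embeddings):
            cur.execute(
                "INSERT INTO documents (content, metadata, embedding) VALUES (%s, %s, %s)",
                (text, Json({}), str(emb.tolist())),  # pgvector accepts the '[...]' text form
            )
    conn.close()
```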
The crucial step is creating the IVFFlat index. The most important parameter is lists.
Choosing the lists Parameter:
The number of lists determines how many partitions the vector space is divided into. A good starting point is:
- lists = sqrt(num_rows), a common general-purpose heuristic
- lists = num_rows / 1000, the pgvector documentation's suggestion for tables up to roughly one million rows

Let's assume we have 500,000 documents. Using the square-root heuristic, a reasonable lists value would be sqrt(500000) ≈ 707 (the rows/1000 rule would give 500; anything in that range is a sensible starting point to benchmark).
-- Create the IVFFlat index
-- This can take a significant amount of time on a large table.
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops) WITH (lists = 707);
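On a large table, the build can be sped up by giving it more memory and, depending on your pgvector version, parallel workers. A sketch with illustrative values; size them to your instance:

```sql
-- Optional: speed up the index build (illustrative values; tune to your hardware).
SET maintenance_work_mem = '2GB';           -- lets the k-means clustering work in memory
SET max_parallel_maintenance_workers = 4;   -- parallel builds depend on your pgvector version
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops) WITH (lists = 707);
```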
Tuning the probes Query Parameter:
The probes parameter is not set at index creation; it's set at query time. It dictates how many of the nearby lists (clusters) are searched. This is our primary lever for balancing latency and recall.
We can set this parameter for our session before running the query:
-- Set the number of probes for the current session
SET ivfflat.probes = 10;
-- Now execute the query
SELECT id, content
FROM documents
ORDER BY embedding <=> '[...your_query_vector...]' -- <=> is pgvector's cosine distance operator (pairs with vector_cosine_ops)
LIMIT 100;
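It is also worth confirming that the planner actually uses the IVFFlat index for this query shape rather than falling back to a sequential scan; a quick check:

```sql
-- Confirm the IVFFlat index is used for the ANN query
EXPLAIN ANALYZE
SELECT id
FROM documents
ORDER BY embedding <=> '[...your_query_vector...]'
LIMIT 100;
-- Expect an Index Scan on the ivfflat index rather than a Seq Scan followed by a Sort.
```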
Benchmarking `probes` for Production
To choose the right probes value, you must benchmark. The goal is to find the "knee" in the curve—the point where increasing probes yields diminishing returns in recall for a significant latency cost.
Here is a Python script demonstrating how to perform this benchmark. It requires a ground truth dataset where you know the truly relevant document IDs for a set of test queries.
import psycopg2
import numpy as np
import time
import os
# --- Configuration ---
DB_CONN_STRING = os.getenv("DB_URL")
TEST_QUERIES = { # A map of test queries to their ground truth document IDs
"query_vector_1": [101, 205, 308],
"query_vector_2": [415, 522, 631, 749]
}
PROBE_VALUES = [1, 2, 4, 8, 16, 32]
TOP_K_RETRIEVAL = 100
def get_embedding(query_text): # Placeholder for your embedding function
    # In a real scenario, this would call your embedding model.
    # NOTE: random vectors make the measured recall meaningless; use real query
    # embeddings (and real ground-truth IDs) before trusting the numbers.
    return np.random.rand(768).tolist()
def run_benchmark():
results = {}
conn = psycopg2.connect(DB_CONN_STRING)
for probes in PROBE_VALUES:
latencies = []
recalls = []
with conn.cursor() as cur:
cur.execute(f"SET LOCAL ivfflat.probes = {probes};")
print(f"\n--- Benchmarking with probes = {probes} ---")
for query_text, true_ids in TEST_QUERIES.items():
query_vector = get_embedding(query_text)
start_time = time.perf_counter()
cur.execute(
"SELECT id FROM documents ORDER BY embedding <=> %s LIMIT %s",
(str(query_vector), TOP_K_RETRIEVAL)
)
retrieved_ids = {row[0] for row in cur.fetchall()}
end_time = time.perf_counter()
latencies.append((end_time - start_time) * 1000) # milliseconds
# Calculate recall@K
hits = len(retrieved_ids.intersection(set(true_ids)))
recall_at_k = hits / len(true_ids) if true_ids else 0
recalls.append(recall_at_k)
avg_latency = np.mean(latencies)
avg_recall = np.mean(recalls)
results[probes] = {"latency_ms": avg_latency, "recall": avg_recall}
print(f"Avg Latency: {avg_latency:.2f} ms")
print(f"Avg Recall@{TOP_K_RETRIEVAL}: {avg_recall:.4f}")
conn.close()
return results
if __name__ == "__main__":
benchmark_results = run_benchmark()
print("\n--- Final Results ---")
print("| Probes | Avg Latency (ms) | Recall@100 |")
print("|--------|------------------|------------|")
for p, r in benchmark_results.items():
print(f"| {p:<6} | {r['latency_ms']:<16.2f} | {r['recall']:<10.4f} |")
Hypothetical Benchmark Results:
| Probes | Avg Latency (ms) | Recall@100 |
|---|---|---|
| 1 | 8.51 | 0.8214 |
| 2 | 12.33 | 0.8929 |
| 4 | 19.87 | 0.9643 |
| 8 | 35.12 | 0.9821 |
| 16 | 65.45 | 0.9821 |
| 32 | 121.90 | 0.9821 |
From this data, probes = 8 is the clear winner. It achieves nearly maximum recall with a manageable latency of ~35ms. Increasing to 16 nearly doubles the latency for no improvement in recall. This is the data-driven decision-making required for production systems.
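If you prefer to eyeball the knee, a small plotting sketch can help; it assumes matplotlib is available and reuses the benchmark_results dictionary produced by the script above:

```python
import matplotlib.pyplot as plt

def plot_knee(benchmark_results: dict) -> None:
    # Latency/recall curves: the "knee" is where recall flattens but latency keeps climbing.
    probes = sorted(benchmark_results)
    latencies = [benchmark_results[p]["latency_ms"] for p in probes]
    recalls = [benchmark_results[p]["recall"] for p in probes]

    fig, ax1 = plt.subplots()
    ax1.plot(probes, latencies, marker="o")
    ax1.set_xlabel("probes")
    ax1.set_ylabel("Avg latency (ms)")

    ax2 = ax1.twinx()
    ax2.plot(probes, recalls, marker="s", color="tab:red")
    ax2.set_ylabel("Recall@100")

    fig.tight_layout()
    plt.show()
```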
Implementing the Cohere Re-ranker Layer
With our fast L1 retriever tuned, we now feed its output—a list of 100 candidate documents—to our L2 re-ranker. The Cohere Re-rank model is a cross-encoder. Unlike a bi-encoder (which creates embeddings independently), a cross-encoder processes the (query, document) pair together, allowing it to capture far more subtle and complex relevance signals.
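To make that distinction concrete, here is a small local illustration using public sentence-transformers checkpoints (this is not the Cohere model, just a side-by-side of the two scoring modes; cross-encoder/ms-marco-MiniLM-L-6-v2 is an assumed example model):

```python
from sentence_transformers import CrossEncoder, SentenceTransformer, util

query = "What were the financial implications of the Q3 server migration?"
docs = [
    "Post-Mortem: Q3 Infrastructure Overspend",
    "Server migration planning checklist for Q3",
]

# Bi-encoder: query and documents are embedded independently, then compared.
bi_encoder = SentenceTransformer("all-mpnet-base-v2")
q_emb = bi_encoder.encode(query, convert_to_tensor=True)
d_emb = bi_encoder.encode(docs, convert_to_tensor=True)
print("bi-encoder cosine:", util.cos_sim(q_emb, d_emb).tolist())

# Cross-encoder: each (query, document) pair is scored jointly, capturing interactions.
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
print("cross-encoder scores:", cross_encoder.predict([(query, d) for d in docs]).tolist())
```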
The API Call and Implementation
The implementation is straightforward. We take the raw text of the documents retrieved from PostgreSQL and pass them to the Cohere API.
import cohere
class Reranker:
def __init__(self, api_key):
self.co = cohere.Client(api_key)
def rerank_documents(self, query: str, documents: list[dict], top_n: int = 5) -> list[dict]:
"""
Reranks a list of documents based on a query.
Args:
query (str): The user's query.
documents (list[dict]): A list of dictionaries, each with 'id' and 'content'.
top_n (int): The number of top documents to return.
Returns:
list[dict]: A sorted list of the top_n most relevant documents.
"""
if not documents:
return []
# Extract the content for the API call
doc_contents = [doc['content'] for doc in documents]
try:
# The model 'rerank-english-v2.0' is optimized for performance and quality.
response = self.co.rerank(
model='rerank-english-v2.0',
query=query,
documents=doc_contents,
top_n=top_n
)
# Map the reranked results back to our original document objects
reranked_indices = [r.index for r in response.results]
reranked_docs = [documents[i] for i in reranked_indices]
# Optionally, you can attach the relevance score
for i, result in enumerate(response.results):
reranked_docs[i]['relevance_score'] = result.relevance_score
return reranked_docs
        except Exception as e:  # the concrete exception class differs across cohere SDK versions
            # Production-grade error handling is crucial
            print(f"Cohere API error: {e}")
            # Fallback strategy: return the original top_n documents from L1 retrieval
            return documents[:top_n]
# Example Usage:
# cohere_reranker = Reranker(api_key=os.getenv("COHERE_API_KEY"))
# retrieved_docs = [...] # List of {'id': 1, 'content': '...'} from PGVector
# final_docs = cohere_reranker.rerank_documents("my query", retrieved_docs, top_n=5)
This implementation includes a critical fallback mechanism. If the Cohere API fails, we don't fail the entire request; we gracefully degrade by returning the top N documents from the initial L1 retrieval. This ensures system resilience.
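Transient failures (rate limits, network blips) are often worth a retry or two before degrading. A minimal sketch of that idea, wrapping the rerank call in simple exponential backoff; the retry counts and delays are illustrative, and rerank_with_retries is a hypothetical helper, not part of the Cohere SDK:

```python
import time

def rerank_with_retries(reranker, query, documents, top_n=5, max_attempts=3):
    """Retry the rerank call a few times before giving up and falling back to L1 order."""
    for attempt in range(1, max_attempts + 1):
        try:
            return reranker.co.rerank(
                model="rerank-english-v2.0",
                query=query,
                documents=[doc["content"] for doc in documents],
                top_n=top_n,
            )
        except Exception as exc:  # exact exception class varies by cohere SDK version
            if attempt == max_attempts:
                print(f"Rerank failed after {attempt} attempts: {exc}")
                return None  # caller should fall back to documents[:top_n]
            time.sleep(2 ** attempt)  # 2s, 4s, ... simple exponential backoff
```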
End-to-End Production Pipeline
Now, let's assemble the full pipeline into a cohesive service class. This class will manage the database connection, embedding generation, L1 retrieval, and L2 re-ranking.
import psycopg2
from psycopg2.extras import RealDictCursor
import cohere
from sentence_transformers import SentenceTransformer
import os
class AdvancedRAGPipeline:
def __init__(self, db_conn_string, cohere_api_key, embedding_model_name='all-mpnet-base-v2'):
self.db_conn = psycopg2.connect(db_conn_string)
self.cohere_client = cohere.Client(cohere_api_key)
self.embedding_model = SentenceTransformer(embedding_model_name)
self.ivfflat_probes = 8 # Determined from our benchmark
def _embed(self, text: str) -> list[float]:
return self.embedding_model.encode(text).tolist()
def _retrieve_candidates(self, query_vector: list[float], top_k: int) -> list[dict]:
        # psycopg2 starts a transaction on the first execute; using the connection
        # as a context manager commits it on exit, so SET LOCAL is scoped to this
        # retrieval without issuing explicit BEGIN/COMMIT statements.
        with self.db_conn, self.db_conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(f"SET LOCAL ivfflat.probes = {self.ivfflat_probes};")
            cur.execute(
                "SELECT id, content FROM documents ORDER BY embedding <=> %s LIMIT %s",
                (str(query_vector), top_k)
            )
            return cur.fetchall()
def _rerank(self, query: str, documents: list[dict], top_n: int) -> list[dict]:
if not documents:
return []
doc_contents = [doc['content'] for doc in documents]
try:
response = self.cohere_client.rerank(
model='rerank-english-v2.0',
query=query,
documents=doc_contents,
top_n=top_n
)
reranked_docs = [documents[r.index] for r in response.results]
for i, result in enumerate(response.results):
reranked_docs[i]['relevance_score'] = result.relevance_score
return reranked_docs
        except Exception as e:  # the concrete exception class differs across cohere SDK versions
            print(f"Cohere API error during rerank: {e}. Falling back to L1 results.")
            return documents[:top_n]
def execute(self, query: str, retrieve_k: int = 100, rerank_n: int = 5) -> list[dict]:
"""Executes the full Retrieve-and-Re-rank pipeline."""
if not query:
return []
# 1. Embed the query
query_vector = self._embed(query)
# 2. L1 Retrieval
candidate_docs = self._retrieve_candidates(query_vector, retrieve_k)
if not candidate_docs:
return []
# 3. L2 Re-ranking
final_docs = self._rerank(query, candidate_docs, rerank_n)
return final_docs
# --- Main execution block ---
if __name__ == '__main__':
pipeline = AdvancedRAGPipeline(
db_conn_string=os.getenv("DB_URL"),
cohere_api_key=os.getenv("COHERE_API_KEY")
)
user_query = "What were the financial implications of the Q3 server migration?"
final_documents = pipeline.execute(user_query)
print(f"Final documents for query: '{user_query}'")
for doc in final_documents:
        score = doc.get('relevance_score')
        score_str = f"{score:.4f}" if score is not None else "N/A"  # fallback path attaches no score
        print(f" - ID: {doc['id']}, Score: {score_str}, Content: {doc['content'][:100]}...")
Edge Case Handling and Production Considerations
- Re-ranking latency: with a top_k of 100 candidates, Cohere's API typically responds in 200-400ms. If this is too slow, consider reducing the number of candidates (e.g., retrieve_k=50). This is another trade-off: a smaller candidate set for the re-ranker is faster and cheaper but increases the risk that the truly relevant document was never passed to it.
- Relevance thresholding: consider discarding any document whose relevance_score is below a certain threshold (e.g., 0.5). This prevents the LLM from being fed low-quality context, even if it made it into the top N.
Performance and Cost Analysis
Let's break down the expected performance of our end-to-end system:
| Stage | Typical Latency (ms) | Notes |
|---|---|---|
| 1. Query Embedding | 30-50 | Depends on model and hardware (GPU vs CPU). |
| 2. PGVector L1 Retrieval | 30-40 | Based on our benchmark with probes=8 for top_k=100. |
| 3. Cohere L2 Re-ranking | 200-400 | For 100 documents. This is the main contributor to latency. |
| Total Retrieval Time | 260-490ms | Before passing context to the LLM for generation. |
This sub-500ms retrieval time is generally acceptable for interactive applications, especially given the immense quality gains. The cost is also a factor. If PGVector costs are part of your existing database infrastructure, the main new operational cost is the Cohere API, which should be factored into the per-query cost model of your application.
Conclusion
By moving from a naive single-stage vector search to a sophisticated two-stage Retrieve-and-Re-rank architecture, we solve the critical problem of the semantic gap in production RAG systems. This pattern allows us to leverage the strengths of different technologies: the raw speed and scalability of PGVector with a tuned IVFFlat index for high-recall candidate sourcing, and the deep contextual understanding of a cross-encoder model like Cohere Re-rank for high-precision final selection.
The key takeaways for senior engineers implementing this pattern are:
- Separate recall from precision: let a fast ANN index source a broad candidate set, and let a cross-encoder make the final, precise selection.
- Benchmark your IVFFlat probes value. Measure the latency/recall trade-off to make a data-informed decision that fits your application's specific needs.
- Build in graceful degradation: if the re-ranking API fails, fall back to the L1 results rather than failing the request.

This architecture represents a significant step up in maturity for any RAG system, moving it from a promising prototype to a reliable, production-grade service capable of delivering consistently relevant and accurate results.