Scalable RAG with pgvector and Cross-Encoder Re-ranking
The Precision Bottleneck in Production RAG Systems
In the initial gold rush to implement Retrieval-Augmented Generation (RAG), many systems were built on a simple premise: embed a query, perform a vector similarity search against a corpus, grab the top-K documents, and stuff them into a Large Language Model's (LLM) context window. While effective for demos, this single-stage approach quickly reveals its limitations in production environments. The core issue is a fundamental trade-off between recall and precision.
A single-stage vector search, optimized for speed using Approximate Nearest Neighbor (ANN) algorithms, is designed for high recall—it's excellent at finding a broad set of potentially relevant documents from millions or billions of entries. However, it often struggles with precision. The subtle nuances of a user's query can be lost in the high-dimensional space of embeddings, causing semantically close but contextually irrelevant documents to rank highly. This leads to the LLM receiving a diluted, noisy context, resulting in generic, incorrect, or hallucinated responses—a critical failure in production applications.
To overcome this, senior engineers are adopting a more sophisticated, two-stage retrieval architecture that mirrors classic patterns in information retrieval:
* Stage 1 (Candidate Generation): a fast, high-recall vector search narrows millions of chunks down to a small candidate set.
* Stage 2 (Re-ranking): a slower, high-precision model re-scores that candidate set so that only the most relevant chunks reach the LLM.
This article provides a deep, implementation-focused guide to building this two-stage architecture using PostgreSQL with the pgvector extension for candidate generation and a Transformer-based cross-encoder for re-ranking. We will bypass introductory concepts and focus on production-grade schema design, index optimization, advanced query patterns, and performance considerations.
Section 1: Architectural Blueprint: Bi-Encoders vs. Cross-Encoders
Understanding the fundamental difference between the models used in each stage is critical. Our architecture relies on two distinct types of Transformer models:
* Bi-Encoders (for Stage 1): These models, like all-MiniLM-L6-v2 from the sentence-transformers library, generate independent vector embeddings for the query and the documents. The retrieval process computes a similarity or distance metric (such as cosine similarity) between the pre-computed document vectors and the on-the-fly query vector. This is extremely fast, as the expensive embedding process for the entire corpus is done offline. This is the engine of our candidate generation.
* Cross-Encoders (for Stage 2): These models operate differently. Instead of processing the query and document separately, a cross-encoder takes both as a single input: [CLS] query [SEP] document [SEP]. This allows the model to perform full self-attention across both texts simultaneously, capturing fine-grained interactions and contextual relevance far more effectively than a bi-encoder's dot product. The output is not a vector but a single relevance score (a raw logit for many checkpoints, or a value between 0 and 1 after a sigmoid). This process is orders of magnitude slower, making it unsuitable for searching an entire corpus but perfect for re-ranking a small candidate set (see the short comparison sketch below).
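To make the contrast concrete, here is a minimal sketch that scores the same query/document pair both ways. It assumes only the sentence-transformers package and the two public checkpoints named above:
# Minimal, illustrative comparison of the two model types on one query/document pair.
from sentence_transformers import SentenceTransformer, CrossEncoder, util

query = "How do I tune HNSW index parameters?"
document = "The m and ef_construction parameters control HNSW graph quality in pgvector."

# Bi-encoder: embed each text independently, approximate relevance with cosine similarity.
bi_encoder = SentenceTransformer("all-MiniLM-L6-v2")
query_emb, doc_emb = bi_encoder.encode([query, document])
print("bi-encoder cosine similarity:", util.cos_sim(query_emb, doc_emb).item())

# Cross-encoder: score the (query, document) pair jointly with full cross-attention.
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
print("cross-encoder relevance score:", cross_encoder.predict([(query, document)])[0])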
Our production pipeline will look like this:
Offline, document chunks are embedded with the bi-encoder and stored in a pgvector column. At query time:
a. User query is embedded using the same bi-encoder.
b. A SELECT query with an ANN index lookup (<=>) is executed against PostgreSQL to fetch the top k candidates (e.g., k=100).
c. The query and each of the 100 candidate documents are formatted into pairs.
d. These 100 pairs are passed in a batch to a cross-encoder model for scoring.
e. The results are sorted by the cross-encoder's score, and the top n (e.g., n=5) are selected.
f. This highly-relevant, precision-optimized context is passed to the LLM.
Section 2: Production-Ready Ingestion and `pgvector` Indexing
A robust RAG system starts with a well-designed data layer. PostgreSQL, with its maturity, transactional guarantees, and powerful extensions like pgvector, provides an excellent foundation.
Schema Design
We need a table to store our document chunks, their bi-encoder embeddings, and any associated metadata that can be used for filtering.
-- Ensure the pgvector extension is installed
CREATE EXTENSION IF NOT EXISTS vector;
-- Main table for document chunks and their embeddings
CREATE TABLE document_chunks (
id BIGSERIAL PRIMARY KEY,
document_id UUID NOT NULL, -- Foreign key to a parent documents table
content TEXT NOT NULL, -- The actual text chunk
token_count INT NOT NULL, -- Useful for context window management
embedding VECTOR(384) NOT NULL, -- Dimensions must match your bi-encoder model
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Add metadata for multi-tenancy or filtering
ALTER TABLE document_chunks ADD COLUMN tenant_id UUID;
CREATE INDEX idx_document_chunks_tenant_id ON document_chunks(tenant_id);
Here, VECTOR(384) corresponds to the output dimension of the all-MiniLM-L6-v2 model. Using a UUID for tenant_id is a common pattern in multi-tenant applications, allowing us to enforce data isolation at the query level.
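A common failure mode is a mismatch between the VECTOR(n) column and the bi-encoder's output size. A small startup sanity check, assuming the same all-MiniLM-L6-v2 model, catches this before any data is written:
# Sanity check: the bi-encoder's output dimension must equal the VECTOR(n) column.
from sentence_transformers import SentenceTransformer

EXPECTED_DIM = 384  # must match VECTOR(384) in the document_chunks schema

model = SentenceTransformer("all-MiniLM-L6-v2")
actual_dim = model.get_sentence_embedding_dimension()
if actual_dim != EXPECTED_DIM:
    raise ValueError(
        f"Embedding dimension mismatch: model produces {actual_dim}, "
        f"but document_chunks.embedding is VECTOR({EXPECTED_DIM})."
    )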
Ingestion Script (Python)
This script handles chunking, embedding, and batch insertion into our PostgreSQL table.
import psycopg2
import psycopg2.extras
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os
# --- Configuration ---
DB_CONNECTION_STRING = os.getenv("DATABASE_URL")
MODEL_NAME = 'all-MiniLM-L6-v2' # 384 dimensions
BATCH_SIZE = 128
# --- Initialize Models and Splitter ---
print("Loading bi-encoder model...")
bi_encoder = SentenceTransformer(MODEL_NAME)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
def ingest_document(document_text: str, document_id: str, tenant_id: str):
    """Chunks, embeds, and batch-inserts a document into PostgreSQL."""
    chunks = text_splitter.split_text(document_text)
    print(f"Document split into {len(chunks)} chunks. Embedding in batches...")
    all_embeddings = bi_encoder.encode(chunks, show_progress_bar=True, batch_size=BATCH_SIZE)
    records_to_insert = []
    for i, chunk in enumerate(chunks):
        records_to_insert.append({
            'document_id': document_id,
            'content': chunk,
            'token_count': len(chunk.split()),  # A rough approximation
            'embedding': all_embeddings[i].tolist(),
            'tenant_id': tenant_id
        })
    print(f"Inserting {len(records_to_insert)} records into the database...")
    with psycopg2.connect(DB_CONNECTION_STRING) as conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                """INSERT INTO document_chunks (document_id, content, token_count, embedding, tenant_id)
                   VALUES %s""",
                [(r['document_id'], r['content'], r['token_count'], r['embedding'], r['tenant_id']) for r in records_to_insert],
                template="(%s, %s, %s, %s::vector, %s)"  # explicit cast from float list to vector
            )
    print("Ingestion complete.")
# Example Usage:
if __name__ == '__main__':
    # In a real app, this would come from a file, API, etc.
    sample_doc_text = "... your large document text here ..."
    sample_doc_id = "a1b2c3d4-e5f6-a7b8-c9d0-e1f2a3b4c5d6"
    sample_tenant_id = "t1a2b3c4-d5e6-f7a8-b9c0-d1e2f3a4b5c6"
    ingest_document(sample_doc_text, sample_doc_id, sample_tenant_id)
The Critical Choice: HNSW vs. IVFFlat Indexing
pgvector offers two main types of ANN indexes. The choice is not trivial and has significant performance implications.
IVFFlat
* How it works: It partitions the vector space into lists. At query time, it identifies the nearest partitions (probes) and performs an exhaustive search only within those, drastically reducing the search space.
* Pros: Faster to build, uses less memory during build.
* Cons: Query performance can degrade as data grows. Accuracy is highly dependent on tuning the probes parameter at query time.
* Creation: CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists = 1000); (A good starting point is lists of roughly rows/1000 for up to ~1M rows and sqrt(rows) beyond that. The opclass must match the query operator, here vector_cosine_ops for <=>.)
HNSW (Hierarchical Navigable Small World)
* How it works: It builds a multi-layered graph where top layers have long-range connections and bottom layers have short-range connections. Searching involves greedily traversing the graph from a starting point.
* Pros: Extremely fast and accurate queries, even on very large datasets. Generally outperforms IVFFlat for recall and speed.
* Cons: Slower to build, consumes more memory during index creation, and the index itself is larger.
* Creation: CREATE INDEX ON document_chunks USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64); (m is the number of connections per node, ef_construction controls graph quality; vector_cosine_ops matches the <=> operator used in our queries. A build-time tuning sketch follows below.)
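Because the HNSW build is the expensive part, it helps to give PostgreSQL more build-time resources. A sketch of the build step follows; the memory and parallelism values are illustrative assumptions, not recommendations, and the index name is arbitrary:
# Sketch: build the HNSW index with more generous build-time settings.
# maintenance_work_mem and max_parallel_maintenance_workers are standard
# PostgreSQL settings; the values below are illustrative, not tuned numbers.
import os
import psycopg2

with psycopg2.connect(os.getenv("DATABASE_URL")) as conn:
    with conn.cursor() as cur:
        cur.execute("SET maintenance_work_mem = '2GB'")
        cur.execute("SET max_parallel_maintenance_workers = 4")
        cur.execute(
            """CREATE INDEX IF NOT EXISTS idx_hnsw_embedding
               ON document_chunks
               USING hnsw (embedding vector_cosine_ops)
               WITH (m = 16, ef_construction = 64)"""
        )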
Production Recommendation & Benchmark:
For most production RAG systems where query latency is critical, HNSW is the superior choice. The upfront cost of building the index is paid once, while the query performance benefits are realized on every request.
Let's consider a hypothetical benchmark on a table with 1 million 384-dimensional vectors:
| Index Type | Build Time | Index Size | Query Time (Top 100) | Recall @ 100 |
|---|---|---|---|---|
| IVFFlat | ~15 mins | ~450 MB | ~35ms (probes=10) | ~92% |
| HNSW | ~60 mins | ~800 MB | ~8ms (ef_search=40) | ~99% |
As seen, HNSW provides a >4x improvement in query latency with near-perfect recall, making it the clear winner for our candidate generation stage.
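Numbers like these are workload-dependent, so it is worth confirming on your own data that the planner actually uses the HNSW index. A minimal verification sketch, assuming the schema above and the same bi-encoder:
# Sketch: confirm the planner uses the ANN index for a representative query.
import os
import psycopg2
from sentence_transformers import SentenceTransformer

embedding = SentenceTransformer("all-MiniLM-L6-v2").encode(
    "What are the performance implications of HNSW indexing?"
).tolist()

with psycopg2.connect(os.getenv("DATABASE_URL")) as conn:
    with conn.cursor() as cur:
        cur.execute(
            """EXPLAIN ANALYZE
               SELECT id FROM document_chunks
               ORDER BY embedding <=> %s::vector
               LIMIT 100""",
            (embedding,),
        )
        for (line,) in cur.fetchall():
            print(line)  # expect an Index Scan on the hnsw index, not a Seq Scan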
Section 3: Implementing the Two-Stage Retrieval Logic
With our data ingested and indexed, we can now build the core retrieval logic.
Stage 1: Fast Candidate Generation with `pgvector`
This function will take a query, embed it, and use our HNSW index to fetch the top 100 candidates.
import numpy as np
def fetch_candidates(query_text: str, tenant_id: str, k: int = 100) -> list[dict]:
    """Fetches top-k candidate chunks from PostgreSQL using vector similarity."""
    query_embedding = bi_encoder.encode(query_text)
    candidates = []
    with psycopg2.connect(DB_CONNECTION_STRING) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            # Set the HNSW query-time parameter for this session.
            # ef_search controls the candidate list size during graph traversal
            # (higher = better recall, higher latency); keep it >= LIMIT.
            cur.execute("SET hnsw.ef_search = %s", (k * 2,))  # a good starting point
            cur.execute(
                """SELECT id, content, document_id
                   FROM document_chunks
                   WHERE tenant_id = %s
                   ORDER BY embedding <=> %s::vector
                   LIMIT %s""",
                (tenant_id, query_embedding.tolist(), k)
            )
            candidates = [dict(row) for row in cur.fetchall()]
    return candidates
Key Implementation Details:
* <=> Operator: This is the pgvector operator for cosine distance, which is why our indexes use the vector_cosine_ops opclass. pgvector also provides <-> for L2 distance and <#> for (negative) inner product; the operator used in queries must match the opclass the index was built with.
* hnsw.ef_search: This session-level parameter is crucial for tuning the HNSW query. It defines the size of the dynamic candidate list maintained during graph traversal; higher values improve recall at the cost of latency, and it should be at least as large as your LIMIT. Setting it to 2 * k is a reasonable starting point.
* Multi-tenancy: The WHERE tenant_id = %s clause is critical for data isolation, but understand how it interacts with the ANN index: when the HNSW index is used, pgvector applies the filter after the index scan, so small tenants can return fewer than k rows. Mitigations include raising hnsw.ef_search, building partial indexes per tenant when the tenant count is small (sketched below), or enabling the iterative index scans available in recent pgvector releases.
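As a sketch of the partial-index mitigation mentioned above (it assumes a small, known set of tenants; the index naming scheme is arbitrary):
# Sketch: per-tenant partial HNSW indexes. Each index only covers one tenant's rows,
# so the tenant filter and the ANN scan operate on the same subset of the table.
import psycopg2
from psycopg2 import sql

def create_tenant_partial_index(conn_str: str, tenant_id: str):
    index_name = f"idx_hnsw_embedding_{tenant_id.replace('-', '')}"
    with psycopg2.connect(conn_str) as conn:
        with conn.cursor() as cur:
            cur.execute(
                sql.SQL(
                    "CREATE INDEX IF NOT EXISTS {} ON document_chunks "
                    "USING hnsw (embedding vector_cosine_ops) "
                    "WITH (m = 16, ef_construction = 64) "
                    "WHERE tenant_id = %s"
                ).format(sql.Identifier(index_name)),
                (tenant_id,),
            )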
Stage 2: Precision Re-ranking with a Cross-Encoder
Now we take the 100 candidates and re-rank them for maximum relevance.
from sentence_transformers import CrossEncoder
# --- Load Cross-Encoder Model (do this once at application startup) ---
print("Loading cross-encoder model...")
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
def rerank_candidates(query_text: str, candidates: list[dict], n: int = 5) -> list[dict]:
    """Re-ranks a list of candidates using a cross-encoder model."""
    if not candidates:
        return []
    # Format pairs for the cross-encoder: [ (query, doc_content), ... ]
    pairs = [(query_text, candidate['content']) for candidate in candidates]
    # Predict scores in a batch for efficiency
    print(f"Scoring {len(pairs)} pairs with cross-encoder...")
    scores = cross_encoder.predict(pairs, show_progress_bar=False)
    # Combine candidates with their new scores
    for i in range(len(candidates)):
        candidates[i]['rerank_score'] = scores[i]
    # Sort candidates by the new score in descending order
    candidates.sort(key=lambda x: x['rerank_score'], reverse=True)
    # Return the top-n most relevant candidates
    return candidates[:n]
Performance Consideration: The Re-ranking Bottleneck
The cross_encoder.predict call is the most computationally expensive part of the online query. On a CPU, re-ranking 100 candidates can take 200-500ms. For user-facing applications, this is often too slow.
Solution: GPU Acceleration. By running this inference on a GPU, the latency for 100 candidates can be reduced to 20-50ms. Ensure you have pytorch with CUDA support installed and run your application on a GPU-enabled instance.
# If you have a GPU, move the model to it
import torch
if torch.cuda.is_available():
    print("Moving cross-encoder to GPU")
    cross_encoder.model.to('cuda')
Section 4: Advanced Patterns and Edge Cases
To build a truly robust system, we must consider more complex scenarios.
Hybrid Search: Merging Vector and Full-Text Search
Vector search excels at semantic relevance, but can fail on specific keywords, acronyms, or product codes. Traditional full-text search (FTS) is the opposite. A hybrid approach combines the strengths of both.
First, we need to add a tsvector column to our table and index it.
-- Add a tsvector column for FTS
ALTER TABLE document_chunks ADD COLUMN content_tsv TSVECTOR;
-- Create a trigger to automatically update the tsvector on insert/update
CREATE OR REPLACE FUNCTION update_content_tsv()
RETURNS TRIGGER AS $$
BEGIN
NEW.content_tsv := to_tsvector('english', NEW.content);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER tsvector_update
BEFORE INSERT OR UPDATE ON document_chunks
FOR EACH ROW EXECUTE FUNCTION update_content_tsv();
-- Create a GIN index for fast FTS
CREATE INDEX idx_gin_content_tsv ON document_chunks USING GIN(content_tsv);
Now, we can perform a hybrid query. The challenge is combining the scores. We'll use Reciprocal Rank Fusion (RRF) to merge the result sets without needing to normalize the disparate scores from vector search and FTS: each document's fused score is the sum of 1 / (c + rank) over the result lists it appears in, where c is a damping constant conventionally set to 60.
def hybrid_fetch_candidates(query_text: str, tenant_id: str, k: int = 100):
    # This is a simplified implementation of RRF
    query_embedding = bi_encoder.encode(query_text)
    with psycopg2.connect(DB_CONNECTION_STRING) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            # Vector search results
            cur.execute(
                """SELECT id, content, (embedding <=> %s::vector) as distance
                   FROM document_chunks WHERE tenant_id = %s
                   ORDER BY distance LIMIT %s""",
                (query_embedding.tolist(), tenant_id, k)
            )
            vector_results = {row['id']: {'rank': i + 1, 'data': row} for i, row in enumerate(cur.fetchall())}
            # FTS results
            cur.execute(
                """SELECT id, content, ts_rank(content_tsv, websearch_to_tsquery('english', %s)) as rank
                   FROM document_chunks WHERE tenant_id = %s AND content_tsv @@ websearch_to_tsquery('english', %s)
                   ORDER BY rank DESC LIMIT %s""",
                (query_text, tenant_id, query_text, k)
            )
            fts_results = {row['id']: {'rank': i + 1, 'data': row} for i, row in enumerate(cur.fetchall())}
    # RRF fusion logic (60 is the conventional RRF damping constant)
    rrf_scores = {}
    for doc_id, res in vector_results.items():
        rrf_scores[doc_id] = 1 / (60 + res['rank'])
    for doc_id, res in fts_results.items():
        if doc_id not in rrf_scores:
            rrf_scores[doc_id] = 0
        rrf_scores[doc_id] += 1 / (60 + res['rank'])
    # Sort by RRF score
    sorted_doc_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
    # De-duplicate and combine results
    final_candidates = []
    seen_ids = set()
    for doc_id in sorted_doc_ids:
        if doc_id not in seen_ids:
            # Prioritize data from vector results if available
            data = (vector_results.get(doc_id) or fts_results.get(doc_id))['data']
            final_candidates.append(dict(data))
            seen_ids.add(doc_id)
        if len(final_candidates) >= k:
            break
    return final_candidates
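The hybrid retriever drops straight into the existing pipeline. A short usage sketch, reusing rerank_candidates from Section 3 (the query string and tenant id are placeholders):
# Sketch: hybrid candidate generation feeding the same cross-encoder re-ranker.
query = "hnsw.ef_search parameter tuning"  # keyword-heavy query where FTS helps
tenant = "t1a2b3c4-d5e6-f7a8-b9c0-d1e2f3a4b5c6"

candidates = hybrid_fetch_candidates(query, tenant, k=100)
top_docs = rerank_candidates(query, candidates, n=5)
for doc in top_docs:
    print(f"{doc['rerank_score']:.4f}  {doc['content'][:80]}...")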
Edge Case: Asynchronous Re-ranking for Low-Latency APIs
If even GPU-accelerated re-ranking is too slow, consider an asynchronous pattern. The API could immediately return results from the faster Stage 1 search, then push the superior, re-ranked results via WebSocket or Server-Sent Events once the background job completes. This requires a task queue like Celery with Redis/RabbitMQ.
- API endpoint receives query.
- The endpoint runs the Stage 1 pgvector search (fast) and immediately returns these preliminary results along with a request_id.
- It enqueues a background task containing the request_id, query, and candidate set.
- A Celery worker picks up the task and performs the Stage 2 cross-encoder re-ranking (slow); a minimal worker sketch follows this list.
- The re-ranked results are pushed to the client (via WebSocket or Server-Sent Events) keyed by the request_id.
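A minimal sketch of the worker side; the broker URL, the Redis pub/sub channel used to push results, the module path for rerank_candidates, and the payload shape are assumptions to adapt to your stack:
# Sketch: asynchronous re-ranking as a Celery task.
import json
import redis
from celery import Celery
from retrieval import rerank_candidates  # hypothetical module holding the Section 3 function

celery_app = Celery("rag_tasks", broker="redis://localhost:6379/0")
redis_client = redis.Redis.from_url("redis://localhost:6379/0")

@celery_app.task
def rerank_async(request_id: str, query_text: str, candidates: list[dict]):
    # Stage 2: slow, precise scoring happens off the request path.
    # Note: candidates must be JSON-serializable when enqueued (cast UUIDs to str).
    top_docs = rerank_candidates(query_text, candidates, n=5)
    # Publish the improved results for this request_id; a WebSocket/SSE gateway
    # subscribed to this channel forwards them to the client.
    redis_client.publish(
        f"rerank:{request_id}",
        json.dumps([
            {"id": d["id"], "rerank_score": float(d["rerank_score"]), "content": d["content"]}
            for d in top_docs
        ]),
    )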
Section 5: The Complete Production-Grade Pipeline
Let's tie everything together into a single, cohesive class that encapsulates the logic.
class RAGPipeline:
    def __init__(self, db_conn_str, bi_encoder_model, cross_encoder_model):
        self.db_conn_str = db_conn_str
        self.bi_encoder = SentenceTransformer(bi_encoder_model)
        self.cross_encoder = CrossEncoder(cross_encoder_model)
        # Optional: Move to GPU if available
        if torch.cuda.is_available():
            self.cross_encoder.model.to('cuda')

    def _fetch_candidates(self, query_embedding, tenant_id: str, k: int = 100):
        # Implementation from Section 3.1
        # ... returns list of candidate dicts
        pass

    def _rerank_candidates(self, query_text: str, candidates: list[dict], n: int = 5):
        # Implementation from Section 3.2
        # ... returns list of re-ranked dicts
        pass

    def query(self, query_text: str, tenant_id: str, candidate_k: int = 100, final_n: int = 5):
        """Executes the full two-stage RAG pipeline."""
        print(f"Executing query for tenant '{tenant_id}': '{query_text}'")
        try:
            # Stage 1: Candidate Generation
            query_embedding = self.bi_encoder.encode(query_text)
            candidates = self._fetch_candidates(query_embedding, tenant_id, k=candidate_k)
            if not candidates:
                print("No candidates found.")
                return []
            print(f"Retrieved {len(candidates)} candidates from vector store.")
            # Stage 2: Re-ranking
            reranked_results = self._rerank_candidates(query_text, candidates, n=final_n)
            print(f"Re-ranked to top {len(reranked_results)} results.")
            return reranked_results
        except psycopg2.Error as e:
            print(f"Database error: {e}")
            # Add proper logging and error handling
            raise
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            raise
# --- Example Usage ---
if __name__ == '__main__':
    pipeline = RAGPipeline(
        db_conn_str=os.getenv("DATABASE_URL"),
        bi_encoder_model='all-MiniLM-L6-v2',
        cross_encoder_model='cross-encoder/ms-marco-MiniLM-L-6-v2'
    )
    user_query = "What are the performance implications of HNSW indexing in pgvector?"
    user_tenant_id = "t1a2b3c4-d5e6-f7a8-b9c0-d1e2f3a4b5c6"
    final_context_docs = pipeline.query(user_query, user_tenant_id)
    print("\n--- Final Context for LLM ---")
    for doc in final_context_docs:
        print(f"Score: {doc['rerank_score']:.4f}, ID: {doc['id']}")
        print(f"Content: {doc['content'][:150]}...")
        print("---")
    # This final context would then be formatted and sent to an LLM.
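The hand-off to the LLM is left out of the pipeline class on purpose. A minimal sketch of assembling the context block from the re-ranked chunks, using the user_query and final_context_docs from the example above; the prompt template is an assumption, and the actual LLM call is omitted:
# Sketch: format the re-ranked chunks into a context block for the LLM.
def build_prompt(query_text: str, context_docs: list[dict]) -> str:
    # Number the sources so the LLM can cite which chunk supported its answer.
    context_block = "\n\n".join(
        f"[Source {i + 1}] {doc['content']}" for i, doc in enumerate(context_docs)
    )
    return (
        "Answer the question using only the sources below. "
        "If the sources are insufficient, say so.\n\n"
        f"Sources:\n{context_block}\n\n"
        f"Question: {query_text}\nAnswer:"
    )

prompt = build_prompt(user_query, final_context_docs)
print(prompt[:500])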
Conclusion: Beyond Naive Vector Search
For senior engineers tasked with building reliable and accurate AI systems, moving beyond naive, single-stage RAG is not an optimization—it's a requirement. By adopting a two-stage retrieval architecture with pgvector's HNSW for fast candidate generation and a powerful cross-encoder for precision re-ranking, we directly address the core challenge of balancing speed and relevance at scale.
This approach, while more complex, provides a robust framework that is further enhanced by the power of PostgreSQL for hybrid search, metadata filtering, and transactional integrity. The patterns discussed here—HNSW indexing, cross-encoder batching, and hybrid search fusion—represent the difference between a proof-of-concept demo and a production-ready RAG system capable of delivering consistently high-quality results.