Advanced RAG: Multi-hop Reasoning & Self-Correction Loops
Beyond Naive RAG: Tackling Complex Queries in Production
Standard Retrieval-Augmented Generation (RAG) has become the de facto architecture for grounding Large Language Models (LLMs) in factual, private data. The pattern is simple and effective: retrieve relevant text chunks from a vector database and prepend them as context to the user's query. For simple fact-lookup questions like "What was our Q2 revenue?", this works remarkably well.
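For reference, here is a minimal sketch of that naive pattern (the corpus text and prompt are placeholders, and it assumes a FAISS index plus an OpenAI key in the environment), so the patterns later in this post can be contrasted against it:

```python
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.prompts import ChatPromptTemplate

# Hypothetical corpus: replace with your own document chunks.
chunks = ["<your Q2 financial summary chunk goes here>"]
retriever = FAISS.from_texts(chunks, OpenAIEmbeddings()).as_retriever()

prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the question using only the provided context."),
    ("human", "Context:\n{context}\n\nQuestion: {question}"),
])
llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)

def naive_rag(question: str) -> str:
    # Retrieve the top chunks and prepend them to the prompt: no decomposition, no reasoning.
    context = "\n".join(doc.page_content for doc in retriever.invoke(question))
    return (prompt | llm).invoke({"context": context, "question": question}).content

print(naive_rag("What was our Q2 revenue?"))
```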
However, in production environments, users rarely ask such simple questions. They ask complex, multi-faceted queries that require synthesis and reasoning across multiple documents and concepts. Consider a query like:
"Compare the key findings of our internal user study on Project Titan with the market trends identified in the latest Gartner report, and explain how our Q3 roadmap addresses the discrepancies."
A naive RAG system will fail spectacularly here. A single vector search on the entire query will likely retrieve a jumble of irrelevant snippets from all three sources (the user study, the Gartner report, the roadmap), leading to a generic, non-committal, or outright hallucinatory answer. The core problem is that this query isn't a single question; it's a multi-step reasoning task.
To solve this, we must evolve our RAG systems from simple retrieval mechanisms into more sophisticated agentic reasoning pipelines. This post dives deep into two powerful, production-ready patterns that enable this evolution:

* Multi-hop reasoning with knowledge graphs, which gathers precise, relational context across documents.
* Self-correction loops with reflection agents, which iteratively critique and refine generated answers.
This is not a theoretical overview. We will build and dissect complete Python implementations, focusing on the architectural trade-offs, performance challenges, and observability patterns required to run these systems reliably in production.
Pattern 1: Multi-hop Reasoning with Knowledge Graphs
The fundamental limitation of vector search is its flat, semantic-only view of the world. It knows that chunk A and chunk B are about similar things, but it has no explicit understanding of the relationship between them (e.g., "Company X, mentioned in chunk A, acquired Company Y, mentioned in chunk B"). This is where Knowledge Graphs (KGs) excel.
By representing our documents as a graph of entities (nodes) and relationships (edges), we enable an LLM agent to perform multi-hop reasoning. It can start at one point in the graph and intelligently traverse relationships to gather the precise context needed to answer a complex query.
Implementation: From Documents to a Queryable Graph
Our strategy involves a hybrid approach: we'll continue using a vector database for initial semantic search but augment it with a graph database (Neo4j in this example) for structured, relational traversal.
Step 1: Graph Construction - Entity and Relationship Extraction
First, we need to parse our documents and extract structured information. We'll use an LLM for this task, prompting it to identify entities and their relationships and output them in a structured format.
Let's assume we have a set of documents about corporate filings and news.
import os
from typing import List, Dict, Any
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
# Ensure you have OPENAI_API_KEY in your environment variables
# os.environ["OPENAI_API_KEY"] = "your_api_key"
llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)
# Pydantic model for structured output
class GraphNode(BaseModel):
"""Represents a node in the knowledge graph."""
id: str = Field(description="Unique identifier for the node, typically the entity name.")
type: str = Field(description="The type of the entity (e.g., Company, Person, Product).")
properties: Dict[str, Any] = Field(description="Additional properties of the node.")
class GraphRelationship(BaseModel):
"""Represents a relationship between two nodes in the knowledge graph."""
source: str = Field(description="The ID of the source node.")
target: str = Field(description="The ID of the target node.")
type: str = Field(description="The type of the relationship (e.g., ACQUIRED, PARTNERED_WITH, LAUNCHED).")
properties: Dict[str, Any] = Field(description="Additional properties of the relationship.")
class KnowledgeGraph(BaseModel):
"""Represents the extracted knowledge graph from a text chunk."""
nodes: List[GraphNode] = Field(description="List of nodes in the graph.")
relationships: List[GraphRelationship] = Field(description="List of relationships in the graph.")
# Create a structured LLM chain for extraction
structured_llm = llm.with_structured_output(KnowledgeGraph)
# Prompt for extraction
extraction_prompt = ChatPromptTemplate.from_messages([
("system", """You are an expert in information extraction. Your task is to identify entities and relationships from the provided text and structure them as a knowledge graph.
Extract entities such as Companies, Products, and key financial metrics.
Extract relationships like 'ACQUIRED', 'PARTNERED_WITH', 'LAUNCHED', 'REPORTED'.
Only extract information explicitly mentioned in the text."""
),
("human", "Here is the text chunk:\n\n---\n{text_chunk}\n---"),
])
extraction_chain = extraction_prompt | structured_llm
# Example Usage
text_chunk_1 = "In Q3 2023, InnovateCorp announced the acquisition of DataWeave Inc. for $1.2 billion. This move strengthens InnovateCorp's position in the data analytics market. The deal was finalized on October 15, 2023."
text_chunk_2 = "Following their acquisition by InnovateCorp, DataWeave Inc. launched a new product, 'QuantumLeap Analytics', which leverages InnovateCorp's existing cloud infrastructure."
kg1 = extraction_chain.invoke({"text_chunk": text_chunk_1})
kg2 = extraction_chain.invoke({"text_chunk": text_chunk_2})
print("--- Extracted from Chunk 1 ---")
print(kg1.json(indent=2))
print("\n--- Extracted from Chunk 2 ---")
print(kg2.json(indent=2))
Step 2: Ingesting into a Graph Database (Neo4j)
With the extracted structured data, we can now populate our Neo4j database. We'll use the `neo4j` Python driver.
from neo4j import GraphDatabase
# Ensure you have a running Neo4j instance (e.g., via Docker)
# docker run --rm -p 7474:7474 -p 7687:7687 -e NEO4J_AUTH=neo4j/password neo4j:latest
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "password"
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
def ingest_knowledge_graph(driver, kg: KnowledgeGraph):
    with driver.session() as session:
        for node in kg.nodes:
            # Node labels and relationship types cannot be bound as query parameters,
            # so they are interpolated here (sanitize them in production to avoid Cypher injection).
            session.run(
                f"MERGE (n:`{node.type}` {{id: $id}}) SET n += $properties",
                id=node.id, properties=node.properties,
            )
        for rel in kg.relationships:
            session.run(
                f"MATCH (source {{id: $source_id}}), (target {{id: $target_id}}) "
                f"MERGE (source)-[r:`{rel.type}`]->(target) SET r += $properties",
                source_id=rel.source, target_id=rel.target, properties=rel.properties,
            )
# Ingest the extracted graphs
ingest_knowledge_graph(driver, kg1)
ingest_knowledge_graph(driver, kg2)
driver.close()
print("\nKnowledge graphs ingested into Neo4j.")
After running this, you can inspect your Neo4j browser and see the connected graph: `(InnovateCorp)-[:ACQUIRED]->(DataWeave Inc.)-[:LAUNCHED]->(QuantumLeap Analytics)`.
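For a quick programmatic sanity check, a minimal sketch (assuming the same `NEO4J_*` constants and the data ingested above) can traverse the two-hop path directly:

```python
from neo4j import GraphDatabase

# Verify the two-hop ACQUIRED -> LAUNCHED path created by the ingestion step above.
with GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) as driver:
    with driver.session() as session:
        records = session.run(
            "MATCH (c {id: 'InnovateCorp'})-[:ACQUIRED]->(t)-[:LAUNCHED]->(p) "
            "RETURN c.id AS acquirer, t.id AS target, p.id AS product"
        )
        for record in records:
            print(record.data())
```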
Querying the Graph: The Multi-hop Agent
Now for the core of the pattern. We'll create an agent that can decompose a complex question and decide whether to perform a semantic search (on a vector DB) or a structured graph traversal (on Neo4j).
This agent will use an LLM as its reasoning engine, equipped with two "tools":
* `vector_search_tool`: For broad, semantic questions.
* `graph_traversal_tool`: For precise questions about relationships and connections.

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.tools import tool
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
# --- 1. Setup Vector Store (for hybrid search) ---
all_chunks = [text_chunk_1, text_chunk_2]
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
docs = text_splitter.create_documents(all_chunks)
embeddings = OpenAIEmbeddings()
vector_store = FAISS.from_documents(docs, embeddings)
retriever = vector_store.as_retriever()
# --- 2. Define Tools for the Agent ---
@tool
def vector_search_tool(query: str) -> str:
"""Use this tool for semantic search to find general information or details within a document."""
print(f"\n>>> Executing Vector Search Tool with query: {query}")
retrieved_docs = retriever.invoke(query)
return "\n".join([doc.page_content for doc in retrieved_docs])
@tool
def graph_traversal_tool(query: str) -> List[Dict[str, Any]]:
    """Use this tool to answer questions about relationships, connections, and multi-step processes.
    The input should be a Cypher query for Neo4j.
    Example: 'MATCH (c:Company)-[:ACQUIRED]->(t:Company) RETURN c.id, t.id'
    """
    print(f"\n>>> Executing Graph Traversal Tool with Cypher: {query}")
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    try:
        with driver.session() as session:
            result = session.run(query)
            # Materialize the records before the session closes.
            return [record.data() for record in result]
    finally:
        driver.close()
# --- 3. Create the Agent ---
tools = [vector_search_tool, graph_traversal_tool]
agent_prompt = ChatPromptTemplate.from_messages([
("system", """You are a powerful reasoning agent designed to answer complex queries about corporate data.
You have two tools at your disposal:
1. `vector_search_tool`: For finding specific details or context within documents.
2. `graph_traversal_tool`: For understanding the relationships between entities.
Follow these steps:
1. Decompose the user's complex query into a series of smaller, logical questions.
2. For each sub-question, decide which tool is most appropriate.
3. If using the graph tool, formulate a precise Cypher query.
4. Synthesize the results from your tool usage into a comprehensive final answer.
5. If a query requires multiple steps, explain your reasoning process.
"""
),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
agent_llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)
agent = create_openai_tools_agent(agent_llm, tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# --- 4. Run a Complex Query ---
complex_query = "What new product was launched by the company that InnovateCorp acquired, and what was the rationale for the acquisition?"
response = agent_executor.invoke({"input": complex_query})
print("\n--- Final Answer ---")
print(response["output"])
When you run this, observe the `verbose=True` output. You'll see the LLM's chain-of-thought:

1. The agent first calls `graph_traversal_tool` with a Cypher query like `MATCH (c:Company {id: 'InnovateCorp'})-[:ACQUIRED]->(t:Company) RETURN t.id`. The tool returns `[{'t.id': 'DataWeave Inc.'}]`.
2. It then calls `graph_traversal_tool` again with `MATCH (c:Company {id: 'DataWeave Inc.'})-[:LAUNCHED]->(p:Product) RETURN p.id`. The tool returns `[{'p.id': 'QuantumLeap Analytics'}]`.
3. Finally, it calls `vector_search_tool` with a query like "rationale for InnovateCorp acquisition of DataWeave". The tool returns the source text chunk.

Production Considerations for Multi-hop RAG
* Scalability: Graph extraction is computationally expensive. For large document sets, run this as an offline batch process (see the first sketch after this list). Use streaming pipelines (e.g., Kafka + Flink) for real-time updates.
* Graph Maintenance: How do you handle updates or deletions? Implement a content-addressing scheme for your text chunks. When a chunk is updated, re-run extraction and use `MERGE` and `DETACH DELETE` Cypher commands to update the graph atomically (see the second sketch after this list).
* Error Handling & Observability: The agent's reasoning can be brittle. Log the entire `agent_scratchpad` for every query. This trace (thoughts, tool calls, tool outputs) is invaluable for debugging why an agent made a particular decision. Tools like LangSmith are built for this.
* Cost vs. Complexity: Maintaining a KG is a significant architectural overhead. This pattern is justified only when your domain is rich with relationships and users frequently ask complex, relational questions.
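Referencing the scalability point above, a minimal batch-extraction sketch (reusing the `extraction_chain` defined earlier; the retry policy is illustrative) might look like this:

```python
import time
from typing import List

def extract_graphs_offline(chunks: List[str], batch_size: int = 10, max_retries: int = 3) -> List[KnowledgeGraph]:
    """Run the LLM extraction step as an offline batch job rather than at query time."""
    graphs: List[KnowledgeGraph] = []
    for start in range(0, len(chunks), batch_size):
        batch = [{"text_chunk": chunk} for chunk in chunks[start:start + batch_size]]
        for attempt in range(max_retries):
            try:
                # Runnable.batch() fans the extraction calls out concurrently.
                graphs.extend(extraction_chain.batch(batch))
                break
            except Exception:
                # Back off on rate limits or transient API errors, then retry this batch.
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)
    return graphs
```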
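For the graph-maintenance point, one possible scheme is to content-address each chunk and stamp the derived nodes with that hash, so an updated chunk can have its old subgraph removed with `DETACH DELETE` before re-ingestion. The `chunk_id` property and helper below are assumptions for illustration, not part of the earlier code:

```python
import hashlib

def chunk_id_for(text_chunk: str) -> str:
    """Content-address a chunk so updates can be detected and scoped."""
    return hashlib.sha256(text_chunk.encode("utf-8")).hexdigest()

def refresh_chunk(driver, old_text: str, new_text: str) -> None:
    """Replace the subgraph derived from a single chunk (sketch; assumes entities are not shared across chunks)."""
    old_id, new_id = chunk_id_for(old_text), chunk_id_for(new_text)
    if old_id == new_id:
        return  # content unchanged, nothing to do
    kg = extraction_chain.invoke({"text_chunk": new_text})
    # Stamp every extracted node with the new chunk hash before ingestion.
    for node in kg.nodes:
        node.properties["chunk_id"] = new_id
    with driver.session() as session:
        # Drop everything derived from the old chunk version, relationships included.
        session.run("MATCH (n {chunk_id: $cid}) DETACH DELETE n", cid=old_id)
    ingest_knowledge_graph(driver, kg)
```

Entities shared by multiple chunks would need reference counting or per-chunk provenance lists rather than outright deletion.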
Pattern 2: Self-Correction Loops with Reflection Agents
Even with perfect context retrieval, an LLM can still produce a suboptimal answer. It might misinterpret a nuance, fail to synthesize information correctly, or produce an answer that isn't structured in the desired format. The Self-Correction (or Reflection) pattern addresses this by making the generation process iterative.
The core idea is to use an LLM to critique and improve its own output. This creates a feedback loop that progressively enhances the quality of the final answer.
Architecture: The Generate-Critique-Refine Cycle
Our reflection agent will follow a simple but powerful loop:

1. Generate an initial draft answer from the query and retrieved context.
2. Critique the draft for faithfulness, completeness, and clarity.
3. Refine the draft based on the critique, repeating until the critique passes or the iteration limit is reached.
Implementation: A `SelfCorrectingAgent`
Let's build a Python class that encapsulates this logic.
import json
class SelfCorrectingAgent:
def __init__(self, llm, max_iterations=3):
self.llm = llm
self.max_iterations = max_iterations
# Prompt for initial answer generation
self.generator_prompt = ChatPromptTemplate.from_messages([
("system", "You are an expert financial analyst. Answer the user's query based *only* on the provided context. Cite the source document for every claim you make. Format your answer clearly."),
("human", "Query: {query}\n\nContext:\n---\n{context}\n---"),
])
# Pydantic model for critique
class Critique(BaseModel):
is_sufficient: bool = Field(description="Is the answer sufficient and factually correct based on the context?")
feedback: str = Field(description="Specific, constructive feedback for improving the answer. If sufficient, say 'No feedback needed'.")
structured_critique_llm = llm.with_structured_output(Critique)
# Prompt for the critique step
self.critique_prompt = ChatPromptTemplate.from_messages([
("system", "You are a meticulous editor. Your role is to critique a generated answer based on the provided context and query. Check for the following criteria:\n1. **Faithfulness**: Does the answer make any claims not supported by the context?\n2. **Completeness**: Does the answer address all parts of the user's query?\n3. **Clarity**: Is the answer easy to understand?\nProvide feedback for improvement."),
("human", "Query: {query}\n\nContext:\n---\n{context}\n---\n\nGenerated Answer:\n---\n{draft_answer}\n---"),
])
self.critique_chain = self.critique_prompt | structured_critique_llm
# Prompt for the refinement step
self.refiner_prompt = ChatPromptTemplate.from_messages([
("system", "You are an expert financial analyst. Your previous answer was found to have issues. Please refine it based on the provided critique. Answer the original query using the context, incorporating the feedback."),
("human", "Original Query: {query}\n\nContext:\n---\n{context}\n---\n\nCritique Feedback:\n---\n{feedback}\n---\n"),
])
self.generator_chain = self.generator_prompt | self.llm
self.refiner_chain = self.refiner_prompt | self.llm
def run(self, query: str, context: str) -> str:
print("--- Starting Self-Correction Loop ---")
# 1. Initial Generation
print("\nIteration 1: Generating initial draft...")
draft_answer = self.generator_chain.invoke({"query": query, "context": context}).content
print(f"Draft 1: {draft_answer}")
for i in range(self.max_iterations):
# 2. Critique
print(f"\nIteration {i+1}: Critiquing draft...")
critique_result = self.critique_chain.invoke({
"query": query,
"context": context,
"draft_answer": draft_answer
})
print(f"Critique: Is sufficient? {critique_result.is_sufficient}, Feedback: {critique_result.feedback}")
if critique_result.is_sufficient:
print("\nAnswer is sufficient. Finalizing.")
return draft_answer
# 3. Refine
print(f"\nIteration {i+1}: Refining draft based on feedback...")
refined_answer = self.refiner_chain.invoke({
"query": query,
"context": context,
"draft_answer": draft_answer,
"feedback": critique_result.feedback
}).content
draft_answer = refined_answer
print(f"Draft {i+2}: {draft_answer}")
print("\nMax iterations reached. Returning final draft.")
return draft_answer
# --- Example Usage ---
# Let's use a tricky context that might lead to a mistake
query = "What were the main financial results for InnovateCorp in Q3 2023, and what was the key driver?"
context = "Document A: InnovateCorp's Q3 2023 report shows a revenue of $50M and a net profit of $5M. The report mentions the acquisition of DataWeave Inc. as a major event.\nDocument B: An analyst report states that the data analytics market grew by 20% in Q3, partly due to new AI regulations. InnovateCorp's acquisition of DataWeave strengthened its market position."
agent_llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0.2)
self_correcting_agent = SelfCorrectingAgent(llm=agent_llm, max_iterations=2)
final_answer = self_correcting_agent.run(query=query, context=context)
print("\n--- FINAL AGENT ANSWER ---")
print(final_answer)
Expected Execution Flow:
1. Draft 1: "InnovateCorp had $50M in revenue and $5M in profit in Q3 2023. A key event was the acquisition of DataWeave Inc." This is factually correct but misses the "key driver" part of the query.
2. Critique: `is_sufficient: false`, feedback: "The answer correctly states the financial results but does not explain the key driver behind them. It mentions the acquisition but doesn't connect it to market trends or performance as a driver."
3. Draft 2 (final): "In Q3 2023, InnovateCorp reported a revenue of $50M and a net profit of $5M (Document A). A key driver for their performance and strengthened market position was the strategic acquisition of DataWeave Inc., which allowed them to capitalize on the 20% growth in the data analytics market (Document B)."
Edge Cases and Performance Considerations
* Convergence Failure: The agent can get stuck in a loop, making minor changes that don't satisfy the critique. The `max_iterations` guard is crucial to prevent infinite loops and runaway costs.
* Cost: This pattern is expensive. Each iteration adds up to two LLM calls (critique + refine) on top of the initial generation, so a run with `max_iterations=3` can cost 1 + 3 × 2 = 7 calls in the worst case. Use this pattern judiciously for high-value queries where accuracy is paramount.
* Latency: The sequential nature of the loop significantly increases response time. This is often unacceptable for real-time user-facing applications. A common production pattern is to use this as an asynchronous refinement process: show the user the initial draft immediately, and provide an option to see an "enhanced" or "verified" answer later, generated by the self-correction pipeline in the background (see the sketch after this list).
* Prompt Engineering: The quality of the critique and refinement prompts is the single most important factor for success. They must be specific, clear, and provide strong guidance to the LLM. Experiment extensively with these prompts.
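As a sketch of that asynchronous pattern (the `refined_answers` store and job-ID scheme are hypothetical; it reuses `self_correcting_agent` from above):

```python
import asyncio
import uuid
from typing import Dict

refined_answers: Dict[str, str] = {}  # hypothetical in-memory store; use Redis or a DB in production

async def answer_with_background_refinement(query: str, context: str) -> Dict[str, str]:
    """Return a fast draft immediately and run the full self-correction loop in the background."""
    loop = asyncio.get_running_loop()
    # Fast path: a single generation call the user sees right away.
    draft = await loop.run_in_executor(
        None,
        lambda: self_correcting_agent.generator_chain.invoke(
            {"query": query, "context": context}
        ).content,
    )
    job_id = str(uuid.uuid4())

    def refine() -> None:
        # Slow path: the full generate-critique-refine loop, stored for later retrieval.
        refined_answers[job_id] = self_correcting_agent.run(query=query, context=context)

    loop.run_in_executor(None, refine)  # fire-and-forget refinement job
    return {"job_id": job_id, "draft": draft}
```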
The Unified Architecture: Multi-hop Reasoning Meets Self-Correction
The true power of these patterns is realized when they are combined. A state-of-the-art RAG system uses multi-hop reasoning to gather a comprehensive and precise context, and then feeds that context into a self-correction loop to generate a robust, faithful, and well-structured final answer.
System Flow:
1. The multi-hop reasoning agent decomposes the user's query and gathers precise, relational context via the vector search and graph traversal tools.
2. That aggregated context, along with the original query, is passed to the `SelfCorrectingAgent`, which iterates until it produces a high-quality, verified answer.

This architecture is not simple, but it's a reflection of the complexity required to move from basic chatbots to true AI-powered reasoning systems. It trades simplicity for power, enabling you to tackle a class of problems that is simply out of reach for naive RAG.
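As a hedged sketch of that wiring (reusing `agent`, `tools`, `complex_query`, and `self_correcting_agent` from earlier; `return_intermediate_steps` is a standard `AgentExecutor` option for surfacing tool observations):

```python
def answer_complex_query(query: str) -> str:
    """Stage 1: multi-hop retrieval; Stage 2: self-correcting generation (illustrative wiring)."""
    gathering_executor = AgentExecutor(
        agent=agent, tools=tools, verbose=True, return_intermediate_steps=True
    )
    result = gathering_executor.invoke({"input": query})

    # Every tool observation (graph traversals and vector hits) becomes grounded context.
    observations = [str(observation) for _action, observation in result["intermediate_steps"]]
    context = "\n---\n".join(observations) or result["output"]

    # Stage 2: iterate generate -> critique -> refine over the gathered context.
    return self_correcting_agent.run(query=query, context=context)

print(answer_complex_query(complex_query))
```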
Conclusion: RAG is a Reasoning Framework, Not Just Retrieval
The key takeaway for senior engineers is to stop thinking of RAG as a simple `retrieve-then-prompt` mechanism. We must treat it as a flexible framework for building sophisticated reasoning pipelines.
By incorporating structured data representations like knowledge graphs and iterative refinement processes like self-correction, we elevate our LLM applications from mere information recall systems to powerful tools for synthesis and analysis. The implementation overhead is non-trivial, demanding careful consideration of cost, latency, and observability. However, for applications where the accuracy and depth of the generated answers are critical business drivers, these advanced patterns are no longer an academic curiosity—they are a production necessity.