RAG Beyond Vectors: Multi-Hop QA with LLMs and Graph Databases
The Hard Ceiling of Vector-Based RAG
As senior engineers, we've moved past the novelty of Retrieval-Augmented Generation (RAG). We've built systems that chunk documents, embed them using state-of-the-art models, and retrieve them with cosine similarity from vector stores like Pinecone or Weaviate. This pattern is effective for questions where the answer lies within a single, self-contained document chunk. For example, "What were the key findings of the Q3 performance report?"
The architectural ceiling appears when we face questions that require reasoning across multiple documents and, more importantly, across the implicit relationships between entities described in those documents. Consider this query from a project management system:
"What are the active projects for engineers who report directly to the manager of the 'Phoenix' team?"
A standard vector-based RAG system will likely fail, and not subtly. Here's the failure cascade:
1. Retrieval: the question's embedding pulls back chunks that mention the 'Phoenix' team, its manager, or individual engineers in isolation, but no single chunk contains the full chain of facts needed to answer.
2. Synthesis: the LLM, handed these disconnected snippets and the relationship mentions scattered across them (REPORTS_TO, MEMBER_OF, WORKS_ON), cannot reliably traverse the relationship chain: Phoenix Team -> Manager -> Direct Reports -> Projects. It will likely hallucinate an answer or state that it cannot find the information.
This isn't a flaw in the LLM; it's a fundamental limitation of the retrieval mechanism. We're asking a reasoning engine to solve a graph traversal problem with a flat list of text snippets. The solution is to upgrade our retrieval mechanism to one that natively understands relationships: a Knowledge Graph.
This article details the architecture and implementation of a GraphRAG system. We'll use Neo4j as our graph database and Python for orchestration, focusing on the production patterns required to make this approach robust and scalable.
Section 1: Modeling Your Domain as a Knowledge Graph
Before we write a single line of RAG code, we must model our domain. A graph consists of Nodes (entities) and Relationships (connections). For our example, a sensible model would be:
* Nodes:
* Person(name: string, employee_id: string, title: string)
* Team(name: string, department: string)
* Project(name: string, status: string, start_date: date)
* Relationships:
* [:MEMBER_OF] (Person -> Team)
* [:MANAGES] (Person -> Team)
* [:REPORTS_TO] (Person -> Person)
* [:ASSIGNED_TO] (Person -> Project)
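To make the model concrete, here is a small, hand-written slice of this graph in Cypher (an illustrative sketch; the employee IDs, titles, and dates are made up, while the names reappear in the ingestion example below):
// Illustrative sample data matching the model above
MERGE (team:Team {name: 'Phoenix', department: 'Engineering'})
MERGE (mgr:Person {name: 'John Doe', employee_id: 'E-0102', title: 'Engineering Manager'})
MERGE (eng:Person {name: 'Alice Smith', employee_id: 'E-0231', title: 'Senior Engineer'})
MERGE (proj:Project {name: 'Project Apollo', status: 'Active', start_date: date('2023-10-01')})
MERGE (mgr)-[:MANAGES]->(team)
MERGE (eng)-[:MEMBER_OF]->(team)
MERGE (eng)-[:REPORTS_TO]->(mgr)
MERGE (eng)-[:ASSIGNED_TO]->(proj)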
Ingesting Unstructured Data into the Graph
In a real-world scenario, this data is rarely in a clean, structured format. It's locked in HR documents, project wikis, and Slack messages. A powerful pattern is to use an LLM for entity and relationship extraction to populate the graph.
Here’s a production-oriented Python script using OpenAI's function calling feature to parse text and structure it for graph ingestion. This is far more reliable than simple prompt-based extraction.
import os
import json
from openai import OpenAI
from neo4j import GraphDatabase
from pydantic import BaseModel, Field
from typing import List, Optional
# --- Pydantic Models for Structured Extraction ---
class Person(BaseModel):
name: str = Field(description="Full name of the person")
title: str = Field(description="Job title of the person")
class Relationship(BaseModel):
source: str = Field(description="The name of the source person")
target: str = Field(description="The name of the target person or team")
type: str = Field(description="Type of relationship, e.g., 'REPORTS_TO' or 'MANAGES'")
class ExtractedGraph(BaseModel):
people: List[Person]
relationships: List[Relationship]
# --- Neo4j Connection ---
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
class Neo4jGraph:
def __init__(self, uri, user, password):
self._driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
self._driver.close()
def execute_query(self, query, parameters=None):
with self._driver.session() as session:
result = session.run(query, parameters)
return [record for record in result]
def ingest_graph_data(self, graph_data: ExtractedGraph):
with self._driver.session() as session:
# Use MERGE to avoid creating duplicate nodes (idempotency)
for person in graph_data.people:
session.run("MERGE (p:Person {name: $name}) SET p.title = $title",
name=person.name, title=person.title)
for rel in graph_data.relationships:
if rel.type == 'REPORTS_TO':
session.run("""
MATCH (subordinate:Person {name: $sub_name})
MATCH (manager:Person {name: $mgr_name})
MERGE (subordinate)-[:REPORTS_TO]->(manager)
""", sub_name=rel.source, mgr_name=rel.target)
elif rel.type == 'MANAGES':
# Assuming teams are created separately or extracted as well
session.run("""
MERGE (t:Team {name: $team_name})
WITH t
MATCH (manager:Person {name: $mgr_name})
MERGE (manager)-[:MANAGES]->(t)
""", team_name=rel.target, mgr_name=rel.source)
print(f"Ingested {len(graph_data.people)} people and {len(graph_data.relationships)} relationships.")
# --- LLM Extraction Logic ---
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def extract_graph_from_text(text: str) -> ExtractedGraph:
response = client.chat.completions.create(
model="gpt-4-1106-preview",
messages=[
{
"role": "system",
"content": "You are an expert HR data analyst. Extract entities and relationships from the provided text and structure them according to the provided JSON schema. Identify people, their titles, and their reporting structure."
},
{
"role": "user",
"content": text
}
],
tools=[
{
"type": "function",
"function": {
"name": "load_graph_data",
"description": "Load the extracted graph data",
"parameters": ExtractedGraph.model_json_schema()
}
}
],
tool_choice={"type": "function", "function": {"name": "load_graph_data"}}
)
tool_call = response.choices[0].message.tool_calls[0]
if tool_call.function.name == "load_graph_data":
args = json.loads(tool_call.function.arguments)
return ExtractedGraph(**args)
else:
raise ValueError("LLM did not call the expected function.")
# --- Main Execution ---
if __name__ == '__main__':
# Example unstructured text from an HR document
hr_document = """
Org Update: Q4 2023
The Engineering department is led by Sarah Connor. The 'Phoenix' team, managed by John Doe, falls under her.
Direct reports to John Doe include Alice Smith (Senior Engineer) and Bob Johnson (Lead Engineer).
Charlie Brown (Staff Engineer) reports directly to Sarah Connor.
"""
# 1. Extract structured data using LLM
try:
extracted_data = extract_graph_from_text(hr_document)
print("--- Extracted Data ---")
print(extracted_data.model_dump_json(indent=2))
# 2. Ingest into Neo4j
graph_db = Neo4jGraph(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
graph_db.ingest_graph_data(extracted_data)
graph_db.close()
print("\n--- Ingestion Complete ---")
except Exception as e:
print(f"An error occurred: {e}")
This ingestion pattern is robust because:
* Structured output: the function-calling schema, enforced by the Pydantic models, makes the LLM return well-formed entities and relationships instead of free-form text that has to be re-parsed.
* Idempotency: MERGE in Cypher ensures that running the ingestion multiple times won't create duplicate entities, which is critical for data pipelines.
Section 2: The Core Pattern: Natural Language to Graph Query
With our data in the graph, the core of the GraphRAG system is a chain that translates a user's natural language question into a formal graph query language (Cypher for Neo4j).
This is a challenging task that requires careful prompt engineering. The LLM needs to understand:
- The graph schema (nodes, relationships, properties).
- The syntax of the query language (Cypher).
- The user's intent.
Here is an implementation of a Text2Cypher chain.
# (Continuing from previous code, assuming imports are present)
class Text2CypherConverter:
def __init__(self, graph: Neo4jGraph, llm_client: OpenAI):
self.graph = graph
self.llm_client = llm_client
self.schema = self._get_graph_schema()
def _get_graph_schema(self) -> str:
# In a real system, this should be cached.
# This query introspects the graph to get the schema.
query = """
CALL db.schema.visualization()
"""
# The result of this is complex; we'll use a simplified, hardcoded schema for the prompt.
# For a production system, you would parse the output of the above query.
return """
Node properties are the following:
- Person {name: STRING, title: STRING}
- Team {name: STRING}
- Project {name: STRING, status: STRING}
Relationship properties are the following:
- REPORTS_TO {}
- MANAGES {}
- MEMBER_OF {}
- ASSIGNED_TO {}
The relationships are as follows:
(Person)-[:REPORTS_TO]->(Person)
(Person)-[:MANAGES]->(Team)
(Person)-[:MEMBER_OF]->(Team)
(Person)-[:ASSIGNED_TO]->(Project)
"""
def convert(self, question: str, retries: int = 2) -> str:
prompt = f"""
You are a Neo4j expert. Your task is to convert a natural language question into a Cypher query based on the provided graph schema.
Only return the Cypher query, with no explanations or preamble.
Schema:
{self.schema}
Question:
{question}
Cypher Query:
"""
for attempt in range(retries + 1):
try:
response = self.llm_client.chat.completions.create(
model="gpt-4-turbo-preview",
messages=[
{"role": "system", "content": "You are a Neo4j Cypher query generation expert."},
{"role": "user", "content": prompt}
],
temperature=0.0
)
generated_query = response.choices[0].message.content.strip()
# A simple heuristic to remove markdown code fences
if generated_query.startswith("```cypher"):
generated_query = generated_query[9:-3].strip()
if generated_query.startswith("```"):
generated_query = generated_query[3:-3].strip()
# --- CRITICAL: Query Validation Step ---
# We use EXPLAIN to validate syntax without executing the query.
# This catches malformed queries before they ever run. It does not block writes by itself;
# pair it with a read-only database user or a write-clause check in production.
self.graph.execute_query(f"EXPLAIN {generated_query}")
return generated_query
except Exception as e:
print(f"Attempt {attempt + 1}: Generated query failed validation. Error: {e}")
if attempt >= retries:
raise ValueError("Failed to generate a valid Cypher query after multiple attempts.") from e
# On failure, we could potentially feed the error back to the LLM for self-correction.
raise ValueError("Should not be reached.")
# --- Example Usage ---
if __name__ == '__main__':
# Assume graph_db and OpenAI client are initialized
graph_db = Neo4jGraph(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Add some project data for our complex query
graph_db.execute_query("""
MERGE (p1:Project {name: 'Project Apollo', status: 'Active'})
MERGE (p2:Project {name: 'Project Titan', status: 'On Hold'})
MERGE (p3:Project {name: 'Project Zeus', status: 'Active'})
WITH p1, p2, p3
MATCH (a:Person {name: 'Alice Smith'})
MATCH (b:Person {name: 'Bob Johnson'})
MERGE (a)-[:ASSIGNED_TO]->(p1)
MERGE (a)-[:ASSIGNED_TO]->(p2)
MERGE (b)-[:ASSIGNED_TO]->(p3)
""")
converter = Text2CypherConverter(graph_db, client)
question = "What are the active projects for engineers who report directly to the manager of the 'Phoenix' team?"
try:
cypher_query = converter.convert(question)
print("--- Generated Cypher Query ---")
print(cypher_query)
# Now, execute the query to get the context
results = graph_db.execute_query(cypher_query)
print("\n--- Query Results ---")
for record in results:
print(record)
except ValueError as e:
print(e)
finally:
graph_db.close()
The LLM should generate a query like this:
MATCH (team:Team {name: 'Phoenix'})<-[:MANAGES]-(manager:Person),
(manager)<-[:REPORTS_TO]-(engineer:Person),
(engineer)-[:ASSIGNED_TO]->(project:Project {status: 'Active'})
RETURN project.name AS projectName
This generated query is the key. It traverses the exact relationships needed to answer the question, something impossible for vector search. The EXPLAIN validation step is an important stability measure: it confirms that the query parses and can be planned without executing it. On its own, though, it is not a security control, because a syntactically valid query can still write or delete data. Never execute LLM-generated database queries directly in production without additional guardrails, such as a read-only database user or an explicit check for write clauses.
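One such guardrail is a simple keyword check that rejects generated queries containing write or admin clauses before they reach the database. Below is a minimal sketch (the assert_read_only helper and its clause list are illustrative, not exhaustive; a dedicated read-only Neo4j user remains the stronger control):
import re
# Clauses that indicate the generated query would modify data.
WRITE_CLAUSE_PATTERN = re.compile(r"\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP|LOAD CSV)\b", re.IGNORECASE)
def assert_read_only(cypher: str) -> None:
    """Raise before execution if the generated Cypher contains an obvious write clause."""
    if WRITE_CLAUSE_PATTERN.search(cypher):
        raise ValueError(f"Refusing to execute non-read-only Cypher: {cypher!r}")
# Usage (e.g., inside Text2CypherConverter.convert, just before the EXPLAIN check):
# assert_read_only(generated_query)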
Section 3: The Full GraphRAG Chain: From Question to Answer
Now we assemble the pieces into a complete RAG pipeline.
# (Continuing from previous code)
def serialize_results_to_context(results: list) -> str:
"""Serializes the Neo4j query results into a simple string format."""
if not results:
return "No information found in the knowledge graph."
# Assuming the result is a list of records with one column
# A more complex serializer would handle multiple columns and data types.
return "Based on the knowledge graph, here is the relevant information:\n" + ", ".join([record[0] for record in results])
class GraphRAGSystem:
def __init__(self, converter: Text2CypherConverter, graph: Neo4jGraph, llm_client: OpenAI):
self.converter = converter
self.graph = graph
self.llm_client = llm_client
def answer(self, question: str) -> str:
# 1. Generate Cypher Query
try:
cypher_query = self.converter.convert(question)
print(f"[DEBUG] Generated Cypher: {cypher_query}")
except ValueError as e:
print(f"[ERROR] Could not generate valid Cypher: {e}")
# Fallback strategy: could try a vector search or return a canned response
return "I was unable to translate your question into a valid query for our knowledge base."
# 2. Execute Query and Retrieve Context
try:
results = self.graph.execute_query(cypher_query)
print(f"[DEBUG] Query Results: {results}")
except Exception as e:
print(f"[ERROR] Error executing Cypher query: {e}")
return "I encountered an error while querying our knowledge base."
# 3. Serialize Context
context = serialize_results_to_context(results)
print(f"[DEBUG] Serialized Context: {context}")
# 4. Synthesize Answer
final_prompt = f"""
You are a helpful assistant. Answer the user's question based on the provided context from our knowledge graph.
Be concise and directly answer the question.
Context:
{context}
Question:
{question}
Answer:
"""
response = self.llm_client.chat.completions.create(
model="gpt-4-0613",
messages=[
{"role": "user", "content": final_prompt}
],
temperature=0.0
)
return response.choices[0].message.content
# --- Full Pipeline Execution ---
if __name__ == '__main__':
graph_db = Neo4jGraph(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
converter = Text2CypherConverter(graph_db, client)
rag_system = GraphRAGSystem(converter, graph_db, client)
question = "What are the active projects for engineers who report directly to the manager of the 'Phoenix' team?"
answer = rag_system.answer(question)
print("\n--- Final Answer ---")
print(answer)
graph_db.close()
Expected Output:
[DEBUG] Generated Cypher: MATCH (team:Team {name: 'Phoenix'})<-[:MANAGES]-(manager:Person), (manager)<-[:REPORTS_TO]-(engineer:Person), (engineer)-[:ASSIGNED_TO]->(project:Project {status: 'Active'}) RETURN project.name AS projectName
[DEBUG] Query Results: [<Record projectName='Project Apollo'>, <Record projectName='Project Zeus'>]
[DEBUG] Serialized Context: Based on the knowledge graph, here is the relevant information:
Project Apollo, Project Zeus
--- Final Answer ---
The active projects for engineers who report to the manager of the 'Phoenix' team are Project Apollo and Project Zeus.
This successful answer is a direct result of the graph traversal providing precise, filtered context to the final synthesis step.
Section 4: Advanced Considerations and Production Hardening
A proof-of-concept is one thing; a production system is another. Here are critical edge cases and optimizations to consider.
1. Handling Cypher Generation Failures: Self-Correction
The Text2CypherConverter has a simple retry loop. A more advanced pattern is self-correction, where the query error is fed back into the LLM prompt.
Modify the convert method's except block:
# Inside Text2CypherConverter.convert method
# ... the try block from the original convert method is unchanged ...
except Exception as e:
print(f"Attempt {attempt + 1}: Generated query failed validation. Error: {e}")
if attempt >= retries:
raise ValueError("Failed to generate a valid Cypher query after multiple attempts.") from e
# Self-correction loop
correction_prompt = f"""
The previously generated Cypher query failed with the following error:
{e}
Please correct the query based on this error. Remember the schema:
{self.schema}
Original Question: {question}
Corrected Cypher Query:
"""
# This becomes the new prompt for the next iteration
prompt = correction_prompt
This creates a feedback loop, allowing the LLM to learn from its mistakes within the scope of a single request, significantly improving the reliability of query generation.
2. Performance: Caching and Query Optimization
* Schema Caching: The graph schema rarely changes. Cache it in memory (e.g., using functools.lru_cache) instead of querying Neo4j on every call.
* Query Caching: Many user questions are semantically identical. Implement a caching layer (like Redis) that stores a hash of the normalized question and its corresponding valid Cypher query. This can dramatically reduce LLM calls and latency. A minimal sketch of both caching ideas follows the index statements below.
* Graph Indexes: Ensure your graph is properly indexed for performance. For our schema, creating indexes on Person(name) and Team(name) is essential.
CREATE INDEX person_name_index FOR (p:Person) ON (p.name);
CREATE INDEX team_name_index FOR (t:Team) ON (t.name);
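Here is a minimal sketch of the caching ideas above, assuming the Text2CypherConverter from Section 2. The in-process dictionary is a stand-in for Redis, and the class name and key scheme are illustrative. Schema caching here leans on the fact that the base class already fetches and stores the schema once at construction time; if schema introspection were a standalone function, functools.lru_cache would serve the same purpose.
import hashlib
class CachingText2CypherConverter(Text2CypherConverter):
    """Adds an in-process query cache; the base class already stores the schema once per instance."""
    def __init__(self, graph: Neo4jGraph, llm_client: OpenAI):
        super().__init__(graph, llm_client)
        self._query_cache = {}  # swap for Redis (question hash -> validated Cypher) in production
    def convert(self, question: str, retries: int = 2) -> str:
        # Key on a hash of the normalized question so repeated questions skip the LLM call entirely.
        key = hashlib.sha256(question.strip().lower().encode("utf-8")).hexdigest()
        if key not in self._query_cache:
            self._query_cache[key] = super().convert(question, retries)
        return self._query_cache[key]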
3. Context Window Management: Handling Large Subgraphs
What if a query returns thousands of nodes? MATCH (p:Person)-[:ASSIGNED_TO]->(proj:Project) RETURN p.name, proj.name could return a massive result set, overflowing the LLM's context window.
Strategy 1: Pagination and Summarization
Modify the generated Cypher to use LIMIT. If the number of results exceeds a threshold, instead of returning the raw data, return a summary.
# In the GraphRAGSystem.answer method
cypher_query_with_limit = f"{cypher_query} LIMIT 100"
results = self.graph.execute_query(cypher_query_with_limit)
if len(results) == 100:
# Result set is large, generate a summary instead of listing items
summary_query = f"WITH {cypher_query} AS subquery RETURN count(*) AS total_count"
count_result = self.graph.execute_query(summary_query)
total_count = count_result[0]['total_count']
context = f"The query returned a large number of results ({total_count} items). Please ask the user to refine their question."
else:
context = serialize_results_to_context(results)
Strategy 2: Path-Based Summarization
Instead of returning individual nodes, return paths. The Cypher query can be modified to return a path object, which can then be serialized more concisely.
MATCH p = (t:Team {name: 'Phoenix'})<-...-(project:Project) RETURN p
The serialization logic would then describe the path: "Team Phoenix is managed by John Doe, who manages Alice Smith, who is assigned to Project Apollo."
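A minimal serializer sketch for this strategy, assuming the Cypher returns a neo4j Path object in a column named p (the function name, phrasing, and the path_query referenced in the usage comment are illustrative):
def serialize_path(path) -> str:
    """Describe a neo4j Path as a chain of 'X RELATIONSHIP Y' statements."""
    steps = []
    for rel in path.relationships:
        start = rel.start_node.get("name", "unknown")
        end = rel.end_node.get("name", "unknown")
        steps.append(f"{start} {rel.type} {end}")
    return "; ".join(steps)
# Usage: context = "\n".join(serialize_path(record["p"]) for record in graph_db.execute_query(path_query))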
4. The Hybrid Approach: Vector + Graph
The most powerful systems don't choose between vector search and graph traversal; they use both. Some questions are better suited for one over the other.
* Semantic questions on node properties: "Find me projects with descriptions similar to 'machine learning infrastructure'." -> Vector Search on the Project.description property.
* Relational, multi-hop questions: "Which teams work on projects managed by people who report to Sarah?" -> Graph Traversal.
An advanced architecture involves a Router Chain—an initial LLM call that, given the user's question and descriptions of the available tools (vector search, graph search), decides which tool to use.
{
"tool_name": "graph_search",
"reasoning": "The user's question involves multiple hops and relationships (teams, projects, managers, reports), which is best handled by a graph traversal."
}
This router then dispatches the query to the appropriate RAG pipeline. This meta-layer of reasoning allows your system to dynamically select the optimal retrieval strategy for any given question.
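A minimal router sketch under these assumptions: the GraphRAGSystem above is available, a separate vector pipeline exposes an answer(question) method (the vector_rag object here is hypothetical), and the chosen model supports JSON-mode responses:
import json
ROUTER_PROMPT = """You are a routing assistant. Choose the best tool for the user's question.
Tools:
- "graph_search": relational, multi-hop questions about people, teams, and projects.
- "vector_search": semantic similarity questions over document text.
Respond with JSON: {"tool_name": "...", "reasoning": "..."}"""
def route_and_answer(question: str, graph_rag: GraphRAGSystem, vector_rag) -> str:
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[
            {"role": "system", "content": ROUTER_PROMPT},
            {"role": "user", "content": question},
        ],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    decision = json.loads(response.choices[0].message.content)
    if decision.get("tool_name") == "graph_search":
        return graph_rag.answer(question)
    return vector_rag.answer(question)  # hypothetical vector-RAG pipeline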
Conclusion: The Next Frontier for RAG
By integrating knowledge graphs into our RAG systems, we break through the ceiling of simple semantic search. We empower LLMs not just to find information but to reason over it by providing context that preserves the rich, interconnected structure of our data. The GraphRAG pattern, while more complex to implement than its vector-based counterpart, unlocks the ability to answer a whole new class of complex, multi-hop questions that were previously intractable.
The future of enterprise-grade AI assistants lies in this fusion of unstructured language understanding and structured knowledge representation. As senior engineers, mastering these hybrid retrieval patterns is no longer an academic exercise—it is a prerequisite for building the next generation of truly intelligent applications.