Stateful Multi-Turn RAG with Graph-Based Context Management
The Conversational Dead End: Why Stateless RAG Fails
As senior engineers, we've moved past the initial hype of Retrieval-Augmented Generation (RAG). We've built systems that can query a vector store, retrieve relevant document chunks, and feed them into a Large Language Model (LLM) to produce grounded, factual answers. This pattern is powerful but possesses a critical, production-limiting flaw: it's fundamentally stateless. Each user query is treated as an isolated event, devoid of the rich history that defines a true conversation.
Consider this common dialogue scenario:

User: "Give me a summary of the Q3 2023 'Project Apollo' performance review."
Assistant: (a grounded summary synthesized from the retrieved review documents)
User: "What were its primary blockers?"

Here, the stateless system collapses. The query "What were its primary blockers?" is ambiguous without the preceding turn. A naive RAG system would perform a vector search for the literal string, likely yielding irrelevant results about generic 'blockers'. The pronoun 'its' refers to 'Project Apollo', but the system has no memory of this connection. This is the core problem: contextual amnesia. Standard RAG cannot handle coreference resolution, topic drift, or nuanced follow-up questions.
Attempts to solve this with simple chat history windowing are brittle. They either truncate valuable early context or bloat the prompt with irrelevant chatter, increasing token counts and potentially confusing the LLM. Summarizing the history helps but often loses critical entity-specific details. To build truly intelligent conversational agents, we need a more sophisticated memory structure—one that mirrors how humans connect concepts. We need a graph.
This article presents a production-grade architecture for implementing stateful, multi-turn RAG by modeling the entire conversational context—entities, questions, answers, and their relationships—as a dynamic graph in a database like Neo4j.
The Graph-Based Context Model: A Blueprint for AI Memory
Why a graph? Because conversations are graphs. They are not linear transcripts; they are interconnected networks of concepts. A graph database allows us to model this complexity with precision.
In this model, the building blocks of the dialogue become nodes: projects such as (p:Project {name: 'Apollo'}), user queries (q:Query), LLM responses (a:Answer), and key concepts. Relationships capture how they connect: a query [:ASKS_ABOUT] a project, an answer [:RESPONDS_TO] a query, a project [:HAS_BLOCKER] a specific issue.

This structure provides a persistent, queryable memory that grows with the conversation. When the user asks, "What were its primary blockers?", our system can perform a graph traversal starting from the last conversational turn, identify that 'its' refers to the Project node named 'Apollo', and then search for nodes connected via a [:HAS_BLOCKER] relationship.
This approach transforms the RAG process from a stateless lookup into a stateful, context-aware exploration.
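To make the traversal concrete, here is a minimal Cypher sketch using the conceptual labels above. It is illustrative only: the implementation later in this article uses a slightly different Session/Turn/Entity schema, and the timestamp and description properties are assumptions.

// Resolve "its": find the project referenced by the most recent query,
// then follow HAS_BLOCKER edges to its blockers.
MATCH (q:Query)-[:ASKS_ABOUT]->(p:Project)
WITH q, p ORDER BY q.timestamp DESC LIMIT 1
MATCH (p)-[:HAS_BLOCKER]->(b)
RETURN p.name AS project, collect(b.description) AS blockers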
System Architecture: A Production-Ready Blueprint
Our stateful RAG system comprises several interconnected modules. Here's a high-level overview before we dive into the implementation details.
graph TD
A[User Input] --> B{Input Handler};
B --> C[Contextualizer Module];
C --> D["Graph DB (Neo4j)"];
C --> E[Augmented Query Formulator];
D -- Retrieved Graph Context --> E;
E -- Augmented Query --> F[Vector Store];
F -- Retrieved Documents --> G[Prompt Engineering];
A -- Original Query --> G;
E -- Augmented Query --> G;
G -- Final Prompt --> H[LLM Synthesizer];
H -- LLM Response --> I[Graph Updater];
I -- New Entities/Relationships --> D;
H --> J[Final Output to User];
subgraph Core Stateful Logic
C
D
I
end
Deep Dive: Implementation with Python, LangChain, and Neo4j
Let's build this. We'll use Python, the neo4j driver, langchain for orchestration, and OpenAI's API.
Environment Setup
First, ensure you have a running Neo4j instance (e.g., via Docker or AuraDB) and your environment variables are set.
# .env file
OPENAI_API_KEY="sk-..."
NEO4J_URI="bolt://localhost:7687"
NEO4J_USERNAME="neo4j"
NEO4J_PASSWORD="your_password"
And the required Python packages:
pip install neo4j langchain langchain-openai python-dotenv
Step 1: Neo4j Graph Connection and Schema
We'll start with a graph connection manager. While Neo4j is schema-optional, defining constraints is crucial for performance and data integrity in production.
# graph_manager.py
import os
from neo4j import GraphDatabase
from dotenv import load_dotenv
load_dotenv()
class Neo4jGraphManager:
def __init__(self):
uri = os.getenv("NEO4J_URI")
user = os.getenv("NEO4J_USERNAME")
password = os.getenv("NEO4J_PASSWORD")
self._driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
self._driver.close()
def execute_query(self, query, parameters=None):
with self._driver.session() as session:
result = session.run(query, parameters)
return [record for record in result]
def setup_constraints(self):
# Ensures entities and sessions are unique
queries = [
"CREATE CONSTRAINT IF NOT EXISTS FOR (s:Session) REQUIRE s.session_id IS UNIQUE",
"CREATE CONSTRAINT IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE",
"CREATE CONSTRAINT IF NOT EXISTS FOR (q:Query) REQUIRE q.id IS UNIQUE"
]
for query in queries:
self.execute_query(query)
print("Graph constraints ensured.")
# Initialize and setup
graph_manager = Neo4jGraphManager()
graph_manager.setup_constraints()
Step 2: The Contextualizer Module - Extracting Entities and Finding Context
This module is responsible for interpreting the user's query in the context of the conversation graph. Its primary job is to extract entities from the new query and then traverse the graph to find related information.
We'll use an LLM for entity extraction, prompting it to return a structured JSON object.
# contextualizer.py
import uuid
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
class ExtractedEntities(BaseModel):
"""Model for entities extracted from a text."""
entities: List[str] = Field(description="A list of named entities, like persons, organizations, or projects.")
class Contextualizer:
def __init__(self, graph_manager, session_id):
self.graph_manager = graph_manager
self.session_id = session_id
self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
self.structured_llm = self.llm.with_structured_output(ExtractedEntities)
def _extract_entities(self, text: str) -> List[str]:
prompt = ChatPromptTemplate.from_messages([
("system", "You are an expert entity extraction system. Your task is to identify and list all named entities from the given text. Focus on specific nouns like project names, people, and organizations. Do not extract generic terms."),
("human", "Extract all named entities from the following text: {text}")
])
chain = prompt | self.structured_llm
response = chain.invoke({"text": text})
return response.entities
def get_context(self, query_text: str) -> dict:
entities = self._extract_entities(query_text)
context = {"entities": entities, "graph_summary": ""}
if not entities:
# If no new entities, find context from the last turn in the session
cypher_query = """
MATCH (s:Session {session_id: $session_id})<-[:IN_SESSION]-(turn:Turn)
WITH turn ORDER BY turn.timestamp DESC LIMIT 1
MATCH (turn)-[:HAS_ENTITY]->(e:Entity)
WITH e
MATCH (e)-[r]-(related_entity)
RETURN e.name AS entity, type(r) as relation, related_entity.name as related
LIMIT 5
"""
params = {"session_id": self.session_id}
else:
# If new entities are found, find context related to them
cypher_query = """
MATCH (e:Entity) WHERE e.name IN $entities
MATCH (e)-[r]-(related_entity)
RETURN e.name AS entity, type(r) as relation, related_entity.name as related
LIMIT 10
"""
params = {"entities": entities}
results = self.graph_manager.execute_query(cypher_query, params)
if results:
summary = "\nConversation Context from Graph:\n"
for record in results:
summary += f"- ({record['entity']})-[{record['relation']}]->({record['related']})\n"
context["graph_summary"] = summary
return context
This class does two things:
* _extract_entities: Uses a powerful LLM with structured output capabilities to reliably get a list of entities from the user's text.
* get_context: Implements the core logic. If the user's query contains new entities, it searches the graph for information related to them. If the query is ambiguous (like "what about it?"), it intelligently defaults to retrieving context from the last conversational turn in the current session.

Step 3: Augmenting the Query
Now, we use the retrieved context to rewrite the user's query into a clear, standalone question. This step is crucial for getting accurate results from the vector store.
# query_augmentor.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
class QueryAugmentor:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
def augment(self, original_query: str, graph_context: str) -> str:
if not graph_context:
return original_query # No context, nothing to augment
prompt = ChatPromptTemplate.from_messages([
("system", "You are an expert query rewriting assistant. Your task is to rewrite a potentially ambiguous user query into a clear, self-contained question based on the provided conversation context. If the query is already clear, return it as is."),
("human", "Rewrite the following query using the provided context.\n\n---\nContext:\n{context}\n---\nOriginal Query: {query}\n---\nRewritten Query:")
])
chain = prompt | self.llm
response = chain.invoke({"context": graph_context, "query": original_query})
return response.content
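A quick illustration of the intended behavior. The context string here is a hypothetical example that mirrors the format produced by the Contextualizer; the exact rewrite depends on the model.

# Hypothetical usage of QueryAugmentor
augmentor = QueryAugmentor()
graph_context = "Conversation Context from Graph:\n- (Project Apollo)-[HAS_ENTITY]->(Turn 1)\n"
rewritten = augmentor.augment("What were its primary blockers?", graph_context)
# Expect something close to: "What were the primary blockers for Project Apollo?"
print(rewritten)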
Step 4: The Stateful Loop - Updating the Graph
After the LLM generates a response, we must persist the new information back into our graph memory. This GraphUpdater module will extract entities from both the final query and the LLM's answer and create the necessary nodes and relationships.
# graph_updater.py
import uuid
from datetime import datetime, timezone
class GraphUpdater:
def __init__(self, graph_manager, session_id, entity_extractor):
self.graph_manager = graph_manager
self.session_id = session_id
self.entity_extractor = entity_extractor # Reuse the entity extraction logic
def update_graph(self, query_text: str, augmented_query: str, response_text: str):
turn_id = str(uuid.uuid4())
timestamp = datetime.now(timezone.utc).isoformat()
# Extract entities from the whole turn
turn_text = f"User asked: {query_text}. System responded: {response_text}"
entities = self.entity_extractor(turn_text)
# Cypher query to create the full conversational turn
cypher_query = """
// Find or create the session
MERGE (s:Session {session_id: $session_id})
// Create the turn node
CREATE (t:Turn {
id: $turn_id,
query: $query_text,
augmented_query: $augmented_query,
response: $response_text,
timestamp: datetime($timestamp)
})
// Connect turn to the session
CREATE (t)-[:IN_SESSION]->(s)
// Connect turn to its extracted entities
WITH t
UNWIND $entities AS entity_name
MERGE (e:Entity {name: entity_name})
MERGE (t)-[:HAS_ENTITY]->(e)
"""
params = {
"session_id": self.session_id,
"turn_id": turn_id,
"query_text": query_text,
"augmented_query": augmented_query,
"response_text": response_text,
"timestamp": timestamp,
"entities": entities
}
self.graph_manager.execute_query(cypher_query, params)
print(f"Graph updated for turn {turn_id} with entities: {entities}")
The MERGE clauses keep sessions and entities deduplicated (existing nodes are matched rather than recreated), while each turn is appended as a new node, building a robust log of the conversation.
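To sanity-check the memory after a few turns, an inspection query like the following (a sketch against the schema above) lists each turn in a session along with its linked entities:

MATCH (s:Session {session_id: $session_id})<-[:IN_SESSION]-(t:Turn)
OPTIONAL MATCH (t)-[:HAS_ENTITY]->(e:Entity)
RETURN t.timestamp AS at, t.query AS query, collect(e.name) AS entities
ORDER BY at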
Step 5: Tying It All Together - The `StatefulRAGPipeline`
Now, let's orchestrate these components. For brevity, we'll mock the vector store retrieval part and focus on the stateful logic.
# main.py
import uuid
from langchain_openai import ChatOpenAI
# Assume previous classes are in these files
from graph_manager import Neo4jGraphManager
from contextualizer import Contextualizer
from query_augmentor import QueryAugmentor
from graph_updater import GraphUpdater
# Mock Vector Store for demonstration
class MockVectorStore:
def search(self, query: str) -> str:
print(f"\n--- MOCK VECTOR SEARCH --- \nQuery: '{query}'")
if "Apollo" in query and "blockers" in query:
return "Document Chunk: A key blocker for Project Apollo was a dependency on the delayed 'Zeus' component and unforeseen supply chain issues."
return "Document Chunk: General project management documentation."
class StatefulRAGPipeline:
def __init__(self, session_id):
self.session_id = session_id
self.graph_manager = Neo4jGraphManager()
self.contextualizer = Contextualizer(self.graph_manager, self.session_id)
self.query_augmentor = QueryAugmentor()
self.graph_updater = GraphUpdater(self.graph_manager, self.session_id, self.contextualizer._extract_entities)
self.vector_store = MockVectorStore()
self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.1)
def run(self, query_text: str):
print(f"\n[INPUT] User Query: '{query_text}'")
# 1. Get context from graph
context = self.contextualizer.get_context(query_text)
print(f"[CONTEXT] Retrieved: {context}")
# 2. Augment the query
augmented_query = self.query_augmentor.augment(query_text, context['graph_summary'])
print(f"[AUGMENT] Rewritten Query: '{augmented_query}'")
# 3. Retrieve from vector store
retrieved_docs = self.vector_store.search(augmented_query)
print(f"[RETRIEVE] Found Docs: '{retrieved_docs}'")
# 4. Synthesize response
prompt = f"""
You are a helpful AI assistant. Answer the user's query based on the provided context and documents.
Context from Conversation History:
{context['graph_summary']}
Retrieved Documents:
{retrieved_docs}
User Query: {query_text}
Answer:
"""
response = self.llm.invoke(prompt)
response_text = response.content
print(f"[SYNTHESIZE] LLM Response: '{response_text}'")
# 5. Update the graph
self.graph_updater.update_graph(query_text, augmented_query, response_text)
return response_text
# --- Example Usage ---
if __name__ == "__main__":
session_id = f"session_{uuid.uuid4()}"
pipeline = StatefulRAGPipeline(session_id)
# Turn 1
pipeline.run("Give me a summary of the Q3 2023 'Project Apollo' performance review.")
# Turn 2
pipeline.run("What were its primary blockers?")
Running this pipeline demonstrates the power of our system:
* Turn 1: The system processes the query, extracts "Project Apollo", and updates the graph, linking this entity to the first turn.
* Turn 2: The user asks, "What were its primary blockers?".
* The Contextualizer extracts no new entities. It falls back to querying the graph for the context of the last turn.
* It finds the (e:Entity {name: 'Project Apollo'}) node connected to the previous turn.
* The QueryAugmentor receives this context and rewrites the query to "What were the primary blockers for Project Apollo?".
* This augmented query is sent to the vector store, which now performs a highly relevant search.
* The final LLM response is accurate and grounded in both the retrieved documents and the conversational history.
* The GraphUpdater adds the new turn. If we extended the extraction step to capture relationships as well as entities, it could also link a new "Zeus component" entity to "Project Apollo" via a HAS_BLOCKER relationship (a sketch of that extension follows).
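A sketch of that extension: a second structured-output call that returns (subject, relation, object) triples, which are then merged into the graph. The Triple model, the helper name, and the generic RELATES_TO edge (Cypher cannot parameterize relationship types without APOC) are all illustrative assumptions, not part of the pipeline above.

# relation_extractor.py -- illustrative extension, not part of the pipeline above
from typing import List
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

class Triple(BaseModel):
    """A single (subject, relation, object) fact, e.g. ('Project Apollo', 'HAS_BLOCKER', 'Zeus component')."""
    subject: str
    relation: str = Field(description="Relation name in UPPER_SNAKE_CASE, e.g. HAS_BLOCKER, DEPENDS_ON.")
    object: str

class ExtractedTriples(BaseModel):
    triples: List[Triple]

def extract_and_store_triples(graph_manager, text: str):
    """Extract triples from a turn's text and persist them between Entity nodes."""
    llm = ChatOpenAI(model="gpt-4-turbo", temperature=0).with_structured_output(ExtractedTriples)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Extract factual (subject, relation, object) triples about named entities."),
        ("human", "{text}")
    ])
    result = (prompt | llm).invoke({"text": text})
    for t in result.triples:
        # Without APOC, a generic :RELATES_TO edge carrying the relation name
        # as a property keeps the Cypher simple and fully parameterized.
        graph_manager.execute_query(
            """
            MERGE (s:Entity {name: $subject})
            MERGE (o:Entity {name: $object})
            MERGE (s)-[:RELATES_TO {type: $relation}]->(o)
            """,
            {"subject": t.subject, "object": t.object, "relation": t.relation},
        )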
Performance Considerations and Edge Cases
A system this complex introduces new potential failure modes and performance bottlenecks. A senior engineer must anticipate and mitigate these.
1. Latency Overhead
Each turn now involves multiple LLM calls (extraction, augmentation, synthesis) and at least two database queries (context retrieval, update). This adds latency compared to a stateless RAG call.
Mitigation Strategies:
* Optimized LLMs: Use smaller, faster models like GPT-3.5-Turbo or fine-tuned open-source models for the extraction and augmentation tasks, which are less demanding than final synthesis.
* Asynchronous Updates: The graph update step can be performed asynchronously. The user receives their response as soon as it's synthesized, and the graph update happens in the background. This doesn't reduce total work, but it significantly improves perceived latency (see the sketch after this list).
* Cypher Query Optimization: Use EXPLAIN and PROFILE in Neo4j to analyze query plans. Ensure indexes and constraints are in place on key properties like session_id and entity.name. For large graphs, limiting traversal depth (e.g., MATCH (e)-[*1..2]-(related)) is critical.
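One way to implement the asynchronous update mentioned above: a minimal sketch using a background thread pool, assuming the pipeline and GraphUpdater from Steps 4 and 5. A proper task queue (Celery, RQ, etc.) is the more robust choice in production.

# async_update.py -- sketch of a fire-and-forget graph write
import concurrent.futures

# A single worker keeps writes ordered within this process.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

def update_graph_async(graph_updater, query_text, augmented_query, response_text):
    """Submit the graph update in the background so the response returns immediately."""
    future = _executor.submit(
        graph_updater.update_graph, query_text, augmented_query, response_text
    )
    # Surface failures in logs instead of silently dropping them.
    future.add_done_callback(
        lambda f: print(f"Graph update failed: {f.exception()}") if f.exception() else None
    )

# In StatefulRAGPipeline.run, step 5 would become:
#   update_graph_async(self.graph_updater, query_text, augmented_query, response_text)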
2. Edge Case: Entity Ambiguity
What happens if the user mentions "John" and the graph contains both "John Doe" and "John Smith" from previous conversations?
Solution: Disambiguation Logic
Before augmenting the query, the Contextualizer can detect ambiguity. If a Cypher query for an entity name returns multiple distinct nodes, the system can initiate a disambiguation step.
# In Contextualizer.get_context
cypher_query = "MATCH (e:Entity) WHERE e.name CONTAINS $entity_name RETURN e.name AS name"
results = self.graph_manager.execute_query(cypher_query, {"entity_name": "John"})
if len(results) > 1:
# Ambiguity detected
options = [r['name'] for r in results]
# Trigger a clarifying question to the user
return {"status": "AMBIGUOUS", "options": options}
The pipeline would then need to handle this state, pause, and ask the user, "Which 'John' are you referring to: John Doe or John Smith?" before proceeding.
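A sketch of how the pipeline's run method could short-circuit on that status. The early-return shape is an assumption about how clarifying questions are surfaced to the user.

# Inside StatefulRAGPipeline.run, right after context retrieval (sketch)
context = self.contextualizer.get_context(query_text)
if context.get("status") == "AMBIGUOUS":
    options = " or ".join(context["options"])
    # Return a clarifying question instead of an answer; the user's next
    # message resolves the ambiguity and the pipeline proceeds normally.
    return f"Which one do you mean: {options}?"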
3. Edge Case: Context Window Overload
As the graph grows, a simple query could return a massive amount of context, potentially exceeding the LLM's context window or introducing noise.
Solution: Context Summarization and Pruning
* Limit Traversal: Use strict limits in Cypher queries (LIMIT 5).
* Ranked Retrieval: Prioritize context from the most recent turns within the session.
* LLM-based Summarization: If the retrieved graph context (the text summary) is too long, pass it through another quick LLM call to summarize it before injecting it into the final prompt. For example: "Summarize this context in 3 bullet points: ...".
* Graph Pruning: For long-lived sessions, implement a strategy to prune or archive very old Turn nodes to keep the active graph for a session manageable (see the maintenance query sketched below).
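For graph pruning, a periodic maintenance query along these lines (a sketch; the 50-turn threshold is arbitrary) keeps only the most recent turns per session:

// Keep the 50 most recent turns per session; detach-delete the rest.
MATCH (s:Session)<-[:IN_SESSION]-(t:Turn)
WITH s, t ORDER BY t.timestamp DESC
WITH s, collect(t) AS turns
UNWIND turns[50..] AS old_turn
DETACH DELETE old_turn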
Conclusion: From Stateless Tools to Stateful Partners
By replacing a volatile chat history with a persistent, structured graph memory, we elevate our RAG systems from simple Q&A bots to genuine conversational partners. This architecture provides a robust foundation for handling complex, multi-turn dialogues, resolving ambiguity, and building a cumulative understanding of the user's intent over time.
The initial implementation is more complex than a stateless RAG pipeline, but the payoff is an AI that can remember, reason, and interact in a way that feels significantly more intelligent and useful. This pattern is not just a theoretical exercise; it's a necessary evolutionary step for building the next generation of sophisticated AI applications.