Stateful Multi-Turn RAG with Graph-Based Context Management

Goh Ling Yong

The Conversational Dead End: Why Stateless RAG Fails

As senior engineers, we've moved past the initial hype of Retrieval-Augmented Generation (RAG). We've built systems that can query a vector store, retrieve relevant document chunks, and feed them into a Large Language Model (LLM) to produce grounded, factual answers. This pattern is powerful but possesses a critical, production-limiting flaw: it's fundamentally stateless. Each user query is treated as an isolated event, devoid of the rich history that defines a true conversation.

Consider this common dialogue scenario:

  • User: "Give me a summary of the Q3 2023 'Project Apollo' performance review."
  • Stateless RAG: Retrieves 'apollo_q3_review.pdf', synthesizes a summary. -> "Project Apollo met 95% of its KPIs in Q3 2023, with key successes in module integration..."
  • User: "What were its primary blockers?"

    Here, the stateless system collapses. The query "What were its primary blockers?" is ambiguous without the preceding turn. A naive RAG system would perform a vector search for the literal string, likely yielding irrelevant results about generic 'blockers'. The pronoun 'its' refers to 'Project Apollo', but the system has no memory of this connection. This is the core problem: contextual amnesia. Standard RAG cannot handle coreference resolution, topic drift, or nuanced follow-up questions.

    Attempts to solve this with simple chat history windowing are brittle. They either truncate valuable early context or bloat the prompt with irrelevant chatter, increasing token counts and potentially confusing the LLM. Summarizing the history helps but often loses critical entity-specific details. To build truly intelligent conversational agents, we need a more sophisticated memory structure—one that mirrors how humans connect concepts. We need a graph.

    This article presents a production-grade architecture for implementing stateful, multi-turn RAG by modeling the entire conversational context—entities, questions, answers, and their relationships—as a dynamic graph in a database like Neo4j.

    The Graph-Based Context Model: A Blueprint for AI Memory

    Why a graph? Because conversations are graphs. They are not linear transcripts; they are interconnected networks of concepts. A graph database allows us to model this complexity with precision.

  • Nodes: Represent entities (e.g., (p:Project {name: 'Apollo'})), user queries ((q:Query)), LLM responses ((a:Answer)), and key concepts.
  • Relationships: Define the context. A query [:ASKS_ABOUT] a project. An answer [:RESPONDS_TO] a query. A project [:HAS_BLOCKER] a specific issue.

    This structure provides a persistent, queryable memory that grows with the conversation. When the user asks, "What were its primary blockers?", our system can perform a graph traversal starting from the last conversational turn, identify that 'its' refers to the Project node named 'Apollo', and then search for nodes connected via a [:HAS_BLOCKER] relationship.

    This approach transforms the RAG process from a stateless lookup into a stateful, context-aware exploration.
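
    To make this concrete, here is a minimal traversal sketch using the conceptual node labels from this section (the implementation later in the article uses a slightly different Session/Turn/Entity schema). It assumes a local Neo4j instance and a timestamp property on Query nodes for ordering; both are illustrative assumptions.

    python
    # Illustrative traversal only -- the full pipeline is built in later sections.
    from neo4j import GraphDatabase

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "your_password"))

    # Resolve "its" by walking from the most recent query to the project it asks
    # about, then out to anything linked via HAS_BLOCKER.
    cypher = """
    MATCH (q:Query)-[:ASKS_ABOUT]->(p:Project)
    WITH q, p ORDER BY q.timestamp DESC LIMIT 1
    OPTIONAL MATCH (p)-[:HAS_BLOCKER]->(b)
    RETURN p.name AS project, collect(b.name) AS blockers
    """

    with driver.session() as session:
        for record in session.run(cypher):
            print(record["project"], record["blockers"])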

    System Architecture: A Production-Ready Blueprint

    Our stateful RAG system comprises several interconnected modules. Here's a high-level overview before we dive into the implementation details.

    mermaid
    graph TD
        A[User Input] --> B{Input Handler};
        B --> C[Contextualizer Module];
        C --> D["Graph DB (Neo4j)"];
        C --> E[Augmented Query Formulator];
        D -- Retrieved Graph Context --> E;
        E -- Augmented Query --> F[Vector Store];
        F -- Retrieved Documents --> G[Prompt Engineering];
        A -- Original Query --> G;
        E -- Augmented Query --> G;
        G -- Final Prompt --> H[LLM Synthesizer];
        H -- LLM Response --> I[Graph Updater];
        I -- New Entities/Relationships --> D;
        H --> J[Final Output to User];
    
        subgraph Core Stateful Logic
            C
            D
            I
        end
  • Input Handler: Receives the raw user query and the session ID.
  • Contextualizer Module: The brain of our system. It queries the graph database to understand the context of the current query based on the conversation history.
  • Graph DB (Neo4j): Stores the conversational graph. Each session has its own subgraph of queries, answers, and extracted entities.
  • Augmented Query Formulator: Rewrites the user's ambiguous query (e.g., "What were its blockers?") into a specific, self-contained query (e.g., "What were the primary blockers for Project Apollo in Q3 2023?") using the context from the graph.
  • Vector Store: The traditional RAG component. It now receives the augmented query for a much more precise vector search.
  • Prompt Engineering: Constructs the final prompt for the LLM, combining the original query, the augmented query, the retrieved graph context, and the retrieved documents.
  • LLM Synthesizer: The LLM that generates the final, context-aware response.
  • Graph Updater: A crucial step. After the response is generated, this module parses the latest query and response, extracts new entities and relationships, and updates the graph. This closes the stateful loop, enriching the memory for the next turn.

    Deep Dive: Implementation with Python, LangChain, and Neo4j

    Let's build this. We'll use Python, the neo4j driver, langchain for orchestration, and OpenAI's API.

    Environment Setup

    First, ensure you have a running Neo4j instance (e.g., via Docker or AuraDB) and your environment variables are set.

    bash
    # .env file
    OPENAI_API_KEY="sk-..."
    NEO4J_URI="bolt://localhost:7687"
    NEO4J_USERNAME="neo4j"
    NEO4J_PASSWORD="your_password"

    And the required Python packages:

    bash
    pip install neo4j langchain langchain-openai python-dotenv

    Step 1: Neo4j Graph Connection and Schema

    We'll start with a graph connection manager. While Neo4j is schema-optional, defining constraints is crucial for performance and data integrity in production.

    python
    # graph_manager.py
    import os
    from neo4j import GraphDatabase
    from dotenv import load_dotenv
    
    load_dotenv()
    
    class Neo4jGraphManager:
        def __init__(self):
            uri = os.getenv("NEO4J_URI")
            user = os.getenv("NEO4J_USERNAME")
            password = os.getenv("NEO4J_PASSWORD")
            self._driver = GraphDatabase.driver(uri, auth=(user, password))
    
        def close(self):
            self._driver.close()
    
        def execute_query(self, query, parameters=None):
            with self._driver.session() as session:
                result = session.run(query, parameters)
                return [record for record in result]
    
        def setup_constraints(self):
            # Ensures entities and sessions are unique
            queries = [
                "CREATE CONSTRAINT IF NOT EXISTS FOR (s:Session) REQUIRE s.session_id IS UNIQUE",
                "CREATE CONSTRAINT IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE",
                "CREATE CONSTRAINT IF NOT EXISTS FOR (q:Query) REQUIRE q.id IS UNIQUE"
            ]
            for query in queries:
                self.execute_query(query)
            print("Graph constraints ensured.")
    
    # Initialize and setup
    graph_manager = Neo4jGraphManager()
    graph_manager.setup_constraints()
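
    To confirm the constraints actually exist (assuming Neo4j 4.4 or later, where the SHOW CONSTRAINTS command is available), a quick check through the same manager looks like this:

    python
    # Quick verification that the uniqueness constraints are in place (Neo4j 4.4+)
    for record in graph_manager.execute_query("SHOW CONSTRAINTS"):
        print(record["name"], record["labelsOrTypes"], record["properties"])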

    Step 2: The Contextualizer Module - Extracting Entities and Finding Context

    This module is responsible for interpreting the user's query in the context of the conversation graph. Its primary job is to extract entities from the new query and then traverse the graph to find related information.

    We'll use an LLM for entity extraction, prompting it to return a structured JSON object.

    python
    # contextualizer.py
    from langchain_openai import ChatOpenAI
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.pydantic_v1 import BaseModel, Field
    from typing import List
    
    class ExtractedEntities(BaseModel):
        """Model for entities extracted from a text."""
        entities: List[str] = Field(description="A list of named entities, like persons, organizations, or projects.")
    
    class Contextualizer:
        def __init__(self, graph_manager, session_id):
            self.graph_manager = graph_manager
            self.session_id = session_id
            self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
            self.structured_llm = self.llm.with_structured_output(ExtractedEntities)
            
        def _extract_entities(self, text: str) -> List[str]:
            prompt = ChatPromptTemplate.from_messages([
                ("system", "You are an expert entity extraction system. Your task is to identify and list all named entities from the given text. Focus on specific nouns like project names, people, and organizations. Do not extract generic terms."),
                ("human", "Extract all named entities from the following text: {text}")
            ])
            chain = prompt | self.structured_llm
            response = chain.invoke({"text": text})
            return response.entities
    
        def get_context(self, query_text: str) -> dict:
            entities = self._extract_entities(query_text)
            context = {"entities": entities, "graph_summary": ""}
    
            if not entities:
                # If no new entities, find context from the last turn in the session
                cypher_query = """
                MATCH (s:Session {session_id: $session_id})<-[:IN_SESSION]-(turn:Turn)
                WITH turn ORDER BY turn.timestamp DESC LIMIT 1
                MATCH (turn)-[:HAS_ENTITY]->(e:Entity)
                WITH e
                MATCH (e)-[r]-(related_entity)
                RETURN e.name AS entity, type(r) as relation, related_entity.name as related
                LIMIT 5
                """
                params = {"session_id": self.session_id}
            else:
                # If new entities are found, find context related to them
                cypher_query = """
                MATCH (e:Entity) WHERE e.name IN $entities
                MATCH (e)-[r]-(related_entity)
                RETURN e.name AS entity, type(r) as relation, related_entity.name as related
                LIMIT 10
                """
                params = {"entities": entities}
            
            results = self.graph_manager.execute_query(cypher_query, params)
            
            if results:
                summary = "\nConversation Context from Graph:\n"
                for record in results:
                    summary += f"- ({record['entity']})-[{record['relation']}]->({record['related']})\n"
                context["graph_summary"] = summary
                
            return context

    This class does two things:

  • _extract_entities: Uses a powerful LLM with structured output capabilities to reliably get a list of entities from the user's text.
  • get_context: Implements the core logic. If the user's query contains new entities, it searches the graph for information related to them. If the query is ambiguous (like "what about it?"), it intelligently defaults to retrieving context from the last conversational turn in the current session.
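
    As a rough usage check (assuming the graph_manager instance from Step 1 and a graph that already contains a turn linked to a 'Project Apollo' entity), the class can be exercised like this:

    python
    # Illustrative usage of the Contextualizer; actual output depends on graph contents
    contextualizer = Contextualizer(graph_manager, session_id="session_demo")

    context = contextualizer.get_context("What were its primary blockers?")
    print(context["entities"])       # likely [] for this ambiguous follow-up
    print(context["graph_summary"])  # e.g. "- (Project Apollo)-[HAS_ENTITY]->(...)"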

    Step 3: Augmenting the Query

    Now, we use the retrieved context to rewrite the user's query into a clear, standalone question. This step is crucial for getting accurate results from the vector store.

    python
    # query_augmentor.py
    from langchain_openai import ChatOpenAI
    from langchain_core.prompts import ChatPromptTemplate
    
    class QueryAugmentor:
        def __init__(self):
            self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
    
        def augment(self, original_query: str, graph_context: str) -> str:
            if not graph_context:
                return original_query # No context, nothing to augment
    
            prompt = ChatPromptTemplate.from_messages([
                ("system", "You are an expert query rewriting assistant. Your task is to rewrite a potentially ambiguous user query into a clear, self-contained question based on the provided conversation context. If the query is already clear, return it as is."),
                ("human", "Rewrite the following query using the provided context.\n\n---\nContext:\n{context}\n---\nOriginal Query: {query}\n---\nRewritten Query:")
            ])
            
            chain = prompt | self.llm
            response = chain.invoke({"context": graph_context, "query": original_query})
            return response.content
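
    As a quick sanity check on the running example (the exact rewrite is model-dependent), the augmentor can be exercised like this:

    python
    # Illustrative check of the QueryAugmentor; the exact rewrite depends on the model
    augmentor = QueryAugmentor()

    graph_context = "- (Project Apollo)-[HAS_ENTITY]->(Turn)\n"
    rewritten = augmentor.augment("What were its primary blockers?", graph_context)
    print(rewritten)
    # Expected to be along the lines of:
    # "What were the primary blockers for Project Apollo?"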

    Step 4: The Stateful Loop - Updating the Graph

    After the LLM generates a response, we must persist the new information back into our graph memory. This GraphUpdater module will extract entities from both the final query and the LLM's answer and create the necessary nodes and relationships.

    python
    # graph_updater.py
    import uuid
    from datetime import datetime, timezone
    
    class GraphUpdater:
        def __init__(self, graph_manager, session_id, entity_extractor):
            self.graph_manager = graph_manager
            self.session_id = session_id
            self.entity_extractor = entity_extractor # Reuse the entity extraction logic
    
        def update_graph(self, query_text: str, augmented_query: str, response_text: str):
            turn_id = str(uuid.uuid4())
            timestamp = datetime.now(timezone.utc).isoformat()
    
            # Extract entities from the whole turn
            turn_text = f"User asked: {query_text}. System responded: {response_text}"
            entities = self.entity_extractor(turn_text)
            
            # Cypher query to create the full conversational turn
            cypher_query = """
            // Find or create the session
            MERGE (s:Session {session_id: $session_id})
            
            // Create the turn node
            CREATE (t:Turn {
                id: $turn_id,
                query: $query_text,
                augmented_query: $augmented_query,
                response: $response_text,
                timestamp: datetime($timestamp)
            })
            
            // Connect turn to the session
            CREATE (t)-[:IN_SESSION]->(s)
            
            // Connect turn to its extracted entities
            WITH t
            UNWIND $entities AS entity_name
            MERGE (e:Entity {name: entity_name})
            MERGE (t)-[:HAS_ENTITY]->(e)
            """
            
            params = {
                "session_id": self.session_id,
                "turn_id": turn_id,
                "query_text": query_text,
                "augmented_query": augmented_query,
                "response_text": response_text,
                "timestamp": timestamp,
                "entities": entities
            }
            
            self.graph_manager.execute_query(cypher_query, params)
            print(f"Graph updated for turn {turn_id} with entities: {entities}")

    Thanks to MERGE, the session and entity nodes are never duplicated across turns, while each CREATE adds a fresh Turn node, building a robust, append-only log of the conversation.
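
    The example later in this article mentions a HAS_BLOCKER relationship between 'Project Apollo' and the 'Zeus' component. The updater above only links turns to entities; persisting typed relationships like that would require an extra extraction step that this article does not implement. As a rough sketch of what the extension could look like (assuming the graph_manager instance from Step 1 and an upstream extractor that yields entity/blocker pairs):

    python
    # Hypothetical extension of the updater: persist typed HAS_BLOCKER relationships,
    # assuming an upstream step that extracts (entity, blocker) pairs from the turn.
    BLOCKER_QUERY = """
    UNWIND $pairs AS pair
    MERGE (e:Entity {name: pair.entity})
    MERGE (b:Entity {name: pair.blocker})
    MERGE (e)-[:HAS_BLOCKER]->(b)
    """

    pairs = [{"entity": "Project Apollo", "blocker": "Zeus component"}]
    graph_manager.execute_query(BLOCKER_QUERY, {"pairs": pairs})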

    Step 5: Tying It All Together - The `StatefulRAGPipeline`

    Now, let's orchestrate these components. For brevity, we'll mock the vector store retrieval part and focus on the stateful logic.

    python
    # main.py
    import uuid
    
    from langchain_openai import ChatOpenAI

    # Assume previous classes are in these files
    from graph_manager import Neo4jGraphManager
    from contextualizer import Contextualizer
    from query_augmentor import QueryAugmentor
    from graph_updater import GraphUpdater
    
    # Mock Vector Store for demonstration
    class MockVectorStore:
        def search(self, query: str) -> str:
            print(f"\n--- MOCK VECTOR SEARCH --- \nQuery: '{query}'")
            if "Apollo" in query and "blockers" in query:
                return "Document Chunk: A key blocker for Project Apollo was a dependency on the delayed 'Zeus' component and unforeseen supply chain issues."
            return "Document Chunk: General project management documentation."
    
    class StatefulRAGPipeline:
        def __init__(self, session_id):
            self.session_id = session_id
            self.graph_manager = Neo4jGraphManager()
            self.contextualizer = Contextualizer(self.graph_manager, self.session_id)
            self.query_augmentor = QueryAugmentor()
            self.graph_updater = GraphUpdater(self.graph_manager, self.session_id, self.contextualizer._extract_entities)
            self.vector_store = MockVectorStore()
            self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.1)
    
        def run(self, query_text: str):
            print(f"\n[INPUT] User Query: '{query_text}'")
            
            # 1. Get context from graph
            context = self.contextualizer.get_context(query_text)
            print(f"[CONTEXT] Retrieved: {context}")
    
            # 2. Augment the query
            augmented_query = self.query_augmentor.augment(query_text, context['graph_summary'])
            print(f"[AUGMENT] Rewritten Query: '{augmented_query}'")
    
            # 3. Retrieve from vector store
            retrieved_docs = self.vector_store.search(augmented_query)
            print(f"[RETRIEVE] Found Docs: '{retrieved_docs}'")
    
            # 4. Synthesize response
            prompt = f"""
            You are a helpful AI assistant. Answer the user's query based on the provided context and documents.
            
            Context from Conversation History:
            {context['graph_summary']}
            
            Retrieved Documents:
            {retrieved_docs}
            
            User Query: {query_text}
            
            Answer:
            """
            response = self.llm.invoke(prompt)
            response_text = response.content
            print(f"[SYNTHESIZE] LLM Response: '{response_text}'")
    
            # 5. Update the graph
            self.graph_updater.update_graph(query_text, augmented_query, response_text)
            
            return response_text
    
    # --- Example Usage ---
    if __name__ == "__main__":
        session_id = f"session_{uuid.uuid4()}"
        pipeline = StatefulRAGPipeline(session_id)
        
        # Turn 1
        pipeline.run("Give me a summary of the Q3 2023 'Project Apollo' performance review.")
        
        # Turn 2
        pipeline.run("What were its primary blockers?")

    Running this pipeline demonstrates the power of our system:

    * Turn 1: The system processes the query, extracts "Project Apollo", and updates the graph, linking this entity to the first turn.

    * Turn 2: The user asks, "What were its primary blockers?".

    * The Contextualizer extracts no new entities, so it falls back to querying the graph for the context of the last turn.

    * It finds the (e:Entity {name: 'Project Apollo'}) node connected to the previous turn.

    * The QueryAugmentor receives this context and rewrites the query to "What were the primary blockers for Project Apollo?".

    * This augmented query is sent to the vector store, which now performs a highly relevant search.

    * The final LLM response is accurate and grounded in both the retrieved documents and the conversational history.

    * The GraphUpdater adds the new turn and might even extract "Zeus component" as a new entity, linking it to "Project Apollo" with a HAS_BLOCKER relationship if we were to extend the entity extraction.

    Performance Considerations and Edge Cases

    A system this complex introduces new potential failure modes and performance bottlenecks. A senior engineer must anticipate and mitigate these.

    1. Latency Overhead

    Each turn now involves multiple LLM calls (extraction, augmentation, synthesis) and at least two database queries (context retrieval, update). This adds latency compared to a stateless RAG call.

    Mitigation Strategies:

    * Optimized LLMs: Use smaller, faster models like GPT-3.5-Turbo or fine-tuned open-source models for the extraction and augmentation tasks, which are less demanding than final synthesis.

    * Asynchronous Updates: The graph update step can be performed asynchronously. The user receives their response as soon as it's synthesized, and the graph update happens in the background. This doesn't reduce total work, but it significantly improves perceived latency (a minimal sketch follows this list).

    * Cypher Query Optimization: Use EXPLAIN and PROFILE in Neo4j to analyze query plans. Ensure indexes and constraints are in place on key properties like session_id and entity.name. For large graphs, limiting traversal depth (e.g., MATCH (e)-[*1..2]-(related)) is critical.
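
    Here is a minimal sketch of the asynchronous update mentioned above, assuming the pipeline object from Step 5; a production system would more likely hand this off to a task queue (e.g. Celery) with retries and logging.

    python
    # Minimal sketch: persist the turn in a background thread so the user receives
    # the response immediately. Errors should be logged, not surfaced to the user.
    import threading

    def respond(pipeline, query_text: str) -> str:
        context = pipeline.contextualizer.get_context(query_text)
        augmented = pipeline.query_augmentor.augment(query_text, context["graph_summary"])
        docs = pipeline.vector_store.search(augmented)
        response = pipeline.llm.invoke(
            f"Context:\n{context['graph_summary']}\n\nDocuments:\n{docs}\n\nQuery: {query_text}"
        )

        # Fire-and-forget graph update running off the request path.
        threading.Thread(
            target=pipeline.graph_updater.update_graph,
            args=(query_text, augmented, response.content),
            daemon=True,
        ).start()
        return response.content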

    2. Edge Case: Entity Ambiguity

    What happens if the user mentions "John" and the graph contains both "John Doe" and "John Smith" from previous conversations?

    Solution: Disambiguation Logic

    Before augmenting the query, the Contextualizer can detect ambiguity. If a Cypher query for an entity name returns multiple distinct nodes, the system can initiate a disambiguation step.

    python
    # In Contextualizer.get_context
    cypher_query = "MATCH (e:Entity) WHERE e.name CONTAINS $entity_name RETURN e.name AS name"
    results = self.graph_manager.execute_query(cypher_query, {"entity_name": "John"})
    if len(results) > 1:
        # Ambiguity detected
        options = [r['name'] for r in results]
        # Trigger a clarifying question to the user
        return {"status": "AMBIGUOUS", "options": options}

    The pipeline would then need to handle this state, pause, and ask the user, "Which 'John' are you referring to: John Doe or John Smith?" before proceeding.
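
    A minimal way to wire that into the pipeline is sketched below, assuming get_context is extended to return the {"status": "AMBIGUOUS", "options": [...]} dictionary shown above:

    python
    # Sketch of handling ambiguity inside StatefulRAGPipeline.run, assuming
    # get_context can return {"status": "AMBIGUOUS", "options": [...]}.
    def run(self, query_text: str):
        context = self.contextualizer.get_context(query_text)

        if context.get("status") == "AMBIGUOUS":
            options = " or ".join(context["options"])
            # Short-circuit the turn and ask the user to clarify before retrieving anything.
            return f"Which one are you referring to: {options}?"

        # ... otherwise continue with augmentation, retrieval, and synthesis as before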

    3. Edge Case: Context Window Overload

    As the graph grows, a simple query could return a massive amount of context, potentially exceeding the LLM's context window or introducing noise.

    Solution: Context Summarization and Pruning

    * Limit Traversal: Use strict limits in Cypher queries (LIMIT 5).

    * Ranked Retrieval: Prioritize context from the most recent turns within the session.

    * LLM-based Summarization: If the retrieved graph context (the text summary) is too long, pass it through another quick LLM call to summarize it before injecting it into the final prompt. For example: "Summarize this context in 3 bullet points: ..." (a minimal sketch follows this list).

    * Graph Pruning: For long-lived sessions, implement a strategy to prune or archive very old Turn nodes to keep the active graph for a session manageable.
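
    A minimal version of the summarization step is sketched here, assuming the same ChatOpenAI setup used elsewhere in the article; the character threshold is illustrative.

    python
    # Minimal context compressor: only summarize when the graph summary grows too large.
    from langchain_openai import ChatOpenAI

    def compress_context(graph_summary: str, max_chars: int = 2000) -> str:
        if len(graph_summary) <= max_chars:
            return graph_summary
        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        response = llm.invoke(
            "Summarize this conversation context in 3 bullet points, "
            "preserving entity names and relationships:\n\n" + graph_summary
        )
        return response.content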

    Conclusion: From Stateless Tools to Stateful Partners

    By replacing a volatile chat history with a persistent, structured graph memory, we elevate our RAG systems from simple Q&A bots to genuine conversational partners. This architecture provides a robust foundation for handling complex, multi-turn dialogues, resolving ambiguity, and building a cumulative understanding of the user's intent over time.

    The initial implementation is more complex than a stateless RAG pipeline, but the payoff is an AI that can remember, reason, and interact in a way that feels significantly more intelligent and useful. This pattern is not just a theoretical exercise; it's a necessary evolutionary step for building the next generation of sophisticated AI applications.
