Checkpointing: Persistent State in LangGraph
Use LangGraph checkpointers to persist agent state across runs. Compare MemorySaver, SqliteSaver, and PostgresSaver. Enable multi-session conversations and crash recovery.
What Checkpointing Does
Without checkpointing, each app.invoke() call starts from scratch — no memory of previous runs.
With checkpointing, LangGraph saves a snapshot of the full state after each step of the graph (in a simple linear graph, that is after every node execution). You can:
- Resume a multi-turn conversation from where it left off
- Recover from a crash without restarting from the beginning
- Inspect exactly what happened at each step
- Branch from any past state to explore alternative continuations
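To make these four capabilities concrete, here is a minimal toy model in plain Python. It is not LangGraph's implementation: `ToyCheckpointer`, `run_steps`, and the two step functions are hypothetical names invented for this sketch, and the snapshot layout is an assumption chosen for brevity.

```python
from copy import deepcopy
from typing import Any, Callable, Optional

State = dict[str, Any]
Step = Callable[[State], State]


class ToyCheckpointer:
    """Hypothetical stand-in for a checkpointer: snapshots grouped by thread ID."""

    def __init__(self) -> None:
        self._threads: dict[str, list[State]] = {}

    def save(self, thread_id: str, state: State) -> None:
        # Copy on save so later mutations cannot change a stored snapshot.
        self._threads.setdefault(thread_id, []).append(deepcopy(state))

    def latest(self, thread_id: str) -> Optional[State]:
        snapshots = self._threads.get(thread_id)
        return deepcopy(snapshots[-1]) if snapshots else None

    def history(self, thread_id: str) -> list[State]:
        # Oldest snapshot first (an arbitrary choice for this toy model).
        return deepcopy(self._threads.get(thread_id, []))


def run_steps(steps: list[Step], state: State,
              saver: ToyCheckpointer, thread_id: str) -> State:
    """Run each step in order, saving a snapshot after every one."""
    for step in steps:
        state = step(state)
        saver.save(thread_id, state)
    return state


def add_one(state: State) -> State:
    return {**state, "count": state["count"] + 1}


def double(state: State) -> State:
    return {**state, "count": state["count"] * 2}


saver = ToyCheckpointer()
final = run_steps([add_one, double], {"count": 1}, saver, "thread-a")

# Resume: continue thread-a from its latest snapshot.
resumed = run_steps([add_one], saver.latest("thread-a"), saver, "thread-a")

# Branch: start a new thread from thread-a's first snapshot.
branched = run_steps([add_one], saver.history("thread-a")[0], saver, "thread-b")
```

Inspection is just reading `saver.history("thread-a")`, and crash recovery amounts to calling `latest()` after a restart, provided the store outlives the process.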
MemorySaver (Development)
In-memory checkpointing — data lives only for the duration of the process:
import operator
from typing import Annotated, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from openai import OpenAI

class ChatState(TypedDict):
    # operator.add appends each node's returned messages to the existing list
    messages: Annotated[list[dict], operator.add]
    session_topic: str

def respond(state: ChatState) -> dict:
    client = OpenAI()
    # Format messages for OpenAI
    openai_messages = [
        {"role": m["role"], "content": m["content"]}
        for m in state["messages"]
    ]
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=openai_messages,
    )
    return {"messages": [{"role": "assistant", "content": resp.choices[0].message.content}]}

graph = StateGraph(ChatState)
graph.add_node("respond", respond)
graph.set_entry_point("respond")
graph.add_edge("respond", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# Thread ID identifies the conversation
config = {"configurable": {"thread_id": "user_123_session_1"}}

# Turn 1
result1 = app.invoke({
    "messages": [{"role": "user", "content": "What is warfarin?"}],
    "session_topic": "anticoagulants",
}, config=config)

# Turn 2: state accumulated from turn 1
result2 = app.invoke({
    "messages": [{"role": "user", "content": "What are its main interactions?"}],
}, config=config)

print(f"Message count after 2 turns: {len(result2['messages'])}")
# All 4 messages (user1, assistant1, user2, assistant2) are accumulated

SqliteSaver (Local Persistence)
Persists to a SQLite file — survives process restarts:
# SqliteSaver ships in the separate langgraph-checkpoint-sqlite package
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# Persistent across process restarts
conn = sqlite3.connect("agent_checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
app = graph.compile(checkpointer=checkpointer)

# Same thread_id resumes the conversation from the last checkpoint
config = {"configurable": {"thread_id": "session_abc"}}
result = app.invoke({"messages": [{"role": "user", "content": "Hello"}]}, config=config)

Use SqliteSaver for:
- Development and testing with persistence
- Single-instance deployments
- Long-running CLI agents
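What "survives process restarts" means in practice can be shown with the standard library alone. The sketch below is a toy model, not SqliteSaver's actual schema or code: the `toy_checkpoints` table and the `save_state` and `load_state` helpers are hypothetical, and storing state as JSON text is an assumption made for simplicity.

```python
import json
import os
import sqlite3
import tempfile
from contextlib import closing
from typing import Optional


def _open(db_path: str) -> sqlite3.Connection:
    """Open the database file, creating the toy table on first use."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS toy_checkpoints ("
        "thread_id TEXT PRIMARY KEY, state TEXT NOT NULL)"
    )
    return conn


def save_state(db_path: str, thread_id: str, state: dict) -> None:
    with closing(_open(db_path)) as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT OR REPLACE INTO toy_checkpoints VALUES (?, ?)",
                (thread_id, json.dumps(state)),
            )


def load_state(db_path: str, thread_id: str) -> Optional[dict]:
    with closing(_open(db_path)) as conn:
        row = conn.execute(
            "SELECT state FROM toy_checkpoints WHERE thread_id = ?",
            (thread_id,),
        ).fetchone()
    return json.loads(row[0]) if row else None


db_path = os.path.join(tempfile.mkdtemp(), "toy_checkpoints.db")
save_state(db_path, "session_abc",
           {"messages": [{"role": "user", "content": "Hello"}]})

# A fresh connection, like the one a restarted process would open,
# still finds the state for the same thread ID.
restored = load_state(db_path, "session_abc")
```

Because every call opens its own connection, nothing is held in memory between the save and the load; the file is the only shared state, which is the property a file-backed checkpointer relies on.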
PostgresSaver (Production)
For multi-instance deployments, use PostgreSQL:
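# PostgresSaver ships in the langgraph-checkpoint-postgres package,
# which is built on psycopg (version 3), not psycopg2
from langgraph.checkpoint.postgres import PostgresSaver

conn_string = "postgresql://user:password@localhost:5432/langgraph_checkpoints"

# In current releases, from_conn_string is a context manager that
# opens the connection and closes it on exit
with PostgresSaver.from_conn_string(conn_string) as checkpointer:
    # Create tables if they don't exist
    checkpointer.setup()
    app = graph.compile(checkpointer=checkpointer)
    # Invoke the app inside this block, while the connection is open

PostgresSaver is the production choice: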
- Works across multiple API server instances
- Durable — survives server restarts and crashes
- Supports concurrent access
- Scales with your database
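The guidance across the three savers can be condensed into a small decision helper. This is only a sketch of the rules of thumb stated in this article; `pick_checkpointer` and its two parameters are hypothetical names, and real deployments may weigh other factors.

```python
def pick_checkpointer(instances: int, needs_persistence: bool) -> str:
    """Return the name of the saver class the rules of thumb suggest."""
    if instances < 1:
        raise ValueError("instances must be at least 1")
    if instances > 1:
        # Several server instances must share one durable store.
        return "PostgresSaver"
    if needs_persistence:
        # One process, but state must survive restarts.
        return "SqliteSaver"
    # One process, state may be lost on exit (development, tests).
    return "MemorySaver"


choice_dev = pick_checkpointer(instances=1, needs_persistence=False)
choice_cli = pick_checkpointer(instances=1, needs_persistence=True)
choice_prod = pick_checkpointer(instances=3, needs_persistence=True)
```

The helper returns a class name rather than an instance so that it stays free of database connections and can be unit tested on its own.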
Thread Management
Each conversation or session gets its own unique thread_id, so multiple users means multiple threads:
import uuid

def create_session(user_id: str) -> str:
    """Generate a unique thread ID for a user session."""
    session_id = str(uuid.uuid4())
    return f"{user_id}:{session_id}"

def handle_message(user_id: str, session_id: str, message: str) -> str:
    config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}
    result = app.invoke(
        {"messages": [{"role": "user", "content": message}]},
        config=config,
    )
    return result["messages"][-1]["content"]

# Get conversation history
def get_history(user_id: str, session_id: str) -> list[dict]:
    config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}
    state = app.get_state(config)
    return state.values.get("messages", [])

Inspecting State History
Walk through all checkpointed states for a thread:
config = {"configurable": {"thread_id": "session_abc"}}

# Current state
current = app.get_state(config)
print(f"Next node(s): {current.next}")  # empty tuple once the run has finished
print(f"Current messages: {len(current.values.get('messages', []))}")

# Full history (most recent first)
history = list(app.get_state_history(config))
print(f"Total checkpoints: {len(history)}")

# Each item is a single StateSnapshot; metadata is one of its attributes
for i, snapshot in enumerate(history[:5]):
    metadata = snapshot.metadata or {}
    print(f"\nCheckpoint {i}:")
    print(f"  Source: {metadata.get('source', 'unknown')}")
    print(f"  Step: {metadata.get('step', 'unknown')}")
    print(f"  Messages: {len(snapshot.values.get('messages', []))}")

Updating State Between Runs
Modify state before resuming:
config = {"configurable": {"thread_id": "session_abc"}}

# Get current state
state = app.get_state(config)
current_values = state.values

# Update specific fields. The keys must be declared in the graph's state
# schema; confidence_score and needs_human_review assume a state that
# defines them (the ChatState example earlier does not).
app.update_state(
    config,
    {"confidence_score": 0.9, "needs_human_review": False},
    as_node="__start__",  # Inject as if from the start node
)

# Resume from the updated state; passing None supplies no new input
result = app.invoke(None, config=config)

This is useful for human-in-the-loop flows, where a human's decision needs to be recorded in state before the graph continues.
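An update only makes sense for keys that the graph's state schema declares, and how keys outside the schema are handled may differ between LangGraph versions. A cautious option is to check the update against the schema first. The sketch below uses only the standard library; `unknown_keys` is a hypothetical helper, and `ChatState` is repeated here so the block runs on its own.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class ChatState(TypedDict):
    messages: Annotated[list[dict], operator.add]
    session_topic: str


def unknown_keys(schema: type, update: dict) -> set[str]:
    """Return the keys in `update` that the state schema does not declare."""
    return set(update) - set(get_type_hints(schema))


update = {"confidence_score": 0.9, "needs_human_review": False}
extra = unknown_keys(ChatState, update)

if extra:
    # Extend the state schema or drop these keys before calling update_state.
    print(f"Not in state schema: {sorted(extra)}")
```

With the two-field `ChatState` used in this article, both keys are flagged, which signals that the schema would need `confidence_score` and `needs_human_review` fields before such an update is applied.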
Async Checkpointing
For async graph execution:
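import asyncio

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

async def run_async():
    config = {"configurable": {"thread_id": "async_session_1"}}
    result = await app.ainvoke(
        {"messages": [{"role": "user", "content": "Start"}]},
        config=config,
    )
    return result

asyncio.run(run_async())

MemorySaver works with both sync and async execution. The SQLite and Postgres savers have separate async variants: AsyncSqliteSaver (in langgraph.checkpoint.sqlite.aio) and AsyncPostgresSaver (in langgraph.checkpoint.postgres.aio). On the compiled graph, the async counterparts of get_state and get_state_history are aget_state and aget_state_history.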
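One common reason to use the async API is to serve several threads from a single event loop. The sketch below shows the pattern with `asyncio.gather`. To stay runnable without LangGraph or a model, it uses `fake_ainvoke`, a hypothetical stand-in for `app.ainvoke`; the reply format is likewise invented for illustration.

```python
import asyncio


async def fake_ainvoke(payload: dict, config: dict) -> dict:
    """Stand-in for app.ainvoke: echoes the last message after a short wait."""
    await asyncio.sleep(0.01)  # simulates I/O such as a model call
    thread_id = config["configurable"]["thread_id"]
    last = payload["messages"][-1]["content"]
    return {"thread_id": thread_id, "reply": f"echo: {last}"}


async def run_sessions(thread_ids: list[str]) -> list[dict]:
    """Run one turn per thread concurrently; results keep the input order."""
    calls = [
        fake_ainvoke(
            {"messages": [{"role": "user", "content": f"Start {tid}"}]},
            {"configurable": {"thread_id": tid}},
        )
        for tid in thread_ids
    ]
    return await asyncio.gather(*calls)


results = asyncio.run(run_sessions(["async_session_1", "async_session_2"]))
```

Each call carries its own thread_id in `config`, so the concurrent turns read and write separate checkpoint histories.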