Learnixo

LangGraph Agents · Lesson 12 of 17

Checkpointing: Persisting Agent State

What Checkpointing Does

Without checkpointing, each app.invoke() call starts from scratch — no memory of previous runs.

With checkpointing, LangGraph saves a snapshot of the graph state after each step of execution (for a sequential graph like the ones in this lesson, that means after every node). You can:

  • Resume a multi-turn conversation from where it left off
  • Recover from a crash without restarting from the beginning
  • Inspect exactly what happened at each step
  • Branch from any past state to explore alternative continuations
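
To make these four capabilities concrete, here is a minimal pure-Python sketch of the idea. It is a conceptual model only, not how LangGraph stores checkpoints: ToyCheckpointer and run_turn are hypothetical names, and the "nodes" are two hard-coded steps with no model call.

```python
from copy import deepcopy
from typing import Optional


class ToyCheckpointer:
    """Illustrative stand-in: keeps every saved state per thread, oldest first."""

    def __init__(self) -> None:
        self._threads: dict[str, list[dict]] = {}

    def save(self, thread_id: str, state: dict) -> None:
        # Store a copy so later mutations do not rewrite history.
        self._threads.setdefault(thread_id, []).append(deepcopy(state))

    def latest(self, thread_id: str) -> Optional[dict]:
        history = self._threads.get(thread_id)
        return deepcopy(history[-1]) if history else None

    def history(self, thread_id: str) -> list[dict]:
        return deepcopy(self._threads.get(thread_id, []))


def run_turn(cp: ToyCheckpointer, thread_id: str, user_text: str) -> dict:
    """Resume from the latest checkpoint, run two steps, and save after each."""
    state = cp.latest(thread_id) or {"messages": []}
    # Step 1: record the user message.
    state["messages"].append(("user", user_text))
    cp.save(thread_id, state)
    # Step 2: produce a placeholder reply (no model call in this sketch).
    state["messages"].append(("assistant", f"echo: {user_text}"))
    cp.save(thread_id, state)
    return state


cp = ToyCheckpointer()
run_turn(cp, "thread-a", "hello")
second = run_turn(cp, "thread-a", "and again")  # resumes where turn 1 ended

# Branching: seed a new thread from a past checkpoint and continue from there.
cp.save("thread-b", cp.history("thread-a")[1])
branch = run_turn(cp, "thread-b", "a different question")
```

The second turn sees the first turn's messages because it starts from the latest saved state, and the branch diverges from the original thread without modifying it.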

MemorySaver (Development)

In-memory checkpointing — data lives only for the duration of the process:

Python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
import operator

class ChatState(TypedDict):
    messages: Annotated[list[dict], operator.add]
    session_topic: str

def respond(state: ChatState) -> dict:
    from openai import OpenAI
    client = OpenAI()

    # Format messages for OpenAI
    openai_messages = [
        {"role": m["role"], "content": m["content"]}
        for m in state["messages"]
    ]
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=openai_messages,
    )
    return {"messages": [{"role": "assistant", "content": resp.choices[0].message.content}]}

graph = StateGraph(ChatState)
graph.add_node("respond", respond)
graph.set_entry_point("respond")
graph.add_edge("respond", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# Thread ID identifies the conversation
config = {"configurable": {"thread_id": "user_123_session_1"}}

# Turn 1
result1 = app.invoke({
    "messages": [{"role": "user", "content": "What is warfarin?"}],
    "session_topic": "anticoagulants",
}, config=config)

# Turn 2: state accumulated from turn 1
result2 = app.invoke({
    "messages": [{"role": "user", "content": "What are its main interactions?"}],
}, config=config)

print(f"Message count after 2 turns: {len(result2['messages'])}")
# All 4 messages (user1, assistant1, user2, assistant2) are accumulated
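
The count of four follows from the operator.add reducer declared on messages: each update is appended to the saved list rather than replacing it, while session_topic, which has no reducer, is overwritten only when a new value is supplied. The sketch below imitates that merge in plain Python. apply_update and REDUCERS are hypothetical names for illustration, not LangGraph's API, and the assistant replies are placeholders.

```python
import operator

# Hypothetical reducer table mirroring ChatState: `messages` uses operator.add,
# `session_topic` has no reducer and is simply overwritten.
REDUCERS = {"messages": operator.add}


def apply_update(state: dict, update: dict) -> dict:
    """Merge a partial update into state, using a reducer where one is declared."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # list + list appends
        else:
            merged[key] = value  # plain overwrite
    return merged


state = {"messages": [], "session_topic": ""}

# Turn 1: the input, then the node's return value.
state = apply_update(state, {
    "messages": [{"role": "user", "content": "What is warfarin?"}],
    "session_topic": "anticoagulants",
})
state = apply_update(state, {"messages": [{"role": "assistant", "content": "(reply 1)"}]})

# Turn 2: only `messages` is supplied, so `session_topic` keeps its saved value.
state = apply_update(state, {
    "messages": [{"role": "user", "content": "What are its main interactions?"}],
})
state = apply_update(state, {"messages": [{"role": "assistant", "content": "(reply 2)"}]})

roles = [m["role"] for m in state["messages"]]
```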

SqliteSaver (Local Persistence)

Persists to a SQLite file — survives process restarts:

Python
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3

# Persistent across process restarts
conn = sqlite3.connect("agent_checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)

app = graph.compile(checkpointer=checkpointer)

# Same thread_id resumes the conversation from the last checkpoint
config = {"configurable": {"thread_id": "session_abc"}}
result = app.invoke({"messages": [{"role": "user", "content": "Hello"}]}, config=config)

Use SqliteSaver for:

  • Development and testing with persistence
  • Single-instance deployments
  • Long-running CLI agents
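
Why a file-backed store survives a restart can be shown with the standard library alone. The following is a sketch under stated assumptions: the one-table layout, save_state, and load_state are hypothetical and much simpler than the schema SqliteSaver actually uses, and opening a fresh connection stands in for restarting the process.

```python
import json
import os
import sqlite3
import tempfile

# Hypothetical one-table layout for illustration; SqliteSaver uses its own schema.
SCHEMA = (
    "CREATE TABLE IF NOT EXISTS checkpoints "
    "(thread_id TEXT PRIMARY KEY, state TEXT NOT NULL)"
)


def save_state(db_path: str, thread_id: str, state: dict) -> None:
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(SCHEMA)
            conn.execute(
                "INSERT OR REPLACE INTO checkpoints (thread_id, state) VALUES (?, ?)",
                (thread_id, json.dumps(state)),
            )
    finally:
        conn.close()


def load_state(db_path: str, thread_id: str):
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(SCHEMA)
        row = conn.execute(
            "SELECT state FROM checkpoints WHERE thread_id = ?", (thread_id,)
        ).fetchone()
    finally:
        conn.close()
    # Return the decoded state, or None for a thread that was never saved.
    return json.loads(row[0]) if row else None


db_path = os.path.join(tempfile.mkdtemp(), "toy_checkpoints.db")
save_state(db_path, "session_abc", {"messages": [{"role": "user", "content": "Hello"}]})

# A fresh connection stands in for a restarted process.
restored = load_state(db_path, "session_abc")
missing = load_state(db_path, "unknown_thread")
```

Because the state is read back through a new connection, nothing depends on objects held in memory, which is the property that makes a single SQLite file adequate for the single-instance cases listed above.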

PostgresSaver (Production)

For multi-instance deployments, use PostgreSQL:

Python
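# Requires the langgraph-checkpoint-postgres package, which uses psycopg (v3), not psycopg2
from langgraph.checkpoint.postgres import PostgresSaver

conn_string = "postgresql://user:password@localhost:5432/langgraph_checkpoints"

# from_conn_string is a context manager: the connection is open only inside the with block
with PostgresSaver.from_conn_string(conn_string) as checkpointer:
    # Create tables if they don't exist (needed on first use)
    checkpointer.setup()

    app = graph.compile(checkpointer=checkpointer)
    # Invoke the app inside this block, while the connection is still open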

PostgresSaver is the production choice:

  • Works across multiple API server instances
  • Durable — survives server restarts and crashes
  • Supports concurrent access
  • Scales with your database

Thread Management

Each conversation/session has a unique thread_id. Multiple users = multiple threads:

Python
import uuid

def create_session() -> str:
    """Generate a unique session ID. handle_message() combines it with the user ID to form the thread ID."""
    return str(uuid.uuid4())

def handle_message(user_id: str, session_id: str, message: str) -> str:
    config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}

    result = app.invoke(
        {"messages": [{"role": "user", "content": message}]},
        config=config,
    )
    return result["messages"][-1]["content"]

# Get conversation history
def get_history(user_id: str, session_id: str) -> list[dict]:
    config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}
    state = app.get_state(config)
    return state.values.get("messages", [])
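
Building the thread ID in several places invites mismatches, so it can help to centralize it. The helpers below are one possible sketch: make_thread_id, parse_thread_id, and thread_config are hypothetical names, and the rule that user_id must not contain ":" is an assumption made here so that an ID can be split back into its parts.

```python
import uuid


def make_thread_id(user_id: str, session_id: str) -> str:
    """Join user and session IDs; reject ':' in user_id so the ID splits back cleanly."""
    if not user_id or ":" in user_id:
        raise ValueError("user_id must be non-empty and must not contain ':'")
    if not session_id:
        raise ValueError("session_id must be non-empty")
    return f"{user_id}:{session_id}"


def parse_thread_id(thread_id: str) -> tuple[str, str]:
    """Split a thread ID at the first ':' into (user_id, session_id)."""
    user_id, _, session_id = thread_id.partition(":")
    if not user_id or not session_id:
        raise ValueError(f"malformed thread_id: {thread_id!r}")
    return user_id, session_id


def thread_config(user_id: str, session_id: str) -> dict:
    """Build the config dict that invoke() and get_state() expect."""
    return {"configurable": {"thread_id": make_thread_id(user_id, session_id)}}


session_id = str(uuid.uuid4())
config = thread_config("user_123", session_id)
round_trip = parse_thread_id(config["configurable"]["thread_id"])
```

With helpers like these, handle_message and get_history would call thread_config(user_id, session_id) instead of formatting the string themselves.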

Inspecting State History

Walk through all checkpointed states for a thread:

Python
config = {"configurable": {"thread_id": "session_abc"}}

# Current state
current = app.get_state(config)
print(f"Next node(s) to run: {current.next}")  # empty tuple when the run has finished
print(f"Current messages: {len(current.values.get('messages', []))}")

# Full history (most recent first)
history = list(app.get_state_history(config))
print(f"Total checkpoints: {len(history)}")

for i, snapshot in enumerate(history[:5]):
    # Each item is a StateSnapshot; its metadata dict records how the checkpoint was created
    metadata = snapshot.metadata or {}
    print(f"\nCheckpoint {i}:")
    print(f"  Source: {metadata.get('source', 'unknown')}")
    print(f"  Step: {metadata.get('step', 'unknown')}")
    print(f"  Messages: {len(snapshot.values.get('messages', []))}")

Updating State Between Runs

Modify state before resuming:

Python
config = {"configurable": {"thread_id": "session_abc"}}

# Get current state
state = app.get_state(config)
current_values = state.values

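# Update specific fields
# (this example assumes the graph's state schema defines confidence_score and
# needs_human_review; the ChatState used earlier in this lesson does not)
app.update_state(
    config,
    {"confidence_score": 0.9, "needs_human_review": False},
    as_node="__start__",  # Inject as if from the start node
)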

# Resume from updated state
result = app.invoke(None, config=config)

This is useful for human-in-the-loop flows where a human's decision needs to be recorded in state before the graph continues.


Async Checkpointing

For async graph execution:

Python
import asyncio

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

async def run_async():
    config = {"configurable": {"thread_id": "async_session_1"}}
    result = await app.ainvoke(
        {"messages": [{"role": "user", "content": "Start"}]},
        config=config,
    )
    return result

asyncio.run(run_async())

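MemorySaver works with both sync and async execution. The synchronous SqliteSaver and PostgresSaver do not; for async graphs, use their async counterparts, AsyncSqliteSaver (from langgraph.checkpoint.sqlite.aio) and AsyncPostgresSaver (from langgraph.checkpoint.postgres.aio). On the compiled graph, aget_state and aget_state_history are the async equivalents of get_state and get_state_history.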