LangGraph Agents · Lesson 13 of 17
Time Travel: Rewinding Graph State
What is Time Travel?
Time travel lets you rewind to any past state checkpoint and continue from there — either replaying the same execution or branching into a different continuation.
Use cases:
- Debugging: Step through execution to find where the agent went wrong
- Exploration: Try different inputs at step N without re-running steps 1 to N-1
- Testing: Verify that specific intermediate states are correct
- Recovery: Rewind past a failed step and retry with different parameters
Prerequisites
Time travel requires checkpointing. Without a checkpointer, there's no history to travel through.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class ResearchState(TypedDict):
query: str
steps: Annotated[list[str], operator.add]
result: str
step_count: int
def step1(state: ResearchState) -> dict:
return {"steps": ["Step 1: Retrieved initial data"], "step_count": 1}
def step2(state: ResearchState) -> dict:
return {"steps": ["Step 2: Analyzed data"], "step_count": state["step_count"] + 1}
def step3(state: ResearchState) -> dict:
return {"steps": ["Step 3: Generated report"], "result": "Final report here", "step_count": state["step_count"] + 1}
graph = StateGraph(ResearchState)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_node("step3", step3)
graph.set_entry_point("step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)Running and Examining History
config = {"configurable": {"thread_id": "research_session_1"}}
# Run the full graph
result = app.invoke({
"query": "warfarin interactions",
"steps": [],
"result": "",
"step_count": 0,
}, config=config)
print(f"Steps: {result['steps']}")
print(f"Result: {result['result']}")
# List all checkpoints (most recent first)
history = list(app.get_state_history(config))
print(f"\nTotal checkpoints: {len(history)}")
for i, (snapshot, meta) in enumerate(history):
step_num = snapshot.values.get("step_count", 0)
steps = snapshot.values.get("steps", [])
print(f" Checkpoint {i}: step_count={step_num}, steps_recorded={len(steps)}, next={snapshot.next}")Rewinding to a Past Checkpoint
Each checkpoint has a unique ID. Use it to restore that state:
# Get history
history = list(app.get_state_history(config))
# Find the checkpoint after step1 (before step2 ran)
# Checkpoints are most-recent-first, so the oldest is last
for snapshot, meta in reversed(history):
if snapshot.values.get("step_count") == 1:
step1_checkpoint = snapshot
break
print(f"Rewinding to: step_count={step1_checkpoint.values['step_count']}")
print(f"Checkpoint ID: {step1_checkpoint.config['configurable']['checkpoint_id']}")
# Rewind by providing the checkpoint config
rewind_config = step1_checkpoint.config
# Now the state is back at after-step1
current = app.get_state(rewind_config)
print(f"State after rewind: {current.values['steps']}")Branching: Continue from a Past State
After rewinding, you can continue execution to create a branch:
# Get the checkpoint at step 1
history = list(app.get_state_history(config))
step1_config = None
for snapshot, meta in reversed(history):
if snapshot.values.get("step_count") == 1:
step1_config = snapshot.config
break
# Update state at that checkpoint point (simulate different input)
app.update_state(
step1_config,
{"query": "aspirin mechanism of action"}, # Different query
)
# Resume from this modified checkpoint — creates a new branch
new_result = app.invoke(None, config=step1_config)
print(f"Branch result: {new_result['result']}")
print(f"Branch steps: {new_result['steps']}")The original thread is unchanged — branching creates a diverging history from the checkpoint point.
Practical: Debugging a Failed Agent
When an agent produces an unexpected output, time travel helps you find where it went wrong:
class DebugHelper:
def __init__(self, app, config):
self.app = app
self.config = config
def print_execution_trace(self):
"""Print full execution history step by step."""
history = list(self.app.get_state_history(self.config))
print("=== Execution Trace (oldest to newest) ===")
for snapshot, meta in reversed(history):
print(f"\n[Step {snapshot.values.get('step_count', '?')}]")
print(f" Next node: {snapshot.next}")
for key, value in snapshot.values.items():
if isinstance(value, list):
print(f" {key}: {value[-1] if value else '[]'} (total: {len(value)})")
else:
print(f" {key}: {repr(value)[:60]}")
def replay_from_step(self, target_step_count: int) -> dict:
"""Replay execution from a specific step."""
history = list(self.app.get_state_history(self.config))
target_config = None
for snapshot, meta in reversed(history):
if snapshot.values.get("step_count") == target_step_count:
target_config = snapshot.config
break
if not target_config:
raise ValueError(f"No checkpoint found at step {target_step_count}")
return self.app.invoke(None, config=target_config)
# Usage
config = {"configurable": {"thread_id": "debug_session"}}
app.invoke(initial_state, config=config)
debugger = DebugHelper(app, config)
debugger.print_execution_trace()
# Re-run from step 1 to get a fresh branch
replay_result = debugger.replay_from_step(1)Checkpoint IDs and Thread Isolation
Each checkpoint has a unique ID within its thread. Threads are isolated:
# Thread 1
config_1 = {"configurable": {"thread_id": "thread_1"}}
# Thread 2
config_2 = {"configurable": {"thread_id": "thread_2"}}
# These are completely separate histories
result_1 = app.invoke(state_1, config=config_1)
result_2 = app.invoke(state_2, config=config_2)
# Time travel in thread_1 doesn't affect thread_2Use thread IDs to isolate users (each user has their own thread), test runs (each test run has its own thread), or workflow instances (each order/case has its own thread).