LangGraph Agents · Lesson 17 of 17
Interview: LangGraph Architecture Scenarios
Q1: Which checkpointer should you use in production and why?
Answer: PostgresSaver for most production workloads. MemorySaver loses all state when the process restarts — unsuitable for anything requiring durability. SqliteSaver works for single-instance deployments but doesn't support concurrent writes from multiple API server instances. PostgresSaver is the right choice because:
- Durable across server restarts and crashes
- Supports concurrent access from multiple instances (horizontal scaling)
- Works with connection pooling (psycopg's connection pool, or PgBouncer in front of the database)
- Enables cross-instance resumption of interrupted threads
Call checkpointer.setup() once on startup to create the required tables. Use a dedicated schema or database for checkpoints to keep it separate from application data.
Q2: A node in your LangGraph agent raises an exception mid-execution. What happens to state?
Answer: With checkpointing enabled, the last successfully completed checkpoint is preserved. LangGraph saves state after each node completes — if node 3 raises an exception, the checkpoint after node 2 still exists.
Recovery options:
- Retry from last checkpoint: Fix the error, then call app.invoke(None, config=config). The graph resumes from after node 2.
- Modify state before retry: Use app.update_state(config, {...}) to correct the input that caused the failure, then resume.
- Manual override: If node 3 should be skipped entirely, use app.update_state(config, {...}, as_node="node3") to inject its expected output directly, then resume.
Without checkpointing, an exception loses all work done in that run — you start over from scratch.
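To make the recovery behaviour concrete, here is a toy model of checkpoint-and-resume in plain Python. It is not LangGraph's implementation: the `run` helper, the `checkpoints` dict, and the three node functions are hypothetical stand-ins that only mimic "save state after each completed node, then resume from the last saved one".

```python
from typing import Callable, Optional

State = dict
Node = Callable[[State], State]

# Hypothetical in-memory checkpoint store: thread_id -> (index of next node, state).
checkpoints: dict = {}


def run(nodes: list, thread_id: str, initial: Optional[State] = None) -> State:
    """Run nodes in order, saving a checkpoint after each one completes.

    Passing initial=None resumes from the last checkpoint for thread_id,
    loosely mirroring app.invoke(None, config=config).
    """
    if initial is None:
        start, state = checkpoints[thread_id]
    else:
        start, state = 0, dict(initial)
    for i in range(start, len(nodes)):
        # If this node raises, the checkpoint from the previous node survives.
        state = {**state, **nodes[i](state)}
        checkpoints[thread_id] = (i + 1, state)
    return state


calls = {"node3": 0}


def node1(state: State) -> State:
    return {"a": 1}


def node2(state: State) -> State:
    return {"b": state["a"] + 1}


def node3(state: State) -> State:
    calls["node3"] += 1
    if calls["node3"] == 1:
        raise RuntimeError("transient failure")  # fails on the first attempt only
    return {"c": state["b"] + 1}


pipeline = [node1, node2, node3]
try:
    run(pipeline, "thread-1", initial={})
except RuntimeError:
    pass  # the checkpoint written after node2 is still in the store

saved_index, saved_state = checkpoints["thread-1"]
final = run(pipeline, "thread-1")  # resume: only node3 runs again
```

In this sketch the failed run leaves a checkpoint pointing at node 3 with the outputs of nodes 1 and 2 intact, so the resumed run repeats only the failed node.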
Q3: How do you prevent a supervisor agent from entering an infinite routing loop?
Answer: Two layers of protection:
Layer 1 (step counter in state):

    from typing import TypedDict

    class SupervisorState(TypedDict):
        next_agent: str
        routing_steps: int
        max_routing_steps: int

    def supervisor(state: SupervisorState) -> dict:
        # Force a clean exit once the routing budget is spent.
        if state["routing_steps"] >= state["max_routing_steps"]:
            return {"next_agent": "FINISH", "routing_steps": state["routing_steps"] + 1}
        # normal routing logic... (produces `result`, which includes next_agent)
        return {**result, "routing_steps": state["routing_steps"] + 1}

Layer 2 (LangGraph's built-in recursion limit):

    app = graph.compile(checkpointer=checkpointer)
    # recursion_limit is a key of the config dict, not a keyword argument of invoke()
    app.invoke(initial_state, config={**config, "recursion_limit": 50})

Set max_routing_steps to 2–3× the number of available agents. Set recursion_limit as a hard ceiling; when it is exceeded, LangGraph raises GraphRecursionError. Both are needed: the step counter gives a clean "FINISH" path, and the recursion limit is a safety net against bugs in the step counter itself.
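The step counter can be exercised without a graph. The sketch below pairs the supervisor with a deliberately buggy router that never chooses FINISH; `buggy_router` and `run_until_finish` are hypothetical names, and the driver loop is only a stand-in for the graph runtime, with `hard_limit` playing the role of the recursion limit.

```python
from typing import TypedDict


class SupervisorState(TypedDict):
    next_agent: str
    routing_steps: int
    max_routing_steps: int


def buggy_router(state: SupervisorState) -> dict:
    # Hypothetical routing logic that never decides to finish.
    return {"next_agent": "researcher"}


def supervisor(state: SupervisorState) -> dict:
    steps = state["routing_steps"]
    if steps >= state["max_routing_steps"]:
        # Routing budget spent: take the clean exit path.
        return {"next_agent": "FINISH", "routing_steps": steps + 1}
    return {**buggy_router(state), "routing_steps": steps + 1}


def run_until_finish(state: SupervisorState, hard_limit: int = 50) -> SupervisorState:
    """Stand-in for the graph loop; hard_limit plays the role of recursion_limit."""
    for _ in range(hard_limit):
        state = {**state, **supervisor(state)}
        if state["next_agent"] == "FINISH":
            return state
    raise RuntimeError("hard limit reached without FINISH")


final = run_until_finish(
    {"next_agent": "", "routing_steps": 0, "max_routing_steps": 6}
)
```

With a budget of 6, the router is consulted six times and the seventh supervisor call forces FINISH, well before the hard limit would trigger.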
Q4: What is the difference between interrupt_before and interrupt_after, and when do you choose each?
Answer:
| | interrupt_before | interrupt_after |
|---|---|---|
| Timing | Pauses before the node runs | Pauses after the node runs |
| Node's output in state? | No — node hasn't executed yet | Yes — node has completed |
| Use case | Human approves before expensive/irreversible action | Human reviews what the node produced |
Use interrupt_before when:
- The action is irreversible (sending an email, submitting an order) and needs approval before it happens
- You want to let the human decide whether to proceed at all
Use interrupt_after when:
- You want the human to review the agent's output before the next step consumes it
- The node's result needs human validation (e.g., AI analysis before pharmacist sign-off)
Q5: How do you design state for a long-running workflow that processes hundreds of steps?
Answer: Four principles:
Keep accumulated lists bounded: Instead of Annotated[list[str], operator.add] (unbounded), use a custom reducer that caps at N items:

    from typing import Annotated, TypedDict

    def last_100(existing: list[str], update: list[str]) -> list[str]:
        # Append the update, then keep only the 100 most recent items.
        return (existing + update)[-100:]

    class WorkflowState(TypedDict):  # illustrative state class
        recent_events: Annotated[list[str], last_100]

Separate working state from output state: Working fields are written and overwritten frequently; output fields are the final deliverables. Don't accumulate working data into output lists.
Use external storage for large payloads: Don't store large documents or binary data directly in graph state. Store them in S3 or a database and put only the reference (a key or URL) in state. State should contain metadata, not content.
Type your state completely: Every field should have a type annotation. Untyped fields become sources of bugs when nodes accidentally overwrite each other.
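The bounded-list and external-storage principles can be sketched together. This is a minimal illustration under stated assumptions: `last_n`, `store_payload`, and the in-memory `blob_store` are hypothetical, with the dict standing in for S3 or a database.

```python
import hashlib
from typing import Callable


def last_n(n: int) -> Callable[[list, list], list]:
    """Build a reducer that appends the update and keeps the newest n items."""
    if n <= 0:
        raise ValueError("n must be positive")  # [-0:] would keep everything

    def reducer(existing: list, update: list) -> list:
        return (existing + update)[-n:]

    return reducer


# Hypothetical stand-in for S3 or a database table.
blob_store: dict = {}


def store_payload(payload: bytes) -> str:
    """Save the payload externally and return a short content-derived key."""
    key = hashlib.sha256(payload).hexdigest()[:16]
    blob_store[key] = payload
    return key


keep_3 = last_n(3)
events = keep_3(["a", "b"], ["c", "d"])  # oldest item "a" is dropped

document = b"x" * 1_000_000
# State carries only the 16-character reference, not the megabyte of content.
state = {"recent_events": events, "document_ref": store_payload(document)}
```

A node that needs the document looks it up by `document_ref`, so each checkpoint stays small no matter how large the payload is.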
Q6: How do you test a LangGraph agent without calling real LLMs?
Answer: Three levels of testing:
Node-level unit tests: Nodes are just functions. Test them directly with a state dict; no graph is needed:

    def test_safety_check():
        state = {"drug_name": "warfarin", "risk_flags": []}
        result = safety_check(state)
        assert "INR monitoring" in result["risk_flags"][0]

Subgraph integration tests: Compile and invoke subgraphs with MemorySaver. Mock the LLM by patching:

    from unittest.mock import patch
    with patch("mymodule.llm.invoke", return_value=mock_response):
        # A graph compiled with a checkpointer needs a thread_id in its config.
        result = research_subgraph.invoke(initial_state, config={"configurable": {"thread_id": "test"}})

Full graph regression tests: Maintain a golden dataset of (input, expected_output) pairs. Run the full graph against each pair in CI. LLM responses can be cached with LangChain's set_llm_cache so that repeated runs are deterministic and free.
Test interrupt_before paths by fetching the snapshot with app.get_state(config), checking that snapshot.next (a tuple of node names) contains the expected interrupt node, then resuming with controlled state.
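One way to keep node-level tests free of real LLM calls is to pass the model into the node instead of importing it. The sketch below assumes a hypothetical `make_summarize_node` factory and a model object exposing `invoke(...)` that returns something with a `.content` attribute; the fake is built with the standard library's `unittest.mock`.

```python
from unittest.mock import MagicMock


def make_summarize_node(llm):
    """Build a node that closes over its LLM so tests can pass in a fake."""
    def summarize(state: dict) -> dict:
        response = llm.invoke(f"Summarize the risks of {state['drug_name']}")
        return {"summary": response.content}
    return summarize


def test_summarize_node():
    fake_llm = MagicMock()
    fake_llm.invoke.return_value = MagicMock(content="Monitor INR weekly.")

    node = make_summarize_node(fake_llm)
    result = node({"drug_name": "warfarin"})

    assert result == {"summary": "Monitor INR weekly."}
    # The node should call the model exactly once, with the drug in the prompt.
    fake_llm.invoke.assert_called_once()
    assert "warfarin" in fake_llm.invoke.call_args.args[0]


test_summarize_node()
```

Because the node receives its model as an argument, the test needs no patching of module paths and does not break when imports move.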
Q7: Supervisor pattern vs sequential graph — when is each appropriate?
Answer:
Use the supervisor pattern when:
- Not all specialists are needed for every query (avoids unnecessary LLM calls)
- The supervisor needs to judge which specialist to call based on prior results
- The order of specialist invocation varies by query
Use a sequential graph when:
- Every step runs for every input (fixed pipeline)
- Step order never changes
- Cost matters — supervisor adds one LLM call per routing decision
- Simpler debugging is a priority
Cost comparison for a 4-specialist workflow:
- Sequential: 4–5 LLM calls (one per step)
- Supervisor: 5–10 LLM calls (supervisor + each specialist + routing decisions between them)
A supervisor that always routes to the same 4 specialists in the same order is just an expensive sequential graph. Use supervisor only when routing genuinely varies.
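The call counts above can be reproduced with simple arithmetic. The sketch assumes one LLM call per specialist, one routing call before each specialist, and one final routing call that picks FINISH; both function names are hypothetical, and real graphs may differ (for example, if the supervisor sometimes re-invokes a specialist).

```python
def sequential_calls(num_steps: int) -> int:
    """One LLM call per pipeline step."""
    return num_steps


def supervisor_calls(specialists_invoked: int) -> int:
    """Specialist calls plus routing calls.

    Assumes one routing decision before each specialist and one final
    decision that chooses FINISH.
    """
    routing_decisions = specialists_invoked + 1
    return specialists_invoked + routing_decisions


all_four = supervisor_calls(4)  # every specialist is needed
only_two = supervisor_calls(2)  # routing skips two specialists
fixed = sequential_calls(4)
```

Under these assumptions the supervisor costs 9 calls when all four specialists run, against 4 for the fixed pipeline. It only approaches the pipeline's cost when routing skips specialists, at 5 calls when two are needed.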
Q8: How do you handle parallel execution in LangGraph, and what are the pitfalls?
Answer: Use Send to fan out parallel tasks:

    from langgraph.graph import START
    from langgraph.types import Send

    def fan_out(state):
        # One Send per drug; each carries its own input to process_drug.
        return [Send("process_drug", {"drug_name": drug}) for drug in state["drug_list"]]

    graph.add_conditional_edges(START, fan_out, ["process_drug"])

All Send tasks run in parallel within the same step. Results merge back into the parent state via reducers.
Pitfalls:
- Conflicting writes to non-reducer fields: If multiple parallel nodes write to the same non-reducer field in the same step, LangGraph cannot pick a winner and raises InvalidUpdateError. Parallel branches should write only to reducer-annotated fields (for example, lists with operator.add).
- Error handling: An unhandled exception in one branch fails the whole step, so the run stops even if the other branches succeeded. Wrap branch nodes in try/except and return an error indicator field so that one bad item does not abort the batch.
- Rate limits: Parallel branches all start at the same time. 10 parallel LLM calls will hit rate limits faster than 10 sequential calls. Add backoff or limit parallelism with a semaphore if rate limits are a concern.
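The error-handling and rate-limit advice can be sketched with plain asyncio. This is not LangGraph's scheduler: `capture_errors`, `process_drug`, and `run_branches` are hypothetical, and the final merge loop only imitates what an operator.add reducer would do with list fields.

```python
import asyncio
from typing import Awaitable, Callable


def capture_errors(node: Callable[[dict], Awaitable[dict]]):
    """Wrap a branch node so a failure becomes data instead of an exception."""
    async def wrapped(state: dict) -> dict:
        try:
            return await node(state)
        except Exception as exc:
            return {"errors": [f"{state['drug_name']}: {exc}"]}
    return wrapped


@capture_errors
async def process_drug(state: dict) -> dict:
    # Hypothetical branch body; a real one would call an LLM here.
    await asyncio.sleep(0.01)
    if state["drug_name"] == "unknown":
        raise ValueError("no monograph found")
    return {"results": [f"{state['drug_name']}: ok"]}


async def run_branches(drugs: list, max_parallel: int = 2) -> dict:
    semaphore = asyncio.Semaphore(max_parallel)
    running = 0
    peak = 0

    async def limited(drug: str) -> dict:
        nonlocal running, peak
        async with semaphore:  # at most max_parallel branches inside at once
            running += 1
            peak = max(peak, running)
            try:
                return await process_drug({"drug_name": drug})
            finally:
                running -= 1

    outputs = await asyncio.gather(*(limited(d) for d in drugs))

    # Merge list fields by concatenation, like an operator.add reducer would.
    merged = {"results": [], "errors": [], "peak_parallelism": peak}
    for out in outputs:
        for key, value in out.items():
            merged[key] += value
    return merged


merged = asyncio.run(run_branches(["warfarin", "unknown", "aspirin", "heparin"]))
```

The failing branch contributes an entry to `errors` while the other three still deliver results, and the semaphore keeps the number of concurrent branches at or below `max_parallel`.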
Q9: How do you control costs in a production LangGraph agent?
Answer: Five levers:
1. Choose models by node: Not every node needs GPT-4o. Use a cheaper model (GPT-4o-mini) for extraction and classification; reserve GPT-4o for synthesis and complex reasoning.
2. Cache at the node level: If a node's output depends only on deterministic inputs, cache it. LangChain's set_llm_cache with SQLite caches identical prompts.
3. Prefer sequential over supervisor when routing is predictable: Each supervisor routing decision is an LLM call. Eliminating it for fixed-pipeline workflows cuts costs significantly.
4. Token-budget the state: Large accumulated lists get passed to every subsequent LLM call. Use last_N reducers to cap history. Summarize rather than accumulate when the list grows beyond a budget.
5. Track token usage per thread: Record the usage_metadata of each model response (available when using LangChain chat model wrappers) to state, aggregate per thread, and alert when a single thread exceeds a budget. Kill long-running loops early rather than letting them run to recursion_limit.
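Per-thread tracking (lever 5) can be as small as a counter keyed by thread ID. The `TokenBudget` class below is a hypothetical sketch; it assumes each usage record is a dict with `input_tokens` and `output_tokens` keys, and the 1,000-token budget is an arbitrary example value.

```python
from collections import defaultdict


class TokenBudget:
    """Aggregate token usage per thread and flag threads that exceed a budget."""

    def __init__(self, max_tokens_per_thread: int) -> None:
        self.max_tokens = max_tokens_per_thread
        self.used = defaultdict(int)

    def record(self, thread_id: str, usage: dict) -> None:
        # usage is assumed to look like {"input_tokens": ..., "output_tokens": ...}
        self.used[thread_id] += usage.get("input_tokens", 0) + usage.get("output_tokens", 0)

    def exceeded(self, thread_id: str) -> bool:
        return self.used[thread_id] > self.max_tokens


budget = TokenBudget(max_tokens_per_thread=1_000)
budget.record("order_42", {"input_tokens": 600, "output_tokens": 150})
within = not budget.exceeded("order_42")  # 750 tokens so far
budget.record("order_42", {"input_tokens": 300, "output_tokens": 100})
over = budget.exceeded("order_42")  # 1,150 tokens: over budget
```

A node or routing function could check `exceeded(...)` and steer the graph to its finish path instead of waiting for the recursion limit.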
Q10: System design — design a drug safety review system using LangGraph
Scenario: Pharmacists at a hospital need an AI system that (1) analyzes new drug orders for safety issues, (2) routes high-risk orders for human review, and (3) auto-approves low-risk orders. It must be auditable, resumable across restarts, and support 50 concurrent pharmacists.
Answer:
State design:

    import operator
    from typing import Annotated, TypedDict

    class DrugOrderState(TypedDict):
        order_id: str
        patient_id: str
        drug_name: str
        dose_mg: float
        risk_score: float        # 0.0–1.0 computed by AI
        risk_flags: Annotated[list[str], operator.add]
        decision: str            # "auto_approved", "human_approved", "rejected"
        pharmacist_id: str
        pharmacist_notes: str
        audit_log: Annotated[list[str], operator.add]
        final_order: str

Graph structure:

    analyze_order
         ↓
    route_by_risk
      ├── risk below 0.3 → auto_approve → END
      └── risk 0.3+ → [INTERRUPT] → pharmacist_review → apply_decision → END

Key decisions:
- Checkpointer: PostgresSaver. State must survive server restarts, and 50 concurrent pharmacists means multiple API server instances.
- Thread IDs: f"order_{order_id}", unique per drug order. An order suspended for pharmacist review can be resumed hours later on a different server instance.
- Interrupt point: interrupt_before=["pharmacist_review"]. The AI analysis has already completed by then (so its results are in state for the pharmacist to review), and only high-risk orders reach this node, so only they pause for a human decision. Interrupting after analyze_order instead would pause every order, including low-risk ones.
- Audit log: The Annotated[list[str], operator.add] field records every state transition, as required for regulatory compliance.
- Auto-approval path: Low-risk orders never hit the interrupt. They complete in a single invoke() call.
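The routing and thread-ID decisions can be sketched as two small functions. The 0.3 threshold and the node names come from the graph structure above; `route_by_risk` is written as a conditional-edge function returning the next node's name, and `thread_config` is a hypothetical helper.

```python
RISK_THRESHOLD = 0.3  # from the graph structure: below 0.3 auto-approves


def route_by_risk(state: dict) -> str:
    """Return the name of the next node for a conditional edge."""
    if state["risk_score"] < RISK_THRESHOLD:
        return "auto_approve"
    return "pharmacist_review"


def thread_config(order_id: str) -> dict:
    """Build the per-order config: one thread per drug order."""
    return {"configurable": {"thread_id": f"order_{order_id}"}}


low = route_by_risk({"risk_score": 0.12})
boundary = route_by_risk({"risk_score": 0.3})  # exactly 0.3 goes to a human
config = thread_config("A1001")
```

Because the thread ID is derived from the order ID alone, any API instance can rebuild the same config and resume the order without shared in-process state.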
API surface:
- POST /orders: starts the graph; returns thread_id plus the analysis if interrupted, or the final result if auto-approved
- GET /orders/{thread_id}: returns current state for the pharmacist UI
- POST /orders/{thread_id}/review: calls update_state + invoke(None) to resume
Scaling: PostgresSaver handles concurrent access. The API is stateless, so any instance can resume any thread. A background job escalates stale orders (no review within 4 hours) to a senior pharmacist.
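The stale-order check in the background job reduces to a timestamp comparison. In this sketch `orders_to_escalate` is hypothetical, and it assumes the job can obtain a mapping from thread ID to the time the order was interrupted; how that mapping is stored is left open.

```python
from datetime import datetime, timedelta, timezone

REVIEW_DEADLINE = timedelta(hours=4)


def orders_to_escalate(pending: dict, now: datetime) -> list:
    """Return thread IDs that have waited longer than the review deadline."""
    return sorted(
        thread_id
        for thread_id, interrupted_at in pending.items()
        if now - interrupted_at > REVIEW_DEADLINE
    )


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
pending = {
    "order_1": now - timedelta(hours=5),
    "order_2": now - timedelta(hours=1),
    "order_3": now - timedelta(hours=4, minutes=1),
}
stale = orders_to_escalate(pending, now)
```

Passing `now` in as an argument keeps the function deterministic, so the escalation rule can be unit-tested without freezing the clock.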