Learnixo

Interview: LangGraph Architecture

Q1: Which checkpointer should you use in production and why?

Answer: PostgresSaver for most production workloads. MemorySaver loses all state when the process restarts — unsuitable for anything requiring durability. SqliteSaver works for single-instance deployments but doesn't support concurrent writes from multiple API server instances. PostgresSaver is the right choice because:

  • Durable across server restarts and crashes
  • Supports concurrent access from multiple instances (horizontal scaling)
  • Works with connection pooling (a psycopg connection pool in the application, or PgBouncer in front of the database)
  • Enables cross-instance resumption of interrupted threads

Call checkpointer.setup() once on startup to create the required tables. Use a dedicated schema or database for checkpoints to keep them separate from application data.


Q2: A node in your LangGraph agent raises an exception mid-execution. What happens to state?

Answer: With checkpointing enabled, the last successfully completed checkpoint is preserved. LangGraph saves a checkpoint after each super-step, which in a linear graph means after each node completes. If node 3 raises an exception, the checkpoint written after node 2 still exists.

Recovery options:

  1. Retry from last checkpoint: Fix the error, then call app.invoke(None, config=config) — the graph resumes from after node 2
  2. Modify state before retry: Use app.update_state(config, {...}) to correct the input that caused the failure, then resume
  3. Manual override: If node 3 should be skipped entirely, use app.update_state(config, {...}, as_node="node3") to inject its expected output directly, then resume

Without checkpointing, an exception loses all work done in that run — you start over from scratch.
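
The resume-from-last-checkpoint behaviour can be illustrated without LangGraph at all. The sketch below is a toy runner in plain Python, not LangGraph's implementation: run_with_checkpoints, the checkpoints dict, and the three node functions are hypothetical names invented for illustration.

```python
from typing import Callable

Node = Callable[[dict], dict]

def run_with_checkpoints(nodes: list[Node], state: dict, checkpoints: dict, thread_id: str) -> dict:
    """Run nodes in order, saving (next_index, state) after each success."""
    # Resume from the saved position if this thread already has a checkpoint.
    start, saved = checkpoints.get(thread_id, (0, state))
    current = dict(saved)
    for index in range(start, len(nodes)):
        current = {**current, **nodes[index](current)}  # may raise
        checkpoints[thread_id] = (index + 1, dict(current))
    return current

calls = {"node3": 0}

def node1(state: dict) -> dict:
    return {"a": 1}

def node2(state: dict) -> dict:
    return {"b": state["a"] + 1}

def node3(state: dict) -> dict:
    # Fails on the first attempt only, to simulate a transient error.
    calls["node3"] += 1
    if calls["node3"] == 1:
        raise RuntimeError("transient failure")
    return {"c": state["b"] + 1}

checkpoints: dict = {}
pipeline = [node1, node2, node3]

try:
    run_with_checkpoints(pipeline, {}, checkpoints, "t1")
except RuntimeError:
    pass  # node 3 failed; the checkpoint written after node 2 is still there

after_failure = checkpoints["t1"]
final = run_with_checkpoints(pipeline, {}, checkpoints, "t1")  # resumes at node 3
```

The second call skips node1 and node2 entirely because the saved position already points at node3. That is the property a durable checkpointer provides across process restarts.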


Q3: How do you prevent a supervisor agent from entering an infinite routing loop?

Answer: Two layers of protection:

Layer 1 — Step counter in state:

Python
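from typing import TypedDict

class SupervisorState(TypedDict):
    next_agent: str
    routing_steps: int
    max_routing_steps: int

def supervisor(state: SupervisorState) -> dict:
    steps = state["routing_steps"] + 1
    # Hard stop: force a clean exit once the routing budget is spent
    if state["routing_steps"] >= state["max_routing_steps"]:
        return {"next_agent": "FINISH", "routing_steps": steps}
    # normal routing logic... (sets `result`, a dict such as {"next_agent": "researcher"})
    return {**result, "routing_steps": steps}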

Layer 2 — LangGraph's built-in recursion limit:

Python
app = graph.compile(checkpointer=checkpointer)
# recursion_limit is a key of the config dict, not a keyword argument of invoke()
app.invoke(initial_state, config={**config, "recursion_limit": 50})

Set max_routing_steps to 2–3× the number of available agents. Set recursion_limit as a hard ceiling; when it is exceeded, LangGraph raises GraphRecursionError instead of continuing. Both are needed: the step counter gives a clean "FINISH" path, and the recursion limit is a safety net against bugs in the step counter itself.
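
The step-counter layer can be exercised on its own, outside any graph. In this minimal sketch, pick_next_agent is a hypothetical stand-in for the LLM routing call and deliberately never returns "FINISH", so the counter is the only thing that ends the loop.

```python
from typing import TypedDict

class SupervisorState(TypedDict):
    next_agent: str
    routing_steps: int
    max_routing_steps: int

def pick_next_agent(state: SupervisorState) -> str:
    """Hypothetical stand-in for the LLM routing call; it never says FINISH."""
    return "researcher" if state["routing_steps"] % 2 == 0 else "writer"

def supervisor(state: SupervisorState) -> dict:
    steps = state["routing_steps"] + 1
    # Budget spent: override whatever the router would have chosen.
    if state["routing_steps"] >= state["max_routing_steps"]:
        return {"next_agent": "FINISH", "routing_steps": steps}
    return {"next_agent": pick_next_agent(state), "routing_steps": steps}

# Drive the supervisor by hand the way a graph loop would.
state: SupervisorState = {"next_agent": "", "routing_steps": 0, "max_routing_steps": 6}
visited = []
while state["next_agent"] != "FINISH":
    state = {**state, **supervisor(state)}
    visited.append(state["next_agent"])
```

With a budget of 6 (3 × two agents), the loop makes six real routing decisions and is forced to "FINISH" on the seventh supervisor call.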


Q4: What is the difference between interrupt_before and interrupt_after, and when do you choose each?

Answer:

| | interrupt_before | interrupt_after |
|---|---|---|
| Timing | Pauses before the node runs | Pauses after the node runs |
| Node's output in state? | No, the node hasn't executed yet | Yes, the node has completed |
| Use case | Human approves before expensive/irreversible action | Human reviews what the node produced |

Use interrupt_before when:

  • The action is irreversible (sending an email, submitting an order) and needs approval before it happens
  • You want to let the human decide whether to proceed at all

Use interrupt_after when:

  • You want the human to review the agent's output before the next step consumes it
  • The node's result needs human validation (e.g., AI analysis before pharmacist sign-off)

Q5: How do you design state for a long-running workflow that processes hundreds of steps?

Answer: Four principles:

Keep accumulated lists bounded: Instead of Annotated[list[str], operator.add] (unbounded), use a custom reducer that caps at N items:

Python
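from typing import Annotated, TypedDict

def last_100(existing: list[str], update: list[str]) -> list[str]:
    # Append the update, then keep only the 100 most recent items
    return (existing + update)[-100:]

class WorkflowState(TypedDict):  # illustrative state class name
    recent_events: Annotated[list[str], last_100]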
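
To see what a capped reducer does over a long run, it can be called directly, once per update, the way a graph runtime would apply it. This is a small sketch: capped is a hypothetical helper that builds reducers of any size, and the 250-step loop stands in for a long workflow.

```python
from typing import Callable

def capped(limit: int) -> Callable[[list, list], list]:
    """Build a reducer that appends the update and keeps the newest `limit` items."""
    def reducer(existing: list, update: list) -> list:
        return (existing + update)[-limit:]
    return reducer

last_100 = capped(100)

# Simulate 250 node updates, each appending one event.
events: list[str] = []
for step in range(250):
    events = last_100(events, [f"step-{step}"])
```

After 250 updates the list holds only step-150 through step-249, so the state size stays constant no matter how long the workflow runs.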

Separate working state from output state: Working fields are written and overwritten frequently; output fields are the final deliverables. Don't accumulate working data into output lists.

Use external storage for large payloads: Don't store large documents or binary data directly in graph state. Store them in S3 or a database and put only the reference (a key or URL) in state. State should contain metadata, not content.
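
A minimal sketch of the reference-in-state pattern, assuming an in-memory dict as a stand-in for S3 or a database. BLOB_STORE, put_blob, and the two node functions are hypothetical names; a real system would swap in its own storage client.

```python
import hashlib

# Hypothetical stand-in for S3 or a database table.
BLOB_STORE: dict[str, bytes] = {}

def put_blob(payload: bytes) -> str:
    """Store the payload and return a short content-addressed key."""
    key = "docs/" + hashlib.sha256(payload).hexdigest()[:16]
    BLOB_STORE[key] = payload
    return key

def load_document(state: dict) -> dict:
    """Node: persist the raw document externally, keep only the key in state."""
    payload = state["raw_upload"]
    return {"doc_key": put_blob(payload), "doc_bytes": len(payload)}

def preview_document(state: dict) -> dict:
    """Node: fetch the payload by key only when it is actually needed."""
    text = BLOB_STORE[state["doc_key"]].decode("utf-8")
    return {"preview": text[:28]}

payload = ("warfarin interaction report. " * 10_000).encode("utf-8")
update = load_document({"raw_upload": payload})
preview = preview_document(update)
```

The state update carries a 21-character key and a byte count instead of roughly 290 KB of text, so every checkpoint written afterwards stays small.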

Type your state completely: Every field should have a type annotation. Untyped fields become sources of bugs when nodes accidentally overwrite each other.


Q6: How do you test a LangGraph agent without calling real LLMs?

Answer: Three levels of testing:

Node-level unit tests: Nodes are just functions. Test them directly with a state dict — no graph needed:

Python
def test_safety_check():
    state = {"drug_name": "warfarin", "risk_flags": []}
    result = safety_check(state)
    assert "INR monitoring" in result["risk_flags"][0]

Subgraph integration tests: Compile and invoke subgraphs with MemorySaver. Mock the LLM by patching:

Python
from unittest.mock import patch

with patch("mymodule.llm.invoke", return_value=mock_response):
    result = research_subgraph.invoke(initial_state, config={"configurable": {"thread_id": "test"}})

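Full graph regression tests: Maintain a golden dataset of (input, expected_output) pairs. Run the full graph against each pair in CI. LLM calls can be cached with LangChain's LLM cache (set_llm_cache, for example with SQLiteCache), so repeated runs replay stored responses and become deterministic and free after the first run.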

Test interrupt_before paths by calling app.get_state(config) and checking that snapshot.next (a tuple of node names) contains the expected interrupt node, then resuming with controlled state.
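
An alternative to patching a module-level LLM is to pass the model into the node, which makes the node-level test independent of import paths. The sketch below assumes only that the model exposes an invoke method returning an object with a content attribute; make_summary_node and the test name are hypothetical.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock

def make_summary_node(llm):
    """Build a node function that closes over whichever LLM client it is given."""
    def summarize(state: dict) -> dict:
        response = llm.invoke(f"Summarize safety concerns for {state['drug_name']}")
        return {"summary": response.content}
    return summarize

def test_summary_node_uses_llm_output():
    # The fake model returns a canned response and records how it was called.
    fake_llm = MagicMock()
    fake_llm.invoke.return_value = SimpleNamespace(content="Monitor INR weekly.")
    node = make_summary_node(fake_llm)

    result = node({"drug_name": "warfarin"})

    assert result == {"summary": "Monitor INR weekly."}
    fake_llm.invoke.assert_called_once()
    assert "warfarin" in fake_llm.invoke.call_args.args[0]

test_summary_node_uses_llm_output()
```

Because the fake records its calls, the test can assert on the prompt that was built as well as on the returned state update.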


Q7: Supervisor pattern vs sequential graph — when is each appropriate?

Answer:

Use the supervisor pattern when:

  • Not all specialists are needed for every query (avoids unnecessary LLM calls)
  • The supervisor needs to judge which specialist to call based on prior results
  • The order of specialist invocation varies by query

Use a sequential graph when:

  • Every step runs for every input (fixed pipeline)
  • Step order never changes
  • Cost matters — supervisor adds one LLM call per routing decision
  • Simpler debugging is a priority

Cost comparison for a 4-specialist workflow:

  • Sequential: 4–5 LLM calls (one per step)
  • Supervisor: 5–10 LLM calls (supervisor + each specialist + routing decisions between them)

A supervisor that always routes to the same 4 specialists in the same order is just an expensive sequential graph. Use supervisor only when routing genuinely varies.
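
The call counts above can be reproduced with simple arithmetic. This sketch assumes each specialist makes exactly one LLM call and the supervisor makes one routing call before each specialist plus a final call to decide on FINISH. Real agents that use tools or retries will make more.

```python
def sequential_calls(steps: int) -> int:
    """One LLM call per pipeline step."""
    return steps

def supervisor_calls(specialists_invoked: int) -> int:
    """One call per specialist, one routing call before each, plus the FINISH decision."""
    routing_decisions = specialists_invoked + 1
    return specialists_invoked + routing_decisions

# A 4-specialist workflow where the supervisor ends up invoking 2 or all 4 of them.
fixed_pipeline = sequential_calls(4)
supervisor_two = supervisor_calls(2)
supervisor_all = supervisor_calls(4)
```

Under these assumptions the supervisor costs 5 calls when it needs two specialists and 9 when it needs all four, against a constant 4 for the fixed pipeline. The supervisor only pays off on cost when it skips most specialists, or when skipping them matters for latency or quality.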


Q8: How do you handle parallel execution in LangGraph, and what are the pitfalls?

Answer: Use Send to fan out parallel tasks:

Python
from langgraph.types import Send

def fan_out(state):
    return [Send("process_drug", {"drug_name": drug}) for drug in state["drug_list"]]

graph.add_conditional_edges("__start__", fan_out)

All Send tasks run in parallel. Results merge back into the parent state via reducers.

Pitfalls:

  1. Conflicting writes to non-reducer fields: If multiple parallel nodes write to the same non-reducer field in the same step, LangGraph cannot decide which value should win and raises InvalidUpdateError. Parallel branches should write only to reducer-annotated fields (for example, lists with operator.add).

  2. Error handling: If one parallel branch fails, the others continue. The exception surfaces when LangGraph tries to merge results. Wrap branch nodes in try/except and return an error indicator field.

  3. Rate limits: Parallel branches all start at the same time. 10 parallel LLM calls will hit rate limits faster than 10 sequential calls. Add backoff or limit parallelism with a semaphore if rate limits are a concern.
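
Pitfalls 2 and 3 can be handled together inside the branch function. The following is a plain asyncio sketch rather than LangGraph code: call_model is a hypothetical stand-in for a rate-limited LLM call, MAX_CONCURRENCY is an assumed limit, and the tracker dict exists only to show that the semaphore is respected.

```python
import asyncio

MAX_CONCURRENCY = 3  # assumed provider-friendly limit

async def call_model(drug: str) -> str:
    """Hypothetical stand-in for a rate-limited LLM call."""
    await asyncio.sleep(0.01)
    if drug == "unknown":
        raise ValueError("no monograph found")
    return f"{drug}: reviewed"

async def process_drug(drug: str, limiter: asyncio.Semaphore, tracker: dict) -> dict:
    async with limiter:  # at most MAX_CONCURRENCY branches run this section at once
        tracker["active"] += 1
        tracker["peak"] = max(tracker["peak"], tracker["active"])
        try:
            return {"results": [await call_model(drug)], "errors": []}
        except Exception as exc:
            # Return an error indicator instead of failing the whole fan-out.
            return {"results": [], "errors": [f"{drug}: {exc}"]}
        finally:
            tracker["active"] -= 1

async def review_all(drugs: list[str]) -> dict:
    limiter = asyncio.Semaphore(MAX_CONCURRENCY)
    tracker = {"active": 0, "peak": 0}
    updates = await asyncio.gather(*(process_drug(d, limiter, tracker) for d in drugs))
    # Merge branch outputs the way an operator.add reducer would.
    merged = {"results": [], "errors": [], "peak": tracker["peak"]}
    for update in updates:
        merged["results"] += update["results"]
        merged["errors"] += update["errors"]
    return merged

outcome = asyncio.run(review_all(["warfarin", "aspirin", "unknown", "heparin", "digoxin"]))
```

Each branch returns list-valued fields only, so the merge is a plain concatenation and one failed drug lookup shows up in errors without discarding the other four results.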


Q9: How do you control costs in a production LangGraph agent?

Answer: Five levers:

1. Choose models by node: Not every node needs GPT-4o. Use a cheaper model (GPT-4o-mini) for extraction and classification; reserve GPT-4o for synthesis and complex reasoning.

2. Cache at the node level: If a node's output depends only on deterministic inputs, cache it. LangChain's set_llm_cache with a SQLite-backed cache (SQLiteCache) returns the stored response for identical prompts.

3. Prefer sequential over supervisor when routing is predictable: Each supervisor routing decision is an LLM call. Eliminating it for fixed-pipeline workflows cuts costs significantly.

4. Token-budget the state: Large accumulated lists get passed to every subsequent LLM call. Use last_N reducers to cap history. Summarize rather than accumulate when the list grows beyond a budget.

5. Track token usage per thread: Record result.usage_metadata (if using LangChain LLM wrappers) to state, aggregate per thread, and alert when a single thread exceeds a budget. Kill long-running loops early rather than letting them run to recursion_limit.
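
Lever 5 can be sketched as a small per-thread ledger. The usage dicts below follow the input_tokens / output_tokens / total_tokens shape of LangChain's usage_metadata; TokenLedger, TokenBudgetExceeded, and the 1,000-token budget are hypothetical choices for illustration.

```python
from collections import defaultdict

class TokenBudgetExceeded(RuntimeError):
    """Raised when a thread has used more tokens than its budget allows."""

class TokenLedger:
    def __init__(self, budget_per_thread: int) -> None:
        self.budget = budget_per_thread
        self.used = defaultdict(int)  # thread_id -> total tokens so far

    def record(self, thread_id: str, usage: dict) -> int:
        """Add one call's usage; raise once the thread goes over budget."""
        self.used[thread_id] += usage.get("total_tokens", 0)
        total = self.used[thread_id]
        if total > self.budget:
            raise TokenBudgetExceeded(f"{thread_id} used {total} tokens (budget {self.budget})")
        return total

ledger = TokenLedger(budget_per_thread=1_000)
ledger.record("order_42", {"input_tokens": 400, "output_tokens": 100, "total_tokens": 500})
ledger.record("order_42", {"input_tokens": 350, "output_tokens": 50, "total_tokens": 400})
try:
    ledger.record("order_42", {"input_tokens": 150, "output_tokens": 50, "total_tokens": 200})
    stopped = False
except TokenBudgetExceeded:
    stopped = True  # the caller would end the run here instead of looping on
```

Raising from the ledger gives the calling code a single place to stop a runaway thread well before recursion_limit is reached.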


Q10: System design — design a drug safety review system using LangGraph

Scenario: Pharmacists at a hospital need an AI system that (1) analyzes new drug orders for safety issues, (2) routes high-risk orders for human review, and (3) auto-approves low-risk orders. It must be auditable, resumable across restarts, and support 50 concurrent pharmacists.

Answer:

State design:

Python
import operator
from typing import Annotated, TypedDict

class DrugOrderState(TypedDict):
    order_id: str
    patient_id: str
    drug_name: str
    dose_mg: float
    risk_score: float          # 0.0–1.0 computed by AI
    risk_flags: Annotated[list[str], operator.add]
    decision: str              # "auto_approved", "human_approved", "rejected"
    pharmacist_id: str
    pharmacist_notes: str
    audit_log: Annotated[list[str], operator.add]
    final_order: str

Graph structure:

analyze_order
    ↓
route_by_risk
    ├── risk below 0.3 → auto_approve → END
    └── risk 0.3+ → [INTERRUPT] → pharmacist_review → apply_decision → END

Key decisions:

  • Checkpointer: PostgresSaver — must survive server restarts. 50 concurrent pharmacists means multiple API server instances.
  • Thread IDs: f"order_{order_id}" — unique per drug order. An order suspended for pharmacist review can be resumed hours later on a different server instance.
  • Interrupt point: interrupt_before=["pharmacist_review"]. AI analysis and risk routing complete first (so results are in state for the pharmacist to review), and only orders routed to review pause for a human decision.
  • Audit log: Annotated[list[str], operator.add] field records every state transition — required for regulatory compliance.
  • Auto-approval path: Low-risk orders never hit the interrupt. They complete in a single invoke() call.
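
The routing and bookkeeping pieces of these decisions are plain functions and can be sketched without the graph. The 0.3 threshold and node names come from the graph structure above; thread_config, audit_entry, and the log line format are hypothetical helpers.

```python
from datetime import datetime, timezone

RISK_THRESHOLD = 0.3  # from the graph structure: below 0.3 is auto-approved

def route_by_risk(state: dict) -> str:
    """Conditional-edge function: returns the name of the next node."""
    return "auto_approve" if state["risk_score"] < RISK_THRESHOLD else "pharmacist_review"

def thread_config(order_id: str) -> dict:
    """One thread per drug order, so any server instance can resume it later."""
    return {"configurable": {"thread_id": f"order_{order_id}"}}

def audit_entry(event: str, actor: str, at: datetime) -> dict:
    """State update that appends one line to audit_log through operator.add."""
    return {"audit_log": [f"{at.isoformat()} {actor} {event}"]}

low_risk = route_by_risk({"risk_score": 0.29})
borderline = route_by_risk({"risk_score": 0.3})
config = thread_config("A17")
entry = audit_entry("auto_approved", "system", datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc))
```

Keeping the threshold in one routing function means the boundary case (exactly 0.3 goes to a pharmacist) is decided and tested in a single place.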

API surface:

  • POST /orders — starts graph, returns thread_id + analysis if interrupted, or final result if auto-approved
  • GET /orders/{thread_id} — returns current state for pharmacist UI
  • POST /orders/{thread_id}/review — calls update_state + invoke(None) to resume

Scaling: PostgresSaver handles concurrent access. The API is stateless, so any instance can resume any thread. A background job expires stale orders (no review within 4 hours → escalate to senior pharmacist).
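
The stale-order check in the background job reduces to a timestamp comparison. A minimal sketch, assuming the job can read when each thread started waiting for review; find_stale_orders and the pending mapping are hypothetical, and the escalation itself is left out.

```python
from datetime import datetime, timedelta, timezone

REVIEW_TIMEOUT = timedelta(hours=4)

def find_stale_orders(pending_since: dict[str, datetime], now: datetime) -> list[str]:
    """Return thread IDs whose review has been pending longer than the timeout."""
    return sorted(
        thread_id
        for thread_id, since in pending_since.items()
        if now - since > REVIEW_TIMEOUT
    )

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
pending = {
    "order_1": now - timedelta(hours=5),
    "order_2": now - timedelta(hours=1),
    "order_3": now - timedelta(hours=4, minutes=1),
}
stale = find_stale_orders(pending, now)
```

Passing now in as an argument keeps the function deterministic, which makes the 4-hour boundary easy to test.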