LangGraph Agents · Lesson 16 of 17
Human-in-the-Loop: Approval and Correction Steps
Why Human-in-the-Loop?
Some agent decisions are too consequential to run fully automatically. Human-in-the-loop (HITL) lets you pause graph execution at specific nodes so a human can review state, correct values, or approve the next action before the graph continues.
Common patterns:
- Approval gates: Agent drafts a response, human approves before sending
- Error correction: Agent flags low confidence, human provides the correct value
- Audit trails: Regulatory requirement to log human sign-off at each step
- Override points: Human can redirect the agent to a different branch
Prerequisites: Checkpointing
HITL requires a checkpointer. Without one, there is no state to resume from after the human acts.
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
import operator
class ReviewState(TypedDict):
drug_name: str
draft_report: str
human_feedback: str
final_report: str
approved: bool
steps: Annotated[list[str], operator.add]
def research(state: ReviewState) -> dict:
report = f"Draft report for {state['drug_name']}: mechanism, interactions, dosing."
return {
"draft_report": report,
"steps": ["Research completed"],
}
def finalize(state: ReviewState) -> dict:
feedback = state.get("human_feedback", "")
if feedback:
final = f"{state['draft_report']}\n\nRevisions applied: {feedback}"
else:
final = state["draft_report"]
return {
"final_report": final,
"steps": ["Report finalized"],
}
graph = StateGraph(ReviewState)
graph.add_node("research", research)
graph.add_node("finalize", finalize)
graph.set_entry_point("research")
graph.add_edge("research", "finalize")
graph.add_edge("finalize", END)interrupt_before: Pause Before a Node Runs
Compile with interrupt_before to pause execution before the named node executes:
checkpointer = MemorySaver()
app = graph.compile(
checkpointer=checkpointer,
interrupt_before=["finalize"], # Pause before finalize runs
)
config = {"configurable": {"thread_id": "review_session_1"}}
# Run — will execute "research" and then pause before "finalize"
result = app.invoke({
"drug_name": "warfarin",
"draft_report": "",
"human_feedback": "",
"final_report": "",
"approved": False,
"steps": [],
}, config=config)
# The graph is now paused. result contains state up to the interrupt.
print(f"Draft report: {result['draft_report']}")
print(f"Steps so far: {result['steps']}")
# Output: Steps so far: ['Research completed']
# 'finalize' has NOT run yetInspecting State at the Interrupt
After the interrupt, examine state with app.get_state():
# Check what the graph is waiting on
snapshot = app.get_state(config)
print(f"Next node to run: {snapshot.next}")
# Output: ('finalize',)
print(f"Draft report:\n{snapshot.values['draft_report']}")
print(f"Human feedback so far: {snapshot.values['human_feedback']}")The snapshot.next tuple tells you which node will run when execution resumes.
Injecting Human Input with update_state
Before resuming, a human can modify state:
# Human reviews the draft and provides feedback
human_feedback = "Add contraindications for pregnancy. Emphasize INR monitoring."
# Inject the human's input into the checkpoint
app.update_state(
config,
{"human_feedback": human_feedback, "approved": True},
)
# Verify the update took effect
snapshot = app.get_state(config)
print(f"Feedback recorded: {snapshot.values['human_feedback']}")Resuming Execution
Pass None as input to resume from the last checkpoint:
# Resume from where the graph paused — finalize will now run with the updated state
final_result = app.invoke(None, config=config)
print(f"Final report: {final_result['final_report']}")
print(f"All steps: {final_result['steps']}")
# Output: ['Research completed', 'Report finalized']The graph picks up at finalize, which now has the human's feedback available in state.
interrupt_after: Pause After a Node Runs
interrupt_after pauses execution after the named node completes, before the next node starts:
app_after = graph.compile(
checkpointer=checkpointer,
interrupt_after=["research"], # Pause after research completes
)
config2 = {"configurable": {"thread_id": "review_session_2"}}
result = app_after.invoke({
"drug_name": "metformin",
"draft_report": "",
"human_feedback": "",
"final_report": "",
"approved": False,
"steps": [],
}, config=config2)
# research has already run — its output is available
print(f"Draft (already computed): {result['draft_report']}")
snapshot = app_after.get_state(config2)
print(f"Next: {snapshot.next}") # ('finalize',)Use interrupt_after when you want to review what a node produced (the output is already in state) before the next node consumes it.
Practical: Medical Report Approval Workflow
A complete approval workflow with conditional routing based on human decision:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated, Literal
import operator
class MedicalReviewState(TypedDict):
patient_id: str
drug_name: str
dose_recommendation: str
risk_flags: Annotated[list[str], operator.add]
pharmacist_decision: str # "approved", "rejected", "needs_revision"
pharmacist_notes: str
final_order: str
audit_log: Annotated[list[str], operator.add]
def analyze_drug(state: MedicalReviewState) -> dict:
"""AI analysis of the drug order."""
flags = []
if state["drug_name"] == "warfarin":
flags.append("Narrow therapeutic index — INR monitoring required")
flags.append("Multiple drug interactions")
return {
"dose_recommendation": f"Standard dosing for {state['drug_name']}: see protocol",
"risk_flags": flags,
"audit_log": [f"AI analysis completed for {state['drug_name']}"],
}
def apply_pharmacist_decision(state: MedicalReviewState) -> dict:
"""Apply the pharmacist's decision to the drug order."""
decision = state.get("pharmacist_decision", "")
if decision == "approved":
order = f"APPROVED: {state['dose_recommendation']}"
if state.get("pharmacist_notes"):
order += f"\nPharmacist notes: {state['pharmacist_notes']}"
elif decision == "rejected":
order = f"REJECTED by pharmacist: {state.get('pharmacist_notes', 'No reason given')}"
else:
order = f"PENDING REVISION: {state.get('pharmacist_notes', '')}"
return {
"final_order": order,
"audit_log": [f"Pharmacist decision: {decision}"],
}
def route_after_review(state: MedicalReviewState) -> Literal["apply_decision", "analyze_drug"]:
"""Route based on pharmacist's decision."""
decision = state.get("pharmacist_decision", "")
if decision == "needs_revision":
return "analyze_drug" # Send back for re-analysis
return "apply_decision"
builder = StateGraph(MedicalReviewState)
builder.add_node("analyze_drug", analyze_drug)
builder.add_node("apply_decision", apply_pharmacist_decision)
builder.set_entry_point("analyze_drug")
builder.add_conditional_edges(
"apply_decision" if False else "analyze_drug", # placeholder — see below
route_after_review,
{"apply_decision": "apply_decision", "analyze_drug": "analyze_drug"},
)
builder.add_edge("apply_decision", END)
# Pause after analysis, before applying decision
app = builder.compile(
checkpointer=MemorySaver(),
interrupt_after=["analyze_drug"],
)Running the workflow:
config = {"configurable": {"thread_id": f"order_{patient_id}"}}
# Step 1: AI analyzes the order
result = app.invoke({
"patient_id": "P-12345",
"drug_name": "warfarin",
"dose_recommendation": "",
"risk_flags": [],
"pharmacist_decision": "",
"pharmacist_notes": "",
"final_order": "",
"audit_log": [],
}, config=config)
# Step 2: Pharmacist reviews (happens outside the graph — in your UI, API, etc.)
print("Risk flags for pharmacist review:")
for flag in result["risk_flags"]:
print(f" - {flag}")
# Pharmacist enters decision via your application
pharmacist_input = {
"pharmacist_decision": "approved",
"pharmacist_notes": "Patient has stable INR history. Proceed with standard monitoring.",
}
app.update_state(config, pharmacist_input)
# Step 3: Resume — apply decision and finalize
final = app.invoke(None, config=config)
print(f"\nFinal order:\n{final['final_order']}")Handling Rejection and Revision Loops
When a human rejects an output, you can loop back:
# Pharmacist rejects with a note
app.update_state(config, {
"pharmacist_decision": "rejected",
"pharmacist_notes": "Dose too high for this patient's renal function. Reduce by 50%.",
})
# Graph routes to "rejected" path and ends
result = app.invoke(None, config=config)
print(result["final_order"])
# Output: REJECTED by pharmacist: Dose too high for this patient's renal function...
# For revision loops, set pharmacist_decision to "needs_revision"
# The conditional edge routes back to analyze_drug
# The agent re-runs with the feedback available in stateMultiple Interrupt Points
A graph can have multiple interrupt points for complex multi-stage review:
app_multi = graph.compile(
checkpointer=MemorySaver(),
interrupt_before=["safety_check", "finalize"],
)
# Execution pauses before safety_check — clinician reviews draft
# After approval, resumes until pausing before finalize — senior clinician signs off
# After second approval, resumes and finalizesBuilding a Human Review API
In production, HITL often works via an API:
from fastapi import FastAPI
from pydantic import BaseModel
api = FastAPI()
class StartRequest(BaseModel):
patient_id: str
drug_name: str
class ReviewDecision(BaseModel):
thread_id: str
decision: str
notes: str
@api.post("/orders/start")
def start_order(req: StartRequest):
thread_id = f"order_{req.patient_id}"
config = {"configurable": {"thread_id": thread_id}}
result = app.invoke({
"patient_id": req.patient_id,
"drug_name": req.drug_name,
# ... other initial fields
}, config=config)
return {
"thread_id": thread_id,
"status": "awaiting_review",
"risk_flags": result["risk_flags"],
"dose_recommendation": result["dose_recommendation"],
}
@api.post("/orders/review")
def submit_review(req: ReviewDecision):
config = {"configurable": {"thread_id": req.thread_id}}
app.update_state(config, {
"pharmacist_decision": req.decision,
"pharmacist_notes": req.notes,
})
final = app.invoke(None, config=config)
return {"status": "complete", "final_order": final["final_order"]}Key Rules
Always use a persistent checkpointer in production: MemorySaver loses state on restart. Use SqliteSaver or PostgresSaver for anything that needs to survive a server restart.
Thread IDs must be unique per workflow instance: Use f"order_{order_id}" or str(uuid4()), never a shared constant.
invoke(None, config=config) resumes — do not pass new state: The updated state is already in the checkpoint. Passing a new state dict starts a new run, not a resume.
Timeouts: If a human never reviews, the checkpoint sits indefinitely. Add a background job that checks for stale threads and escalates or expires them.