LangChain Mastery · Lesson 23 of 33
AgentExecutor: Running Agents Safely
AgentExecutor Overview
AgentExecutor is the runtime that manages the agent loop. It handles:
- Executing tool calls made by the agent
- Passing tool results back to the agent
- Stopping conditions (iteration limit, time limit)
- Error handling and recovery
Python
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
@tool
def search_drug(query: str) -> str:
    """Search drug information database."""
    # Stand-in lookup: echoes the query back in a fixed format.
    return "Drug info for: {}".format(query)
# Tools the agent is allowed to call.
tools = [search_drug]

# Chat model driving the agent (temperature=0).
model = ChatOpenAI(temperature=0, model="gpt-4o")

# Prompt layout: system role, prior turns, the new user input, and finally
# the scratchpad placeholder required by create_tool_calling_agent.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a clinical pharmacist. Use tools when needed."),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)

agent = create_tool_calling_agent(llm=model, tools=tools, prompt=prompt)

# Minimal executor
executor = AgentExecutor(agent=agent, tools=tools)
result = executor.invoke({"input": "What is warfarin?", "chat_history": []})
Key Configuration Parameters
Python
executor = AgentExecutor(
agent=agent,
tools=tools,
# Safety limits
max_iterations=10, # Max tool calls before forcing stop
max_execution_time=60.0, # Max wall time in seconds
# Error handling
handle_parsing_errors=True, # Don't crash on malformed tool calls
# handle_parsing_errors="I encountered an error. Please rephrase." # Custom error message
# Stopping behavior
early_stopping_method="generate", # "generate": ask LLM for best answer when stopped
# "force": return raw last output when stopped
# Observability
verbose=True, # Log each thought and tool call to stdout
return_intermediate_steps=True, # Include tool call history in output
# Input/output keys
input_variables=["input", "chat_history"],
memory=None, # Optional: attach memory to executor
)Handling Tool Errors
Python
from langchain_core.tools import ToolException


@tool
def risky_database_query(drug_name: str) -> str:
    """Query the drug database — may fail if drug not found."""
    if drug_name.lower() not in ("warfarin", "metformin"):
        # ToolException is the error type a tool can convert into an
        # observation; any other exception propagates out of the executor.
        raise ToolException(f"Drug '{drug_name}' not in database")
    return f"Data for {drug_name}"


# Hand ToolException messages back to the agent as observations instead of
# raising them out of executor.invoke().
risky_database_query.handle_tool_error = True

# Option 1: let the tool report failures via ToolException + handle_tool_error
# NOTE: handle_parsing_errors does NOT catch tool execution errors; it only
# covers LLM output that cannot be parsed into an action.
executor_safe = AgentExecutor(
    # The agent must be built with the same tools the executor runs, otherwise
    # the model is never told that risky_database_query exists.
    agent=create_tool_calling_agent(model, [risky_database_query], prompt),
    tools=[risky_database_query],
    handle_parsing_errors=True,  # Malformed LLM output becomes an observation instead of an exception
)
# Option 2: Wrap tool with error handling
@tool
def safe_drug_query(drug_name: str) -> str:
    """
    Query the drug database. Returns an error message if the drug is not found
    rather than raising an exception.
    """
    # Every outcome, including unexpected failures, is reported as a string
    # so the agent always receives an observation.
    try:
        if drug_name.lower() in ["warfarin", "metformin"]:
            return f"Data for {drug_name}: [clinical details]"
        return f"Drug '{drug_name}' not found. Try a generic name or check spelling."
    except Exception as err:
        return f"Query failed: {str(err)}. Please try again or consult reference directly."
# Option 3: with_fallbacks for entire agent
fallback_agent_executor = executor.with_fallbacks([
AgentExecutor(
agent=create_tool_calling_agent(
ChatOpenAI(model="gpt-4o-mini"), # Cheaper fallback
tools,
prompt,
),
tools=tools,
)
])Return Intermediate Steps
Python
executor_with_steps = AgentExecutor(
agent=agent,
tools=tools,
return_intermediate_steps=True,
verbose=False,
)
result = executor_with_steps.invoke({
"input": "What is the warfarin-aspirin interaction?",
"chat_history": [],
})
# result["output"] — final answer string
# result["intermediate_steps"] — list of (AgentAction, tool_output) tuples
print(f"Final answer: {result['output']}")
print(f"\nTools called:")
for action, observation in result["intermediate_steps"]:
print(f" Tool: {action.tool}")
print(f" Input: {action.tool_input}")
print(f" Output: {observation[:80]}")Streaming Agent Output
Stream the agent's progress step by step (Method 1) or its LLM output token by token (Method 2); both are useful for UIs:
Python
# Method 1: stream() — yields events as they happen
for event in executor.stream(
    {"input": "Explain warfarin mechanism and interactions.", "chat_history": []},
    include_run_info=True,
):
    # Event types:
    # {"actions": [...]} — tool calls made
    # {"steps": [...]} — tool results received
    # {"output": "..."} — final answer (streaming not supported for final output)
    # The keys are checked in priority order; only the first match is printed.
    if "actions" in event:
        for action in event["actions"]:
            print(f"\n[Tool] {action.tool}({action.tool_input})")
        continue
    if "steps" in event:
        for step in event["steps"]:
            print(f"[Result] {str(step.observation)[:100]}")
        continue
    if "output" in event:
        print(f"\n[Answer] {event['output']}")
# Method 2: astream_events() — async, token-level streaming
import asyncio
async def stream_agent(question: str):
    """Print the agent's run for *question* as it happens.

    Consumes ``executor.astream_events`` (schema version "v2") and prints
    model tokens as they arrive, plus a marker line when a tool starts and a
    truncated result when it ends. Other event types are ignored.
    """
    payload = {"input": question, "chat_history": []}
    async for evt in executor.astream_events(payload, version="v2"):
        kind = evt.get("event")
        if kind == "on_chat_model_stream":
            # Individual tokens from the LLM; skip chunks with no text.
            text = getattr(evt["data"]["chunk"], "content", None)
            if text:
                print(text, end="", flush=True)
        elif kind == "on_tool_start":
            print(f"\n[Calling tool: {evt['name']}]")
        elif kind == "on_tool_end":
            print(f"[Tool result: {str(evt['data']['output'])[:80]}]")
asyncio.run(stream_agent("What is the warfarin dose for AFib?"))
Async Execution
Python
import asyncio
# Single async invocation
async def query_agent(question: str) -> str:
    """Ask the agent one question asynchronously and return its final answer.

    Each call starts with an empty chat history.
    """
    payload = {"input": question, "chat_history": []}
    response = await executor.ainvoke(payload)
    return response["output"]
# Multiple concurrent agent queries
async def batch_queries(questions: list[str]) -> list[str]:
    """Run query_agent for every question concurrently.

    Answers are returned in the same order as *questions*. If any query
    raises, the exception propagates to the caller.
    """
    return await asyncio.gather(*(query_agent(q) for q in questions))
# Run for multiple clinical questions simultaneously
questions = [
"What is warfarin?",
"What is metformin?",
"What is lisinopril?",
]
answers = asyncio.run(batch_queries(questions))
for q, a in zip(questions, answers):
print(f"Q: {q}\nA: {a[:80]}\n")Executor with Memory
Python
from langchain.memory import ConversationBufferWindowMemory
# Attach memory to executor for multi-turn conversations
memory = ConversationBufferWindowMemory(
    memory_key="chat_history",  # matches the {chat_history} prompt placeholder
    output_key="output",
    return_messages=True,
    k=5,
)

executor_with_memory = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=False,
    memory=memory,
)

# Memory is automatically loaded and saved per turn
r1 = executor_with_memory.invoke({"input": "What is warfarin?"})
r2 = executor_with_memory.invoke({"input": "What dose for AFib?"})  # Has context
r3 = executor_with_memory.invoke({"input": "What are the main interactions?"})  # Has context
# Inspect memory
print(f"Turns stored: {len(memory.chat_memory.messages) // 2}")
Production AgentExecutor Template
Python
from langchain_core.runnables import RunnableConfig
import logging
logger = logging.getLogger(__name__)
def create_production_executor(
    model_name: str = "gpt-4o",
    max_iterations: int = 8,
    timeout_seconds: float = 45.0,
) -> AgentExecutor:
    """Create a production-grade AgentExecutor with all safety features.

    Args:
        model_name: Chat model name passed to ChatOpenAI.
        max_iterations: Upper bound on agent loop steps.
        timeout_seconds: Wall-clock budget, passed as ``max_execution_time``.

    Returns:
        An AgentExecutor over the module-level ``tools`` and ``prompt`` that
        returns intermediate steps alongside the final output.
    """
    model = ChatOpenAI(model=model_name, temperature=0)
    agent = create_tool_calling_agent(model, tools, prompt)
    return AgentExecutor(
        agent=agent,
        tools=tools,
        max_iterations=max_iterations,
        max_execution_time=timeout_seconds,
        handle_parsing_errors=True,
        # "generate" is only implemented by the legacy Agent classes; with a
        # runnable tool-calling agent it raises ValueError as soon as a limit
        # is hit, turning a graceful stop into a crash. "force" returns a
        # fixed "stopped" answer instead.
        early_stopping_method="force",
        return_intermediate_steps=True,
        verbose=False,  # Use callbacks for production logging
    )
def safe_agent_invoke(executor: AgentExecutor, question: str, session_id: str) -> dict:
    """Invoke agent with error handling and logging.

    The agent is called with an empty chat history and the session id attached
    as run metadata. This function never raises: any exception from the run is
    logged with its traceback and reported in the returned dict.

    Args:
        executor: Executor to run.
        question: User question, passed as the ``input`` key.
        session_id: Identifier recorded in run metadata and log records.

    Returns:
        On success: ``answer``, ``tool_calls_made``, ``latency_ms`` and
        ``success=True``. On failure: a generic ``answer``, the ``error``
        text, ``latency_ms`` and ``success=False``.
    """
    import time

    # perf_counter is monotonic, so the measured latency is not distorted by
    # system clock adjustments the way time.time() can be.
    start = time.perf_counter()
    try:
        result = executor.invoke(
            {"input": question, "chat_history": []},
            config=RunnableConfig(
                tags=["clinical", "pharmacist"],
                metadata={"session_id": session_id},
            ),
        )
        latency = (time.perf_counter() - start) * 1000
        tool_calls = len(result.get("intermediate_steps", []))
        logger.info(
            "Agent invocation",
            extra={
                "session_id": session_id,
                "latency_ms": latency,
                "tool_calls": tool_calls,
            },
        )
        return {
            "answer": result["output"],
            "tool_calls_made": tool_calls,
            "latency_ms": latency,
            "success": True,
        }
    except Exception as e:
        # Top-level boundary: callers get an error payload, never an exception.
        # logger.exception keeps the traceback that a plain error() would drop.
        latency = (time.perf_counter() - start) * 1000
        logger.exception("Agent failed for session %s: %s", session_id, e)
        return {
            "answer": "I encountered an error. Please try rephrasing your question.",
            "error": str(e),
            "latency_ms": latency,
            "success": False,
        }