AI Systemsintermediate
AgentExecutor: Running Agents Safely
Configure AgentExecutor for production: iteration limits, error handling, streaming agent output, early stopping, verbose logging, and async execution.
Asma Hafeez KhanMay 16, 20265 min read
LangChainAgentExecutorAgentProductionSafetyError Handling
AgentExecutor Overview
AgentExecutor is the runtime that manages the agent loop. It handles:
- Executing tool calls made by the agent
- Passing tool results back to the agent
- Stopping conditions (iteration limit, time limit)
- Error handling and recovery
Python
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
@tool
def search_drug(query: str) -> str:
"""Search drug information database."""
return f"Drug info for: {query}"
tools = [search_drug]
model = ChatOpenAI(model="gpt-4o", temperature=0)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a clinical pharmacist. Use tools when needed."),
("placeholder", "{chat_history}"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
agent = create_tool_calling_agent(model, tools, prompt)
# Minimal executor
executor = AgentExecutor(agent=agent, tools=tools)
result = executor.invoke({"input": "What is warfarin?", "chat_history": []})Key Configuration Parameters
Python
executor = AgentExecutor(
agent=agent,
tools=tools,
# Safety limits
max_iterations=10, # Max tool calls before forcing stop
max_execution_time=60.0, # Max wall time in seconds
# Error handling
handle_parsing_errors=True, # Don't crash on malformed tool calls
# handle_parsing_errors="I encountered an error. Please rephrase." # Custom error message
# Stopping behavior
early_stopping_method="generate", # "generate": ask LLM for best answer when stopped
# "force": return raw last output when stopped
# Observability
verbose=True, # Log each thought and tool call to stdout
return_intermediate_steps=True, # Include tool call history in output
# Input/output keys
input_variables=["input", "chat_history"],
memory=None, # Optional: attach memory to executor
)Handling Tool Errors
Python
@tool
def risky_database_query(drug_name: str) -> str:
"""Query the drug database — may fail if drug not found."""
if drug_name.lower() not in ["warfarin", "metformin"]:
raise ValueError(f"Drug '{drug_name}' not in database")
return f"Data for {drug_name}"
# Option 1: handle_parsing_errors catches tool execution errors
executor_safe = AgentExecutor(
agent=agent,
tools=[risky_database_query],
handle_parsing_errors=True, # Tool errors become observations instead of exceptions
)
# Option 2: Wrap tool with error handling
@tool
def safe_drug_query(drug_name: str) -> str:
"""
Query the drug database. Returns an error message if the drug is not found
rather than raising an exception.
"""
try:
if drug_name.lower() not in ["warfarin", "metformin"]:
return f"Drug '{drug_name}' not found. Try a generic name or check spelling."
return f"Data for {drug_name}: [clinical details]"
except Exception as e:
return f"Query failed: {str(e)}. Please try again or consult reference directly."
# Option 3: with_fallbacks for entire agent
fallback_agent_executor = executor.with_fallbacks([
AgentExecutor(
agent=create_tool_calling_agent(
ChatOpenAI(model="gpt-4o-mini"), # Cheaper fallback
tools,
prompt,
),
tools=tools,
)
])Return Intermediate Steps
Python
executor_with_steps = AgentExecutor(
agent=agent,
tools=tools,
return_intermediate_steps=True,
verbose=False,
)
result = executor_with_steps.invoke({
"input": "What is the warfarin-aspirin interaction?",
"chat_history": [],
})
# result["output"] — final answer string
# result["intermediate_steps"] — list of (AgentAction, tool_output) tuples
print(f"Final answer: {result['output']}")
print(f"\nTools called:")
for action, observation in result["intermediate_steps"]:
print(f" Tool: {action.tool}")
print(f" Input: {action.tool_input}")
print(f" Output: {observation[:80]}")Streaming Agent Output
Stream the agent's responses token by token (useful for UI):
Python
# Method 1: stream() — yields events as they happen
for event in executor.stream(
{"input": "Explain warfarin mechanism and interactions.", "chat_history": []},
include_run_info=True,
):
# Event types:
# {"actions": [...]} — tool calls made
# {"steps": [...]} — tool results received
# {"output": "..."} — final answer (streaming not supported for final output)
if "actions" in event:
for action in event["actions"]:
print(f"\n[Tool] {action.tool}({action.tool_input})")
elif "steps" in event:
for step in event["steps"]:
print(f"[Result] {str(step.observation)[:100]}")
elif "output" in event:
print(f"\n[Answer] {event['output']}")
# Method 2: astream_events() — async, token-level streaming
import asyncio
async def stream_agent(question: str):
async for event in executor.astream_events(
{"input": question, "chat_history": []},
version="v2",
):
event_type = event.get("event")
if event_type == "on_chat_model_stream":
# Individual tokens from the LLM
chunk = event["data"]["chunk"]
if hasattr(chunk, "content") and chunk.content:
print(chunk.content, end="", flush=True)
elif event_type == "on_tool_start":
print(f"\n[Calling tool: {event['name']}]")
elif event_type == "on_tool_end":
print(f"[Tool result: {str(event['data']['output'])[:80]}]")
asyncio.run(stream_agent("What is the warfarin dose for AFib?"))Async Execution
Python
import asyncio
# Single async invocation
async def query_agent(question: str) -> str:
result = await executor.ainvoke({
"input": question,
"chat_history": [],
})
return result["output"]
# Multiple concurrent agent queries
async def batch_queries(questions: list[str]) -> list[str]:
tasks = [query_agent(q) for q in questions]
return await asyncio.gather(*tasks)
# Run for multiple clinical questions simultaneously
questions = [
"What is warfarin?",
"What is metformin?",
"What is lisinopril?",
]
answers = asyncio.run(batch_queries(questions))
for q, a in zip(questions, answers):
print(f"Q: {q}\nA: {a[:80]}\n")Executor with Memory
Python
from langchain.memory import ConversationBufferWindowMemory
# Attach memory to executor for multi-turn conversations
memory = ConversationBufferWindowMemory(
k=5,
memory_key="chat_history",
return_messages=True,
output_key="output",
)
executor_with_memory = AgentExecutor(
agent=agent,
tools=tools,
memory=memory,
verbose=False,
)
# Memory is automatically loaded and saved per turn
r1 = executor_with_memory.invoke({"input": "What is warfarin?"})
r2 = executor_with_memory.invoke({"input": "What dose for AFib?"}) # Has context
r3 = executor_with_memory.invoke({"input": "What are the main interactions?"}) # Has context
# Inspect memory
print(f"Turns stored: {len(memory.chat_memory.messages) // 2}")Production AgentExecutor Template
Python
from langchain_core.runnables import RunnableConfig
import logging
logger = logging.getLogger(__name__)
def create_production_executor(
model_name: str = "gpt-4o",
max_iterations: int = 8,
timeout_seconds: float = 45.0,
) -> AgentExecutor:
"""Create a production-grade AgentExecutor with all safety features."""
model = ChatOpenAI(model=model_name, temperature=0)
agent = create_tool_calling_agent(model, tools, prompt)
return AgentExecutor(
agent=agent,
tools=tools,
max_iterations=max_iterations,
max_execution_time=timeout_seconds,
handle_parsing_errors=True,
early_stopping_method="generate",
return_intermediate_steps=True,
verbose=False, # Use callbacks for production logging
)
def safe_agent_invoke(executor: AgentExecutor, question: str, session_id: str) -> dict:
"""Invoke agent with error handling and logging."""
import time
start = time.time()
try:
result = executor.invoke(
{"input": question, "chat_history": []},
config=RunnableConfig(
tags=["clinical", "pharmacist"],
metadata={"session_id": session_id},
),
)
latency = (time.time() - start) * 1000
logger.info(
"Agent invocation",
extra={
"session_id": session_id,
"latency_ms": latency,
"tool_calls": len(result.get("intermediate_steps", [])),
},
)
return {
"answer": result["output"],
"tool_calls_made": len(result.get("intermediate_steps", [])),
"latency_ms": latency,
"success": True,
}
except Exception as e:
logger.error(f"Agent failed for session {session_id}: {e}")
return {
"answer": "I encountered an error. Please try rephrasing your question.",
"error": str(e),
"success": False,
}Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.