Learnixo

LangChain Mastery · Lesson 23 of 33

AgentExecutor: Running Agents Safely

AgentExecutor Overview

AgentExecutor is the runtime that manages the agent loop. It handles:

  • Executing tool calls made by the agent
  • Passing tool results back to the agent
  • Stopping conditions (iteration limit, time limit)
  • Error handling and recovery
Python
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool

@tool
def search_drug(query: str) -> str:
    """Search drug information database."""
    # Placeholder lookup: echoes the query back in a fixed sentence so the
    # agent loop can be exercised without a real database behind it.
    drug_info = f"Drug info for: {query}"
    return drug_info

# The same tool list is handed to both the agent (so the model knows what it
# may call) and the executor (so the calls can actually be run).
tools = [search_drug]
model = ChatOpenAI(model="gpt-4o", temperature=0)

# Besides the user input, the prompt reserves two placeholder slots:
# {chat_history} for prior conversation turns and {agent_scratchpad} for the
# agent's own tool calls and their results within the current run.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a clinical pharmacist. Use tools when needed."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(model, tools, prompt)

# Minimal executor: only the agent and its tools; every other setting keeps
# its default. chat_history is passed explicitly (empty) because no memory is
# attached here.
executor = AgentExecutor(agent=agent, tools=tools)
result = executor.invoke({"input": "What is warfarin?", "chat_history": []})

Key Configuration Parameters

Python
executor = AgentExecutor(
    agent=agent,
    tools=tools,

    # Safety limits
    max_iterations=10,           # Max agent loop steps (one LLM turn + its tool calls) before forcing stop
    max_execution_time=60.0,     # Max wall time in seconds

    # Error handling
    handle_parsing_errors=True,  # Don't crash on malformed tool calls
    # handle_parsing_errors="I encountered an error. Please rephrase."  # Custom error message

    # Stopping behavior
    # "force" returns a fixed "agent stopped" message when a limit is hit.
    # "generate" (one extra LLM pass for a best-effort answer) is only
    # implemented by the legacy Agent classes; an agent built with
    # create_tool_calling_agent raises ValueError for it at the moment the
    # limit is reached, so it must not be used here.
    early_stopping_method="force",

    # Observability
    verbose=True,   # Log each thought and tool call to stdout
    return_intermediate_steps=True,   # Include tool call history in output

    # Memory
    # Input keys come from the agent's prompt; AgentExecutor has no
    # input_variables parameter, so none is passed.
    memory=None,   # Optional: attach memory to executor
)

Handling Tool Errors

Python
from langchain_core.tools import ToolException


@tool
def risky_database_query(drug_name: str) -> str:
    """Query the drug database — may fail if drug not found."""
    if drug_name.lower() not in ["warfarin", "metformin"]:
        # ToolException is the error type a tool's handle_tool_error setting
        # intercepts; any other exception type would propagate out of the
        # executor and abort the run.
        raise ToolException(f"Drug '{drug_name}' not in database")
    return f"Data for {drug_name}"


# Option 1: let the tool turn its own failures into observations.
# With handle_tool_error enabled, a ToolException's message is handed back to
# the agent as the tool result instead of being raised.
risky_database_query.handle_tool_error = True

executor_safe = AgentExecutor(
    # The agent must be bound to the same tools the executor can run,
    # otherwise the model is never offered risky_database_query.
    agent=create_tool_calling_agent(model, [risky_database_query], prompt),
    tools=[risky_database_query],
    # Covers malformed LLM output only; tool execution errors are handled by
    # handle_tool_error on the tool itself.
    handle_parsing_errors=True,
)

# Option 2: Wrap tool with error handling
@tool
def safe_drug_query(drug_name: str) -> str:
    """
    Query the drug database. Returns an error message if the drug is not found
    rather than raising an exception.
    """
    # Every outcome, including an unexpected failure, is reported as a plain
    # string so the agent always receives an observation it can react to.
    try:
        is_known = drug_name.lower() in ("warfarin", "metformin")
        if is_known:
            return f"Data for {drug_name}: [clinical details]"
        return f"Drug '{drug_name}' not found. Try a generic name or check spelling."
    except Exception as err:
        failure_text = str(err)
        return f"Query failed: {failure_text}. Please try again or consult reference directly."


# Option 3: with_fallbacks for entire agent
# The backup executor is assembled step by step, then registered as the
# fallback that runs when the primary executor raises.
fallback_model = ChatOpenAI(model="gpt-4o-mini")  # Cheaper fallback
fallback_agent = create_tool_calling_agent(fallback_model, tools, prompt)
fallback_executor = AgentExecutor(agent=fallback_agent, tools=tools)

fallback_agent_executor = executor.with_fallbacks([fallback_executor])

Return Intermediate Steps

Python
# Executor that returns the tool-call trace alongside the final answer.
executor_with_steps = AgentExecutor(
    agent=agent,
    tools=tools,
    return_intermediate_steps=True,
    verbose=False,
)

result = executor_with_steps.invoke({
    "input": "What is the warfarin-aspirin interaction?",
    "chat_history": [],
})

# result["output"]  final answer string
# result["intermediate_steps"]  list of (AgentAction, tool_output) tuples
print(f"Final answer: {result['output']}")
print("\nTools called:")
for action, observation in result["intermediate_steps"]:
    print(f"  Tool: {action.tool}")
    print(f"  Input: {action.tool_input}")
    # A tool may return a non-string value (dict, list, number); stringify
    # before truncating so slicing never fails or cuts a list instead of text.
    print(f"  Output: {str(observation)[:80]}")

Streaming Agent Output

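Stream the agent's progress as it runs (useful for UI): stream() yields one chunk per agent step, while astream_events() streams the model's output token by token: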

Python
# Method 1: stream()  yields one chunk per agent step as it happens
stream_input = {"input": "Explain warfarin mechanism and interactions.", "chat_history": []}

# Chunk shapes:
#   {"actions": [...]}  tool calls made
#   {"steps": [...]}    tool results received
#   {"output": "..."}   final answer (streaming not supported for final output)
for chunk in executor.stream(stream_input, include_run_info=True):
    if "actions" in chunk:
        for tool_call in chunk["actions"]:
            print(f"\n[Tool] {tool_call.tool}({tool_call.tool_input})")
        continue

    if "steps" in chunk:
        for agent_step in chunk["steps"]:
            print(f"[Result] {str(agent_step.observation)[:100]}")
        continue

    if "output" in chunk:
        print(f"\n[Answer] {chunk['output']}")


# Method 2: astream_events()  async, token-level streaming
import asyncio

async def stream_agent(question: str):
    """Print the agent's run for *question* live: LLM tokens and tool activity.

    Consumes the executor's v2 event stream and writes to stdout; nothing is
    returned.
    """
    payload = {"input": question, "chat_history": []}
    async for event in executor.astream_events(payload, version="v2"):
        kind = event.get("event")
        if kind == "on_tool_start":
            print(f"\n[Calling tool: {event['name']}]")
        elif kind == "on_tool_end":
            print(f"[Tool result: {str(event['data']['output'])[:80]}]")
        elif kind == "on_chat_model_stream":
            # Individual tokens from the LLM; skip chunks with no text.
            token_text = getattr(event["data"]["chunk"], "content", None)
            if token_text:
                print(token_text, end="", flush=True)


asyncio.run(stream_agent("What is the warfarin dose for AFib?"))

Async Execution

Python
import asyncio

# Single async invocation
async def query_agent(question: str) -> str:
    """Run one question through the executor asynchronously; return the answer text."""
    payload = {"input": question, "chat_history": []}
    response = await executor.ainvoke(payload)
    return response["output"]

# Multiple concurrent agent queries
async def batch_queries(questions: list[str]) -> list[str]:
    """Answer all *questions* concurrently; results keep the input order."""
    return await asyncio.gather(*(query_agent(question) for question in questions))

# Fan several clinical questions out to the agent at once, then print each
# question next to a short preview of its answer.
questions = [
    "What is warfarin?",
    "What is metformin?",
    "What is lisinopril?",
]
answers = asyncio.run(batch_queries(questions))
for question, answer in zip(questions, answers):
    preview = answer[:80]
    print(f"Q: {question}\nA: {preview}\n")

Executor with Memory

Python
from langchain.memory import ConversationBufferWindowMemory

# Attach memory to executor for multi-turn conversations.
# memory_key matches the prompt's {chat_history} placeholder, and output_key
# matches the "output" key of the executor's result.
memory = ConversationBufferWindowMemory(
    k=5,
    memory_key="chat_history",
    return_messages=True,
    output_key="output",
)

executor_with_memory = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=memory,
    verbose=False,
)

# Memory is automatically loaded and saved per turn, so each call passes only
# "input" — chat_history is not supplied by the caller here.
r1 = executor_with_memory.invoke({"input": "What is warfarin?"})
r2 = executor_with_memory.invoke({"input": "What dose for AFib?"})  # Has context
r3 = executor_with_memory.invoke({"input": "What are the main interactions?"})  # Has context

# Inspect memory
# NOTE(review): the // 2 presumes each turn stores exactly two messages
# (one human, one AI) — confirm against the memory implementation.
print(f"Turns stored: {len(memory.chat_memory.messages) // 2}")

Production AgentExecutor Template

Python
from langchain_core.runnables import RunnableConfig
import logging

logger = logging.getLogger(__name__)


def create_production_executor(
    model_name: str = "gpt-4o",
    max_iterations: int = 8,
    timeout_seconds: float = 45.0,
) -> AgentExecutor:
    """Create a production-grade AgentExecutor with all safety features.

    Args:
        model_name: Chat model identifier passed to ChatOpenAI.
        max_iterations: Upper bound on agent loop steps per invocation.
        timeout_seconds: Upper bound on wall-clock time per invocation.

    Returns:
        An executor built from the module-level ``tools`` and ``prompt``,
        with iteration/time limits, parsing-error recovery and intermediate
        steps enabled.
    """
    model = ChatOpenAI(model=model_name, temperature=0)

    agent = create_tool_calling_agent(model, tools, prompt)

    return AgentExecutor(
        agent=agent,
        tools=tools,
        max_iterations=max_iterations,
        max_execution_time=timeout_seconds,
        handle_parsing_errors=True,
        # "generate" is only implemented by the legacy Agent classes; with a
        # tool-calling agent it raises ValueError exactly when a limit is
        # hit, turning a safety stop into a crash. "force" returns a fixed
        # "agent stopped" answer instead.
        early_stopping_method="force",
        return_intermediate_steps=True,
        verbose=False,  # Use callbacks for production logging
    )


def safe_agent_invoke(executor: AgentExecutor, question: str, session_id: str) -> dict:
    """Invoke agent with error handling and logging.

    Args:
        executor: The executor to run.
        question: User question, sent as the ``input`` key with an empty
            ``chat_history``.
        session_id: Identifier attached to the run metadata and log records.

    Returns:
        A dict that always contains ``answer``, ``tool_calls_made``,
        ``latency_ms`` and ``success``. On failure it also contains ``error``
        (the exception text) and ``tool_calls_made`` is ``None`` because the
        count is unknown. This function does not raise for agent failures.
    """
    import time

    # perf_counter is monotonic, so the measured latency cannot go negative
    # or jump when the system clock is adjusted.
    start = time.perf_counter()
    try:
        result = executor.invoke(
            {"input": question, "chat_history": []},
            config=RunnableConfig(
                tags=["clinical", "pharmacist"],
                metadata={"session_id": session_id},
            ),
        )
        latency = (time.perf_counter() - start) * 1000
        tool_calls = len(result.get("intermediate_steps", []))
        logger.info(
            "Agent invocation",
            extra={
                "session_id": session_id,
                "latency_ms": latency,
                "tool_calls": tool_calls,
            },
        )
        return {
            "answer": result["output"],
            "tool_calls_made": tool_calls,
            "latency_ms": latency,
            "success": True,
        }
    except Exception as e:
        # Deliberately broad: this is the top-level boundary that turns any
        # agent failure into a user-safe response. logger.exception keeps the
        # traceback that a plain error() call would drop.
        latency = (time.perf_counter() - start) * 1000
        logger.exception("Agent failed for session %s: %s", session_id, e)
        return {
            "answer": "I encountered an error. Please try rephrasing your question.",
            "error": str(e),
            "tool_calls_made": None,
            "latency_ms": latency,
            "success": False,
        }