Observability for Tool Calls
What to log, how to trace tool call chains with OpenTelemetry, which metrics to collect, and how to alert on tool anomalies in production AI agents.
Why Observability Is Different for AI Agents
Traditional services have deterministic call graphs. An AI agent's call graph is dynamic — which tools are called, how many times, in what order, and with what arguments all depend on the LLM's reasoning. You cannot predict it at deploy time.
This means:
- You cannot pre-define a fixed set of metrics (you don't know which tool calls to expect)
- Errors may be logical rather than exceptions (the tool ran, but the LLM called the wrong one)
- Debugging requires understanding the conversation state that led to a tool call, not just the call itself
- Anomalies include novel tool sequences — not just high error rates
Observability for AI agents needs to capture the full context: what the user asked, what the LLM decided, which tools were called, what they returned, and what the final answer was.
What to Log for Every Tool Call
Minimum required fields for every tool call event:
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional
import uuid
@dataclass
class ToolCallEvent:
# Identity
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
session_id: str = ""
user_id: str = ""
tool_call_id: str = "" # The ID from the OpenAI response
# What was called
tool_name: str = ""
argument_keys: list[str] = field(default_factory=list) # Keys only, not values (may be PII)
# Timing
started_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
latency_ms: float = 0.0
# Result
success: bool = True
error_message: Optional[str] = None
result_keys: list[str] = field(default_factory=list) # Keys only
result_size_bytes: int = 0
# Context
iteration: int = 0 # Which iteration of the agent loop
model: str = ""
agent_type: str = ""Structured Logging with structlog
import structlog
import json
import time
from typing import Callable, Any
log = structlog.get_logger("tool_calls")
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
def log_tool_execution(
tool_call,
result: dict,
latency_ms: float,
session_id: str,
user_id: str,
iteration: int
) -> None:
"""Emit a structured log event for a completed tool call."""
fn_args = json.loads(tool_call.function.arguments)
result_str = json.dumps(result)
log.info(
"tool_call_completed",
# Identity
tool_call_id=tool_call.id,
session_id=session_id,
user_id=user_id,
# What was called
tool_name=tool_call.function.name,
argument_keys=sorted(fn_args.keys()),
# Result
success=result.get("success", "error" not in result),
error=result.get("error"),
result_keys=sorted(result.keys()) if isinstance(result, dict) else [],
result_size_bytes=len(result_str.encode()),
# Performance
latency_ms=round(latency_ms, 1),
iteration=iteration,
)Tracing Tool Call Chains with OpenTelemetry
OpenTelemetry lets you visualize the full span tree of an agent interaction: user request → LLM call → tool calls → final response.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
import json
# Initialize tracing
resource = Resource.create({"service.name": "clinical-agent"})
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("clinical-agent")
def run_traced_agent(user_message: str, session_id: str, user_id: str) -> str:
import openai
client = openai.OpenAI()
with tracer.start_as_current_span("agent.run") as agent_span:
agent_span.set_attribute("session_id", session_id)
agent_span.set_attribute("user_id", user_id)
agent_span.set_attribute("user_message_length", len(user_message))
messages = [{"role": "user", "content": user_message}]
iteration = 0
for iteration in range(10):
with tracer.start_as_current_span(f"llm.call.{iteration}") as llm_span:
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=tools,
tool_choice="auto"
)
msg = response.choices[0].message
llm_span.set_attribute("model", "gpt-4o")
llm_span.set_attribute("finish_reason", response.choices[0].finish_reason)
llm_span.set_attribute("tool_calls_requested", len(msg.tool_calls or []))
llm_span.set_attribute("prompt_tokens", response.usage.prompt_tokens)
llm_span.set_attribute("completion_tokens", response.usage.completion_tokens)
if not msg.tool_calls:
agent_span.set_attribute("iterations", iteration + 1)
return msg.content or ""
messages.append(msg)
for tc in msg.tool_calls:
fn_name = tc.function.name
fn_args = json.loads(tc.function.arguments)
with tracer.start_as_current_span(f"tool.{fn_name}") as tool_span:
tool_span.set_attribute("tool.name", fn_name)
tool_span.set_attribute("tool.call_id", tc.id)
tool_span.set_attribute("tool.args_keys", ",".join(sorted(fn_args.keys())))
import time
start = time.monotonic()
try:
result = TOOL_MAP[fn_name](**fn_args) if fn_name in TOOL_MAP else {"error": f"Unknown: {fn_name}"}
tool_span.set_attribute("tool.success", "error" not in result)
if "error" in result:
tool_span.set_attribute("tool.error", result["error"])
except Exception as e:
result = {"error": str(e)}
tool_span.record_exception(e)
tool_span.set_attribute("tool.success", False)
finally:
elapsed = (time.monotonic() - start) * 1000
tool_span.set_attribute("tool.latency_ms", round(elapsed, 1))
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": json.dumps(result)
})
agent_span.set_attribute("iterations", iteration + 1)
return "Max iterations reached."Metrics: What to Measure
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
# Metrics definitions
tool_calls_total = Counter(
"agent_tool_calls_total",
"Total number of tool calls",
["tool_name", "success"]
)
tool_call_latency = Histogram(
"agent_tool_call_latency_seconds",
"Tool call latency in seconds",
["tool_name"],
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
tool_call_errors = Counter(
"agent_tool_call_errors_total",
"Total tool call errors by type",
["tool_name", "error_type"]
)
agent_iterations = Histogram(
"agent_iterations_per_request",
"Number of agent loop iterations per user request",
buckets=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
)
active_sessions = Gauge(
"agent_active_sessions",
"Number of currently active agent sessions"
)
def metered_execute_tool(tool_call, tool_map: dict) -> dict:
fn_name = tool_call.function.name
fn_args = json.loads(tool_call.function.arguments)
start = time.monotonic()
try:
result = tool_map[fn_name](**fn_args) if fn_name in tool_map else {"error": "Unknown tool"}
success = "error" not in result
error_type = result.get("error_type", "unknown") if not success else None
tool_calls_total.labels(tool_name=fn_name, success=str(success)).inc()
if not success:
tool_call_errors.labels(
tool_name=fn_name,
error_type=error_type or "unspecified"
).inc()
except Exception as e:
result = {"error": str(e)}
tool_calls_total.labels(tool_name=fn_name, success="False").inc()
tool_call_errors.labels(tool_name=fn_name, error_type=type(e).__name__).inc()
finally:
elapsed = time.monotonic() - start
tool_call_latency.labels(tool_name=fn_name).observe(elapsed)
return result
# Start Prometheus metrics server
start_http_server(8001) # Scrape at :8001/metricsKey Metrics Dashboard (Grafana Queries)
# Tool call rate per tool (calls per minute)
rate(agent_tool_calls_total[1m]) by (tool_name)
# Error rate per tool
rate(agent_tool_call_errors_total[5m]) by (tool_name, error_type)
# p95 latency per tool
histogram_quantile(0.95, rate(agent_tool_call_latency_seconds_bucket[5m])) by (tool_name)
# Tools with over 5% error rate (alert threshold)
rate(agent_tool_call_errors_total[5m]) by (tool_name)
/
rate(agent_tool_calls_total[5m]) by (tool_name)
> 0.05
# Unusual spike in a specific tool's call volume
delta(agent_tool_calls_total{tool_name="delete_patient_record"}[5m]) > 0Alerting: When to Page
# alerting-rules.yml (Prometheus AlertManager format)
groups:
- name: agent_tool_alerts
rules:
- alert: ToolErrorRateHigh
expr: |
rate(agent_tool_call_errors_total[5m]) by (tool_name)
/
rate(agent_tool_calls_total[5m]) by (tool_name)
> 0.1
for: 2m
labels:
severity: warning
annotations:
summary: "Tool {{ $labels.tool_name }} error rate above 10%"
- alert: ToolLatencyHigh
expr: |
histogram_quantile(0.95, rate(agent_tool_call_latency_seconds_bucket[5m])) by (tool_name)
> 5
for: 3m
labels:
severity: warning
annotations:
summary: "Tool {{ $labels.tool_name }} p95 latency above 5 seconds"
- alert: WriteToolUnexpectedCallSpike
expr: |
rate(agent_tool_calls_total{tool_name=~"delete_.*|update_.*|create_.*"}[1m]) > 10
for: 1m
labels:
severity: critical
annotations:
summary: "Unusual spike in write tool calls — potential security event"Session Replay: Logging Full Conversations for Debugging
When something goes wrong, you need the full conversation context — not just the tool call.
import json
from datetime import datetime, timezone
class ConversationLogger:
def __init__(self, session_id: str, user_id: str):
self.session_id = session_id
self.user_id = user_id
self.events: list[dict] = []
def log_user_message(self, content: str) -> None:
self.events.append({
"type": "user_message",
"timestamp": datetime.now(timezone.utc).isoformat(),
"content_length": len(content),
"content_preview": content[:200]
})
def log_tool_call(self, tool_name: str, args_keys: list[str], result_summary: dict) -> None:
self.events.append({
"type": "tool_call",
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool_name": tool_name,
"args_keys": args_keys,
"result_summary": result_summary # Summarized, not raw (avoid logging PII)
})
def log_final_response(self, response_length: int, iterations: int) -> None:
self.events.append({
"type": "final_response",
"timestamp": datetime.now(timezone.utc).isoformat(),
"response_length": response_length,
"total_iterations": iterations
})
def persist(self) -> None:
"""Write the session log to durable storage."""
log.info(
"session_completed",
session_id=self.session_id,
user_id=self.user_id,
event_count=len(self.events),
events=self.events
)Summary
| What to Observe | How | |---|---| | Tool call events | structlog with tool_name, latency, success, error, session_id | | Distributed traces | OpenTelemetry spans: agent.run → llm.call → tool.name | | Metrics | Prometheus counters and histograms per tool | | Alerts | Error rate over 10%, latency over 5s, write tool spikes | | Session replay | Full conversation event log for debugging | | Token usage | Track prompt_tokens and completion_tokens per LLM call |
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.