Parallel Tool Calls
When the LLM requests multiple tools in one response, run them concurrently with asyncio.gather() to cut latency. Learn the complete pattern with real examples.
What Parallel Tool Calls Are
When you send a query that requires multiple independent pieces of information, modern OpenAI models will often return multiple tool_calls in a single assistant message, rather than calling one tool, waiting for the result, then calling another.
# Single tool call response
msg.tool_calls = [
    ToolCall(id="call_abc", function=Function(name="get_patient_record", ...))
]
# Parallel tool call response
msg.tool_calls = [
    ToolCall(id="call_abc", function=Function(name="get_patient_demographics", ...)),
    ToolCall(id="call_def", function=Function(name="get_patient_medications", ...)),
    ToolCall(id="call_ghi", function=Function(name="get_patient_allergies", ...))
]

The LLM determined that all three queries are independent: knowing demographics doesn't change what medications are returned. It's more efficient to request them all at once.
Sequential vs Parallel: The Latency Difference
With sequential execution:
Call 1: get_patient_demographics → 200ms
Call 2: get_patient_medications → 180ms
Call 3: get_patient_allergies → 150ms
Total: 530ms
With parallel execution using asyncio.gather():
All three calls start simultaneously
Total: max(200ms, 180ms, 150ms) = 200ms
For a clinical dashboard that calls 5 tools, parallel execution often reduces total latency from over 1 second to under 300ms.
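The arithmetic above can be reproduced without any API or database. The following is a minimal sketch, assuming a hypothetical fake_tool coroutine that only sleeps for the quoted latency; the tool names are reused purely as labels.

```python
import asyncio
import time

# Simulated latencies in seconds, taken from the figures quoted above.
DELAYS = {
    "get_patient_demographics": 0.20,
    "get_patient_medications": 0.18,
    "get_patient_allergies": 0.15,
}


async def fake_tool(name: str, delay: float) -> str:
    """Hypothetical stand-in for a real tool: sleeps, then returns its name."""
    await asyncio.sleep(delay)
    return name


async def run_sequential() -> float:
    """Await each call before starting the next; returns elapsed seconds."""
    start = time.monotonic()
    for name, delay in DELAYS.items():
        await fake_tool(name, delay)
    return time.monotonic() - start


async def run_parallel() -> float:
    """Start all calls at once with asyncio.gather; returns elapsed seconds."""
    start = time.monotonic()
    await asyncio.gather(*(fake_tool(n, d) for n, d in DELAYS.items()))
    return time.monotonic() - start


async def compare() -> tuple[float, float]:
    return await run_sequential(), await run_parallel()


if __name__ == "__main__":
    seq, par = asyncio.run(compare())
    print(f"sequential: {seq:.3f}s, parallel: {par:.3f}s")
```

The sequential figure should land near the sum of the delays and the parallel figure near the largest single delay; exact values vary with scheduler overhead.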
The Basic Pattern
import asyncio
import json
import time

import openai

# Note: this synchronous client blocks the event loop during each LLM request.
# Nothing else is scheduled at those points in this example; openai.AsyncOpenAI
# is the non-blocking alternative.
client = openai.OpenAI()

# Async tool functions
async def get_patient_demographics(patient_id: str) -> dict:
    await asyncio.sleep(0.2)  # Simulates DB latency
    return {
        "patient_id": patient_id,
        "name": "Jane Doe",
        "dob": "1975-03-14",
        "gender": "female"
    }

async def get_patient_medications(patient_id: str) -> dict:
    await asyncio.sleep(0.18)
    return {
        "patient_id": patient_id,
        "medications": ["Metformin 500mg", "Lisinopril 10mg", "Atorvastatin 20mg"]
    }

async def get_patient_allergies(patient_id: str) -> dict:
    await asyncio.sleep(0.15)
    return {
        "patient_id": patient_id,
        "allergies": ["Penicillin", "Sulfonamides"]
    }

# Maps the tool name the LLM returns to the coroutine that implements it
ASYNC_TOOL_MAP = {
    "get_patient_demographics": get_patient_demographics,
    "get_patient_medications": get_patient_medications,
    "get_patient_allergies": get_patient_allergies,
}

async def execute_tool_call(tool_call) -> tuple[str, dict]:
    """Execute a single tool call and return (tool_call_id, result)."""
    fn_name = tool_call.function.name
    fn_args = json.loads(tool_call.function.arguments)
    if fn_name not in ASYNC_TOOL_MAP:
        result = {"error": f"Unknown tool: {fn_name}"}
    else:
        try:
            result = await ASYNC_TOOL_MAP[fn_name](**fn_args)
        except Exception as e:
            result = {"error": str(e), "tool": fn_name}
    return tool_call.id, result

async def run_parallel_agent(user_message: str, tools: list) -> str:
    messages = [{"role": "user", "content": user_message}]
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        tool_choice="auto"
    )
    msg = response.choices[0].message
    if not msg.tool_calls:
        return msg.content or ""
    messages.append(msg)
    # Execute ALL tool calls concurrently
    start = time.monotonic()
    results = await asyncio.gather(
        *[execute_tool_call(tc) for tc in msg.tool_calls]
    )
    elapsed = time.monotonic() - start
    print(f"Executed {len(msg.tool_calls)} tool calls in {elapsed:.3f}s (parallel)")
    # Append all results in the same order as the tool_calls
    for tool_call_id, result in results:
        messages.append({
            "role": "tool",
            "tool_call_id": tool_call_id,
            "content": json.dumps(result)
        })
    final = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools
    )
    return final.choices[0].message.content or ""

Full Working Example: Patient Summary Dashboard
import asyncio
import json
import time
import openai
import asyncpg
client = openai.OpenAI()
DB_DSN = "postgresql://user:pass@localhost:5432/hospital"
# Tool schemas
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_patient_demographics",
            "description": "Get basic patient demographics: name, DOB, contact info.",
            "parameters": {
                "type": "object",
                "properties": {
                    "patient_id": {"type": "string", "description": "Patient ID (format: P-NNNNN)"}
                },
                "required": ["patient_id"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_patient_medications",
            "description": "Get the current active medication list for a patient.",
            "parameters": {
                "type": "object",
                "properties": {
                    "patient_id": {"type": "string", "description": "Patient ID"}
                },
                "required": ["patient_id"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_patient_allergies",
            "description": "Get documented drug allergies and sensitivities for a patient.",
            "parameters": {
                "type": "object",
                "properties": {
                    "patient_id": {"type": "string", "description": "Patient ID"}
                },
                "required": ["patient_id"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_recent_lab_results",
            "description": "Get the most recent lab results for a patient.",
            "parameters": {
                "type": "object",
                "properties": {
                    "patient_id": {"type": "string", "description": "Patient ID"},
                    "limit": {"type": "integer", "description": "Number of results. Default 5."}
                },
                "required": ["patient_id"]
            }
        }
    }
]
# Async tool implementations
async def get_patient_demographics(patient_id: str) -> dict:
    conn = await asyncpg.connect(DB_DSN)
    try:
        row = await conn.fetchrow(
            "SELECT name, dob, phone, email FROM patients WHERE patient_id = $1",
            patient_id
        )
        if not row:
            return {"error": f"Patient {patient_id} not found"}
        return {
            "patient_id": patient_id,
            "name": row["name"],
            "dob": str(row["dob"]),
            "phone": row["phone"],
            "email": row["email"]
        }
    finally:
        await conn.close()

async def get_patient_medications(patient_id: str) -> dict:
    conn = await asyncpg.connect(DB_DSN)
    try:
        rows = await conn.fetch(
            """
            SELECT drug_name, dose, frequency, route, start_date
            FROM prescriptions
            WHERE patient_id = $1 AND status = 'active'
            ORDER BY start_date DESC
            """,
            patient_id
        )
        return {
            "patient_id": patient_id,
            "medications": [
                {
                    "drug": r["drug_name"],
                    "dose": r["dose"],
                    "frequency": r["frequency"],
                    "route": r["route"]
                }
                for r in rows
            ],
            "count": len(rows)
        }
    finally:
        await conn.close()

async def get_patient_allergies(patient_id: str) -> dict:
    conn = await asyncpg.connect(DB_DSN)
    try:
        rows = await conn.fetch(
            "SELECT allergen, reaction, severity FROM allergies WHERE patient_id = $1",
            patient_id
        )
        return {
            "patient_id": patient_id,
            "allergies": [
                {"allergen": r["allergen"], "reaction": r["reaction"], "severity": r["severity"]}
                for r in rows
            ]
        }
    finally:
        await conn.close()

async def get_recent_lab_results(patient_id: str, limit: int = 5) -> dict:
    conn = await asyncpg.connect(DB_DSN)
    try:
        rows = await conn.fetch(
            """
            SELECT test_name, result, unit, reference_range, collected_at, flag
            FROM lab_results
            WHERE patient_id = $1
            ORDER BY collected_at DESC
            LIMIT $2
            """,
            patient_id,
            limit
        )
        return {
            "patient_id": patient_id,
            "lab_results": [
                {
                    "test": r["test_name"],
                    "result": r["result"],
                    "unit": r["unit"],
                    "reference_range": r["reference_range"],
                    "collected": str(r["collected_at"]),
                    "flag": r["flag"]
                }
                for r in rows
            ]
        }
    finally:
        await conn.close()
ASYNC_TOOL_MAP = {
    "get_patient_demographics": get_patient_demographics,
    "get_patient_medications": get_patient_medications,
    "get_patient_allergies": get_patient_allergies,
    "get_recent_lab_results": get_recent_lab_results,
}

async def execute_tool_call(tool_call) -> tuple[str, dict]:
    fn_name = tool_call.function.name
    fn_args = json.loads(tool_call.function.arguments)
    if fn_name not in ASYNC_TOOL_MAP:
        return tool_call.id, {"error": f"Unknown tool: {fn_name}"}
    try:
        result = await ASYNC_TOOL_MAP[fn_name](**fn_args)
    except Exception as e:
        result = {"error": str(e), "tool": fn_name}
    return tool_call.id, result
async def generate_patient_summary(patient_id: str) -> str:
    """Generate a full patient summary using parallel tool calls."""
    user_message = (
        f"Generate a complete clinical summary for patient {patient_id}. "
        "Include demographics, current medications, allergies, and recent lab results."
    )
    messages = [
        {
            "role": "system",
            "content": (
                "You are a clinical summarization assistant. "
                "When generating a patient summary, retrieve all relevant information "
                "using available tools. Retrieve independent data concurrently."
            )
        },
        {"role": "user", "content": user_message}
    ]
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        tool_choice="auto"
    )
    msg = response.choices[0].message
    if not msg.tool_calls:
        return msg.content or ""
    print(f"LLM requested {len(msg.tool_calls)} tool calls:")
    for tc in msg.tool_calls:
        print(f" - {tc.function.name}({tc.function.arguments})")
    messages.append(msg)
    # Execute all concurrently
    start = time.monotonic()
    results = await asyncio.gather(*[execute_tool_call(tc) for tc in msg.tool_calls])
    elapsed = time.monotonic() - start
    print(f"All tools completed in {elapsed:.3f}s")
    for tool_call_id, result in results:
        messages.append({
            "role": "tool",
            "tool_call_id": tool_call_id,
            "content": json.dumps(result, default=str)  # default=str covers non-JSON types such as dates
        })
    final = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools
    )
    return final.choices[0].message.content or ""

# Run it
if __name__ == "__main__":
    summary = asyncio.run(generate_patient_summary("P-00123"))
    print(summary)

Result Order Must Match tool_call Order
When appending tool results, the tool_call_id links each result to the right request. Order in the message list doesn't technically matter as long as IDs match, but keeping them in the same order as the tool_calls makes debugging easier.
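The ordering behavior of asyncio.gather can be checked in isolation. The sketch below is illustrative only: fake_tool and the call IDs are hypothetical stand-ins, and the delays are chosen so that the calls finish in the reverse of the order they were requested.

```python
import asyncio


async def demo() -> tuple[list[str], list[str]]:
    """Return (IDs in gather's result order, IDs in actual completion order)."""
    finished: list[str] = []

    async def fake_tool(call_id: str, delay: float) -> tuple[str, dict]:
        # Hypothetical tool: sleeps, records when it finished, returns its ID.
        await asyncio.sleep(delay)
        finished.append(call_id)
        return call_id, {"delay": delay}

    # The first requested call is the slowest, so it completes last.
    calls = [("call_abc", 0.15), ("call_def", 0.10), ("call_ghi", 0.05)]
    results = await asyncio.gather(*(fake_tool(cid, d) for cid, d in calls))
    return [call_id for call_id, _ in results], finished


if __name__ == "__main__":
    gathered, finished = asyncio.run(demo())
    print("gather order:    ", gathered)
    print("completion order:", finished)
```

The gathered list follows the order in which the coroutines were passed in, even though the completion order is reversed, which is why results can be paired with tool calls by index.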
# asyncio.gather preserves order: results[0] corresponds to tool_calls[0]
results = await asyncio.gather(*[execute_tool_call(tc) for tc in msg.tool_calls])
# This loop is safe because gather preserves input order
for i, (tool_call_id, result) in enumerate(results):
    assert tool_call_id == msg.tool_calls[i].id  # Always true with gather
    messages.append({
        "role": "tool",
        "tool_call_id": tool_call_id,
        "content": json.dumps(result)
    })

When to Encourage Parallel Calls
The LLM decides to use parallel tool calls based on whether it determines the queries are independent. You can encourage this in your system prompt:
system_prompt = """
You are a clinical dashboard assistant.
When generating patient summaries or responding to queries that require multiple
independent data points, retrieve all relevant information in parallel by including
multiple tool calls in your response. Do not wait for one result before requesting
another unless the second request depends on data from the first.
Example of independent parallel calls:
- Patient demographics + medications + allergies (all independent, use parallel)
Example of dependent sequential calls:
- Search for a drug ID, then use that ID to get dosage (sequential: second depends on first)
"""

Handling Partial Failures in Parallel Calls
When one tool raises during asyncio.gather, the default behavior propagates the first exception to the caller, and the results of the other tools are lost. Passing return_exceptions=True avoids this:
# return_exceptions=True: gather doesn't stop on failure
results = await asyncio.gather(
    *[execute_tool_call(tc) for tc in msg.tool_calls],
    return_exceptions=True  # Each result may be an exception object
)
for i, result in enumerate(results):
    tc = msg.tool_calls[i]
    if isinstance(result, BaseException):  # BaseException also covers asyncio.CancelledError
        tool_result = {"error": str(result), "tool": tc.function.name}
    else:
        _, tool_result = result
    messages.append({
        "role": "tool",
        "tool_call_id": tc.id,
        "content": json.dumps(tool_result)
    })

Using return_exceptions=True ensures all tool results (including failures) are returned to the LLM, which can then decide how to handle partial data.
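To try this behavior without an API key or database, here is a minimal sketch in which one tool succeeds and one raises. The names ok_tool, failing_tool, and collect_tool_messages are hypothetical, and the (id, name, coroutine) triples stand in for msg.tool_calls.

```python
import asyncio
import json


async def ok_tool(patient_id: str) -> dict:
    """Hypothetical tool that succeeds."""
    return {"patient_id": patient_id, "allergies": ["Penicillin"]}


async def failing_tool(patient_id: str) -> dict:
    """Hypothetical tool that always fails."""
    raise ConnectionError("database unavailable")


async def collect_tool_messages() -> list[dict]:
    # (tool_call_id, tool name, coroutine) triples stand in for msg.tool_calls.
    calls = [
        ("call_abc", "ok_tool", ok_tool("P-00123")),
        ("call_def", "failing_tool", failing_tool("P-00123")),
    ]
    results = await asyncio.gather(
        *(coro for _, _, coro in calls),
        return_exceptions=True,  # exceptions come back as results instead of being raised
    )
    messages = []
    for (call_id, name, _), result in zip(calls, results):
        if isinstance(result, BaseException):
            # Convert the failure into an error payload the LLM can read.
            result = {"error": str(result), "tool": name}
        messages.append({
            "role": "tool",
            "tool_call_id": call_id,
            "content": json.dumps(result),
        })
    return messages


if __name__ == "__main__":
    for message in asyncio.run(collect_tool_messages()):
        print(message)
```

Both tool calls produce a tool message: the successful one carries its data and the failed one carries an error payload, so every tool_call_id still receives a response.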
Summary
| Concept | Detail |
|---|---|
| Parallel tool calls | LLM returns multiple tool_calls in one assistant message |
| How to execute | asyncio.gather(*[execute_tool_call(tc) for tc in msg.tool_calls]) |
| Order preservation | asyncio.gather preserves input order; use index to match results to tool_calls |
| Partial failure | Use return_exceptions=True, convert Exception to error dict |
| When to encourage | System prompt: retrieve independent data concurrently |
| Latency benefit | Total time = max(individual tool times) instead of sum |