Async/Await in FastAPI
Master Python's async/await model for FastAPI routes. Learn when to use async def vs def, how to await OpenAI calls, run parallel tasks with asyncio.gather, and safely offload blocking code.
The Event Loop Model
Python's asyncio library implements a cooperative multitasking model. A single thread runs an event loop — a scheduler that decides which coroutine gets CPU time next.
A coroutine is a function declared with async def. It can pause itself at an await expression, yielding control back to the event loop. The event loop then runs another coroutine until it too hits an await, and so on.

import asyncio

async def fetch_data():
    print("start fetch")
    await asyncio.sleep(1)  # pauses here; another coroutine can run
    print("data ready")
    return {"items": [1, 2, 3]}

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

The key insight: await asyncio.sleep(1) does not block the thread for one second. It tells the event loop "I am done for now, wake me up in one second". During that second the event loop can handle hundreds of other coroutines.
Contrast that with time.sleep(1) — a blocking call that freezes the entire thread and every other request in the same process.
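
The difference can be observed directly. The following is a stdlib-only sketch, not FastAPI code: a background ticker (the hypothetical count_ticks helper) counts how often the event loop lets it run while another coroutine pauses, either cooperatively or with a blocking call. All function names here are illustrative assumptions.

```python
import asyncio
import contextlib
import time


async def count_ticks(counter: list[int]) -> None:
    """Increment counter[0] every 10 ms for as long as the loop lets us run."""
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)


async def cooperative_pause() -> None:
    await asyncio.sleep(0.2)  # yields to the event loop while waiting


async def blocking_pause() -> None:
    time.sleep(0.2)  # holds the thread; the event loop cannot run anything else


async def ticks_during(pause) -> int:
    """Return how many ticks the background ticker managed while `pause` ran."""
    counter = [0]
    ticker = asyncio.create_task(count_ticks(counter))
    await asyncio.sleep(0)  # give the ticker a chance to start
    before = counter[0]
    await pause()
    made = counter[0] - before
    ticker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await ticker  # wait for the cancellation to finish
    return made


if __name__ == "__main__":
    print("ticks during asyncio.sleep:", asyncio.run(ticks_during(cooperative_pause)))
    print("ticks during time.sleep:   ", asyncio.run(ticks_during(blocking_pause)))
```

The blocking variant reports zero ticks because nothing else gets to run during the pause, while the cooperative variant lets the ticker run many times.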
async def vs def in FastAPI Routes
FastAPI supports both synchronous and asynchronous route handlers.
When to use async def
Use async def when your handler needs to await something:

from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat")
async def chat(prompt: str) -> dict:
    # This suspends the handler while waiting for the API response
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return {"reply": response.choices[0].message.content}

Awaitable calls include:
- async database drivers (asyncpg, motor, aiosqlite)
- httpx.AsyncClient HTTP calls
- OpenAI SDK async client
- Redis async client (redis.asyncio)
- Any function declared with async def
When to use def
Use plain def when your handler only does CPU work or calls synchronous libraries you cannot control:

@app.get("/compute")
def heavy_math(n: int) -> dict:
    # Pure CPU work, no I/O waiting
    result = sum(i * i for i in range(n))
    return {"result": result}

FastAPI runs synchronous handlers in a thread pool using anyio.to_thread.run_sync() automatically. This means a slow synchronous handler won't block the event loop: FastAPI handles the thread-pooling for you.
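
To make the thread hand-off concrete, here is a minimal stdlib sketch. It uses asyncio.to_thread() as a stand-in for the anyio call FastAPI makes internally (an assumption for illustration; the real dispatch lives inside FastAPI and Starlette). The names sync_handler and dispatch are hypothetical.

```python
import asyncio
import threading


def sync_handler() -> int:
    """Stand-in for a plain `def` route handler; reports the thread it ran on."""
    return threading.get_ident()


async def dispatch() -> tuple[int, int]:
    """Return (event-loop thread id, worker thread id)."""
    loop_thread = threading.get_ident()  # thread running the event loop
    worker_thread = await asyncio.to_thread(sync_handler)  # runs in a pool thread
    return loop_thread, worker_thread


if __name__ == "__main__":
    loop_id, worker_id = asyncio.run(dispatch())
    print("same thread?", loop_id == worker_id)
```

The two identifiers differ, which is why the event loop stays free to serve other requests while the synchronous function runs.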
The mistake to avoid: declaring async def but then calling blocking code inside it.

# BAD: blocks the event loop
@app.get("/bad")
async def bad_route():
    import time
    time.sleep(5)  # Freezes every other request for 5 seconds!
    return {"done": True}

# GOOD: use synchronous def and let FastAPI thread-pool it
@app.get("/good-sync")
def good_sync_route():
    import time
    time.sleep(5)  # Runs in thread pool, event loop stays free
    return {"done": True}

# ALSO GOOD: use asyncio.to_thread inside async def
@app.get("/good-async")
async def good_async_route():
    import asyncio, time
    await asyncio.to_thread(time.sleep, 5)
    return {"done": True}

Common Async Patterns
Pattern 1: Awaiting a Database Query

import asyncpg
from fastapi import Depends, FastAPI, HTTPException

app = FastAPI()

async def get_db():
    # Open one connection per request and always close it afterwards
    conn = await asyncpg.connect(dsn="postgresql://user:pass@localhost/mydb")
    try:
        yield conn
    finally:
        await conn.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, db=Depends(get_db)) -> dict:
    row = await db.fetchrow("SELECT id, name, email FROM users WHERE id = $1", user_id)
    if row is None:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(row)

Pattern 2: Awaiting an OpenAI Chat Completion

from openai import AsyncOpenAI
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI()  # Uses OPENAI_API_KEY from environment

class ChatRequest(BaseModel):
    system_prompt: str = "You are a helpful assistant."
    user_message: str

class ChatResponse(BaseModel):
    content: str
    model: str
    prompt_tokens: int
    completion_tokens: int

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest) -> ChatResponse:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": req.system_prompt},
            {"role": "user", "content": req.user_message},
        ],
        temperature=0.7,
    )
    choice = response.choices[0]
    usage = response.usage
    return ChatResponse(
        # message.content can be None, so fall back to an empty string
        content=choice.message.content or "",
        model=response.model,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
    )

Pattern 3: Awaiting an Embedding Call

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def embed_text(text: str) -> list[float]:
    """Return a 1536-dimension embedding vector for the given text."""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return response.data[0].embedding

asyncio.gather: Running Async Calls in Parallel
asyncio.gather() schedules multiple coroutines concurrently on the same event loop. Instead of awaiting them one at a time (serially), you start them all at once and wait for all of them to finish. The results come back as a list in the same order as the awaitables you passed in.
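
Before the OpenAI version, the timing difference can be checked with a small stdlib-only sketch. The fake_call coroutine is a hypothetical stand-in that simulates network latency with asyncio.sleep; the helper names and the 0.1 second latency are assumptions for illustration.

```python
import asyncio
import time


async def fake_call(name: str, latency: float = 0.1) -> str:
    """Hypothetical stand-in for a network call that takes `latency` seconds."""
    await asyncio.sleep(latency)
    return f"{name}: done"


async def run_serial(names: list[str]) -> tuple[list[str], float]:
    """Await each call in turn; total time is roughly the sum of latencies."""
    start = time.perf_counter()
    results = [await fake_call(n) for n in names]
    return results, time.perf_counter() - start


async def run_concurrent(names: list[str]) -> tuple[list[str], float]:
    """Start all calls together; total time is roughly the slowest latency."""
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_call(n) for n in names))
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    names = ["a", "b", "c"]
    _, serial_time = asyncio.run(run_serial(names))
    _, concurrent_time = asyncio.run(run_concurrent(names))
    print(f"serial: {serial_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

Both helpers return the same results in the same order; only the elapsed time differs.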

import asyncio
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

async def classify_text(text: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Classify the sentiment: positive, negative, or neutral."},
            {"role": "user", "content": text},
        ],
        max_tokens=10,
    )
    return response.choices[0].message.content.strip()

@app.post("/classify-batch")
async def classify_batch(texts: list[str]) -> list[str]:
    """Classify multiple texts concurrently; much faster than serial."""
    # Serial approach: total time = sum of each call's latency
    # Parallel approach: total time ≈ max single call's latency
    results = await asyncio.gather(
        *[classify_text(t) for t in texts]
    )
    return list(results)

gather with error handling
By default, the first exception raised by any coroutine in gather propagates immediately to the code awaiting it. The remaining coroutines are not cancelled and keep running in the background. Use return_exceptions=True to collect errors alongside results:
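
The two behaviours can be compared without any API calls. In this sketch, flaky is a hypothetical coroutine that fails for negative input; collect and fail_fast are illustrative names, not part of any library.

```python
import asyncio


async def flaky(n: int) -> int:
    """Hypothetical task that fails for negative input."""
    await asyncio.sleep(0.01)
    if n < 0:
        raise ValueError(f"negative input: {n}")
    return n * 2


async def collect(values: list[int]) -> list[object]:
    # Exceptions are returned in place, at the same position as their input.
    return await asyncio.gather(*(flaky(v) for v in values), return_exceptions=True)


async def fail_fast(values: list[int]) -> list[int]:
    # Default behaviour: the first exception propagates to the caller.
    return await asyncio.gather(*(flaky(v) for v in values))


if __name__ == "__main__":
    print(asyncio.run(collect([1, -1, 3])))
```

With return_exceptions=True the caller decides per item how to handle a failure, which is usually what a batch endpoint wants.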

async def safe_gather_example(texts: list[str]) -> list[str]:
    results = await asyncio.gather(
        *[classify_text(t) for t in texts],
        return_exceptions=True,
    )
    output = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            # Replace the failure with a readable error string
            output.append(f"ERROR for text {i}: {result}")
        else:
            output.append(result)
    return output

gather with timeouts
Wrap gather in asyncio.wait_for to enforce an overall deadline:
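
When the deadline expires, wait_for cancels the gather, which in turn cancels every coroutine still running inside it. The sketch below shows this with a hypothetical slow_job coroutine that records its own cancellation; the names and durations are assumptions for illustration.

```python
import asyncio


async def slow_job(name: str, cancelled: list[str]) -> str:
    """Hypothetical long-running call that records whether it was cancelled."""
    try:
        await asyncio.sleep(5)
        return name
    except asyncio.CancelledError:
        cancelled.append(name)
        raise  # always re-raise so the cancellation completes


async def run_with_deadline(names: list[str], timeout: float) -> tuple[bool, list[str]]:
    """Return (timed_out, names of the jobs that were cancelled)."""
    cancelled: list[str] = []
    try:
        await asyncio.wait_for(
            asyncio.gather(*(slow_job(n, cancelled) for n in names)),
            timeout=timeout,
        )
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return timed_out, cancelled


if __name__ == "__main__":
    print(asyncio.run(run_with_deadline(["a", "b"], timeout=0.05)))
```

In a route handler the same pattern looks like this: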

# Inside an async route handler
try:
    results = await asyncio.wait_for(
        asyncio.gather(*[classify_text(t) for t in texts]),
        timeout=15.0,
    )
except asyncio.TimeoutError:
    raise HTTPException(status_code=504, detail="Classification timed out")

Blocking Code in Async Context: asyncio.to_thread()
Sometimes you must call a synchronous library that you cannot swap out — a legacy database driver, a PDF parser, a local ML model that doesn't have an async API.
asyncio.to_thread() runs the blocking call in a thread pool, allowing the event loop to keep processing other requests while it waits.
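
Because asyncio.to_thread() returns an awaitable, it also composes with asyncio.gather(). The sketch below runs several blocking calls at once; blocking_work is a hypothetical stand-in for a synchronous library call, and the 0.1 second sleep is an assumed latency. This helps with blocking I/O and waiting; CPU-bound pure-Python work in threads is still limited by the GIL.

```python
import asyncio
import time


def blocking_work(job_id: int) -> int:
    """Hypothetical synchronous call, e.g. a legacy driver or parser."""
    time.sleep(0.1)
    return job_id * 10


async def run_jobs(job_ids: list[int]) -> tuple[list[int], float]:
    """Offload each blocking call to a worker thread and wait for all of them."""
    start = time.perf_counter()
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_work, j) for j in job_ids)
    )
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_jobs([1, 2, 3]))
    print(results, f"{elapsed:.2f}s")
```

A fuller example with a real synchronous library: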

import asyncio
import pdfplumber  # synchronous PDF library
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

async def extract_pdf_text(path: str) -> str:
    def _extract():
        with pdfplumber.open(path) as pdf:
            return "\n".join(page.extract_text() or "" for page in pdf.pages)
    # Run the blocking _extract() in a thread so the event loop stays free
    text = await asyncio.to_thread(_extract)
    return text

@app.post("/analyse-pdf")
async def analyse_pdf(pdf_path: str) -> dict:
    text = await extract_pdf_text(pdf_path)
    summary = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Summarise the document in 3 bullet points."},
            {"role": "user", "content": text[:8000]},  # truncate long documents
        ],
    )
    return {"summary": summary.choices[0].message.content}

Full Example: Async Route for Azure OpenAI Chat
This example uses the Azure OpenAI endpoint instead of the public OpenAI API — common in enterprise and healthcare AI deployments.

# routers/chat.py
import asyncio
import os
from functools import lru_cache

from fastapi import APIRouter, Depends, HTTPException
from openai import AsyncAzureOpenAI
from pydantic import BaseModel, Field

router = APIRouter(prefix="/chat", tags=["chat"])

# --- Client factory (cached, so one client is created and then reused) ---
@lru_cache
def get_azure_client() -> AsyncAzureOpenAI:
    return AsyncAzureOpenAI(
        api_key=os.environ["AZURE_OPENAI_API_KEY"],
        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        api_version="2024-12-01-preview",
    )

# --- Models ---
class Message(BaseModel):
    role: str = Field(..., pattern="^(system|user|assistant)$")
    content: str = Field(..., min_length=1)

class ChatRequest(BaseModel):
    messages: list[Message]
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(default=1024, ge=1, le=4096)
    deployment: str = Field(default="gpt-4o")

class ChatResponse(BaseModel):
    content: str
    finish_reason: str
    prompt_tokens: int
    completion_tokens: int

# --- Route ---
@router.post("/", response_model=ChatResponse)
async def chat_completion(
    req: ChatRequest,
    client: AsyncAzureOpenAI = Depends(get_azure_client),
) -> ChatResponse:
    """
    Send a multi-turn conversation to Azure OpenAI and return the assistant reply.
    """
    try:
        response = await asyncio.wait_for(
            client.chat.completions.create(
                model=req.deployment,
                messages=[m.model_dump() for m in req.messages],
                temperature=req.temperature,
                max_tokens=req.max_tokens,
            ),
            timeout=60.0,
        )
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="LLM request timed out after 60 seconds")
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"LLM error: {exc}") from exc

    choice = response.choices[0]
    usage = response.usage
    return ChatResponse(
        content=choice.message.content or "",
        finish_reason=choice.finish_reason,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
    )

Mount the router in main.py:

from fastapi import FastAPI
from routers.chat import router as chat_router

app = FastAPI()
app.include_router(chat_router)

Debugging Async Code
Enable asyncio debug mode
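
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

async def enable_asyncio_debug() -> None:
    # Must run inside the event loop that serves requests,
    # for example from a startup or lifespan hook.
    asyncio.get_running_loop().set_debug(True)

This logs a warning whenever a single step of a coroutine holds the event loop for more than 100 ms (the default slow_callback_duration), which is invaluable for finding accidental blocking calls. Setting the PYTHONASYNCIODEBUG=1 environment variable has the same effect. Debug mode adds overhead, so it is best enabled during development or targeted troubleshooting rather than left on permanently.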
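
To see what the warning looks like, the following stdlib sketch runs a deliberately blocking coroutine with debug mode on and captures what the asyncio logger emits. ListHandler, blocks_the_loop, and run_with_debug are hypothetical names used only for this illustration.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect log messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocks_the_loop() -> None:
    time.sleep(0.2)  # deliberately blocking, longer than the 100 ms threshold


def run_with_debug() -> list[str]:
    """Run the blocking coroutine in debug mode and return the logged messages."""
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(blocks_the_loop(), debug=True)
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for message in run_with_debug():
        print(message)
```

The captured message names the offending task and how long it held the loop, which points straight at the blocking call.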
Measure coroutine latency

import time
import asyncio

async def timed(coro, label: str):
    start = time.perf_counter()
    result = await coro
    elapsed = time.perf_counter() - start
    print(f"{label} took {elapsed:.3f}s")
    return result

# Usage (inside a coroutine)
result = await timed(classify_text("hello world"), "classify")

Key Takeaways
- async def routes allow await; use them for all I/O-heavy handlers (LLM calls, DB queries, HTTP requests)
- Plain def routes are automatically run in a thread pool by FastAPI, which makes them safe for synchronous blocking libraries
- Never call time.sleep() or synchronous blocking code inside an async def handler; use asyncio.to_thread() instead
- asyncio.gather() parallelises multiple awaitable calls, reducing total latency to approximately the slowest single call
- Wrap long-running calls with asyncio.wait_for() to enforce timeouts and avoid hanging requests
- Azure OpenAI uses AsyncAzureOpenAI; the same async patterns apply as with the standard OpenAI client
Next lesson: Pydantic v2 request and response models — the type-safe data layer that underpins all of FastAPI's validation and schema generation.