FastAPI for AI Engineers · Lesson 2 of 12

async/await in FastAPI: Why It Matters

The Event Loop Model

Python's asyncio library implements a cooperative multitasking model. A single thread runs an event loop — a scheduler that decides which coroutine gets CPU time next.

A coroutine is a function declared with async def. It can pause itself at an await expression, yielding control back to the event loop. The event loop then runs another coroutine until it too hits an await, and so on.

Python
import asyncio

async def fetch_data():
    print("start fetch")
    await asyncio.sleep(1)   # pauses here; another coroutine can run
    print("data ready")
    return {"items": [1, 2, 3]}

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

The key insight: await asyncio.sleep(1) does not block the thread for one second. It tells the event loop "I am done for now, wake me up in one second". During that second the event loop can handle hundreds of other coroutines.

Contrast that with time.sleep(1): a blocking call that freezes the whole thread. If that thread is the one running the event loop, every other request served by that loop stalls with it.
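To make the cooperative scheduling concrete, here is a minimal sketch using only the standard library. The `worker` and `run_workers` names are hypothetical, and `asyncio.gather` (covered later in this lesson) is used only to start the three coroutines together. Each worker waits 0.2 s, yet the whole run should take roughly 0.2 s rather than 0.6 s, because every `await asyncio.sleep(...)` hands control back to the event loop.

```python
import asyncio
import time


async def worker(name: str, delay: float, log: list[str]) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(delay)   # yields control to the event loop
    log.append(f"{name} done")


async def run_workers() -> tuple[list[str], float]:
    log: list[str] = []
    started = time.perf_counter()
    # All three workers are scheduled on the same thread and the same loop.
    await asyncio.gather(
        worker("a", 0.2, log),
        worker("b", 0.2, log),
        worker("c", 0.2, log),
    )
    return log, time.perf_counter() - started


if __name__ == "__main__":
    log, elapsed = asyncio.run(run_workers())
    print(log)
    print(f"elapsed: {elapsed:.2f}s")
```

All three "start" entries appear in the log before any "done" entry, which is the interleaving the event loop model describes.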

async def vs def in FastAPI Routes

FastAPI supports both synchronous and asynchronous route handlers.

When to use async def

Use async def when your handler needs to await something:

Python
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat")
async def chat(prompt: str) -> dict:
    # This suspends the handler while waiting for the API response
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return {"reply": response.choices[0].message.content}

Awaitable calls include:

  • async database drivers (asyncpg, motor, aiosqlite)
  • httpx.AsyncClient HTTP calls
  • OpenAI SDK async client
  • Redis async client (redis.asyncio)
  • Any function declared with async def

When to use def

Use plain def when your handler only does CPU work or calls synchronous libraries you cannot control:

Python
@app.get("/compute")
def heavy_math(n: int) -> dict:
    # Pure CPU work, no I/O waiting
    result = sum(i * i for i in range(n))
    return {"result": result}

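FastAPI automatically runs synchronous handlers in a thread pool (via anyio.to_thread.run_sync() under the hood), so a slow synchronous handler does not block the event loop. Two caveats apply: the pool is bounded (AnyIO's default limit is 40 threads), and CPU-bound Python code still contends for the GIL, so threads keep the loop responsive but do not make pure-Python computation faster.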
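The effect of thread-pooling can be observed without FastAPI. The sketch below is a standard-library analogy, not FastAPI's internal code: it uses `asyncio.to_thread` in place of the AnyIO call, and `sync_handler` and `compare_threads` are hypothetical names. It shows only that the synchronous function executes on a different thread from the one running the event loop.

```python
import asyncio
import threading


def sync_handler() -> str:
    # Stand-in for the body of a plain `def` route.
    return threading.current_thread().name


async def compare_threads() -> tuple[str, str]:
    loop_thread = threading.current_thread().name
    # The synchronous function is handed to a worker thread.
    worker_thread = await asyncio.to_thread(sync_handler)
    return loop_thread, worker_thread


if __name__ == "__main__":
    loop_thread, worker_thread = asyncio.run(compare_threads())
    print(f"event loop runs on: {loop_thread}")
    print(f"sync handler ran on: {worker_thread}")
```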

The mistake to avoid: declaring async def but then calling blocking code inside it.

Python
import asyncio
import time

# BAD: blocks the event loop
@app.get("/bad")
async def bad_route():
    time.sleep(5)          # Freezes every other request for 5 seconds!
    return {"done": True}

# GOOD: use synchronous def and let FastAPI thread-pool it
@app.get("/good-sync")
def good_sync_route():
    time.sleep(5)          # Runs in thread pool, event loop stays free
    return {"done": True}

# ALSO GOOD: use asyncio.to_thread inside async def
@app.get("/good-async")
async def good_async_route():
    await asyncio.to_thread(time.sleep, 5)
    return {"done": True}
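The difference between the bad and good variants can be measured with a small experiment. This is a sketch with hypothetical names (`heartbeat`, `count_ticks`): a background coroutine records a tick every 10 ms, and we count how many ticks land while a 0.3 s `time.sleep` runs, first directly inside the coroutine and then via `asyncio.to_thread`.

```python
import asyncio
import time


async def heartbeat(ticks: list[float]) -> None:
    # Records a timestamp every 10 ms, but only while the loop is free to run it.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def count_ticks(blocking: bool) -> int:
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)            # give the heartbeat its first turn
    before = len(ticks)
    if blocking:
        time.sleep(0.3)               # blocks the event loop thread
    else:
        await asyncio.to_thread(time.sleep, 0.3)   # loop stays free
    during = len(ticks) - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during


if __name__ == "__main__":
    print("ticks while blocked:", asyncio.run(count_ticks(blocking=True)))
    print("ticks with to_thread:", asyncio.run(count_ticks(blocking=False)))
```

In the blocking case the heartbeat never gets a turn, which is what every other request experiences when a handler blocks the loop.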

Common Async Patterns

Pattern 1: Awaiting a Database Query

Python
import asyncpg
from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

async def get_db():
    # Opens one connection per request for simplicity; under load a shared
    # pool (asyncpg.create_pool) is the more common choice.
    conn = await asyncpg.connect(dsn="postgresql://user:pass@localhost/mydb")
    try:
        yield conn
    finally:
        await conn.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, db=Depends(get_db)) -> dict:
    row = await db.fetchrow("SELECT id, name, email FROM users WHERE id = $1", user_id)
    if row is None:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(row)

Pattern 2: Awaiting an OpenAI Chat Completion

Python
from openai import AsyncOpenAI
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI()   # Uses OPENAI_API_KEY from environment

class ChatRequest(BaseModel):
    system_prompt: str = "You are a helpful assistant."
    user_message: str

class ChatResponse(BaseModel):
    content: str
    model: str
    prompt_tokens: int
    completion_tokens: int

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest) -> ChatResponse:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": req.system_prompt},
            {"role": "user", "content": req.user_message},
        ],
        temperature=0.7,
    )
    choice = response.choices[0]
    usage = response.usage
    return ChatResponse(
        content=choice.message.content,
        model=response.model,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
    )

Pattern 3: Awaiting an Embedding Call

Python
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def embed_text(text: str) -> list[float]:
    """Return a 1536-dimension embedding vector for the given text."""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return response.data[0].embedding

asyncio.gather: Running Async Calls in Parallel

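asyncio.gather() runs multiple awaitables concurrently on the same event loop. Instead of awaiting them one at a time (serially), it schedules them all at once and returns their results, in input order, once every one has finished. The calls overlap while they wait on I/O; they do not run on multiple CPU cores.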

Python
import asyncio
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

async def classify_text(text: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Classify the sentiment: positive, negative, or neutral."},
            {"role": "user", "content": text},
        ],
        max_tokens=10,
    )
    # content can be None, so fall back to an empty string before stripping
    return (response.choices[0].message.content or "").strip()

@app.post("/classify-batch")
async def classify_batch(texts: list[str]) -> list[str]:
    """Classify multiple texts in parallel — much faster than serial."""
    # Serial approach: total time = sum of each call's latency
    # Parallel approach: total time ≈ max single call's latency
    results = await asyncio.gather(
        *[classify_text(t) for t in texts]
    )
    return list(results)
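The latency claim in the comments above can be checked without calling any API. In this sketch `fake_classify` is a hypothetical stand-in for `classify_text` that sleeps instead of making a network request, and the latencies are made-up values chosen for illustration.

```python
import asyncio
import time


async def fake_classify(text: str, latency: float) -> str:
    # Hypothetical stand-in for classify_text: sleeps instead of calling an API.
    await asyncio.sleep(latency)
    return text.upper()


async def run_serial(jobs: list[tuple[str, float]]) -> tuple[list[str], float]:
    started = time.perf_counter()
    results = [await fake_classify(text, latency) for text, latency in jobs]
    return results, time.perf_counter() - started


async def run_concurrent(jobs: list[tuple[str, float]]) -> tuple[list[str], float]:
    started = time.perf_counter()
    results = await asyncio.gather(
        *(fake_classify(text, latency) for text, latency in jobs)
    )
    return list(results), time.perf_counter() - started


if __name__ == "__main__":
    jobs = [("a", 0.3), ("b", 0.1), ("c", 0.2)]
    print(asyncio.run(run_serial(jobs)))       # time is roughly the sum of latencies
    print(asyncio.run(run_concurrent(jobs)))   # time is roughly the largest latency
```

Both versions return results in input order, even though the concurrent calls finish in a different order.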

gather with error handling

By default, the first exception raised by any awaitable is propagated immediately to the code awaiting gather; the remaining awaitables are not cancelled and keep running in the background. Pass return_exceptions=True to collect errors alongside results:

Python
async def safe_gather_example(texts: list[str]) -> list[str]:
    results = await asyncio.gather(
        *[classify_text(t) for t in texts],
        return_exceptions=True,
    )
    output = []
    for i, result in enumerate(results):
        # BaseException also covers asyncio.CancelledError
        if isinstance(result, BaseException):
            output.append(f"ERROR for text {i}: {result}")
        else:
            output.append(result)
    return output
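An alternative to formatting errors into strings is to keep successes and failures apart, so callers can retry only the failed inputs. The following is a sketch with hypothetical names (`flaky`, `split_results`); `flaky` fails on odd numbers purely for demonstration.

```python
import asyncio


async def flaky(value: int) -> int:
    # Hypothetical task: fails for odd inputs, succeeds for even ones.
    await asyncio.sleep(0.01)
    if value % 2:
        raise ValueError(f"odd input: {value}")
    return value * 10


async def split_results(
    values: list[int],
) -> tuple[dict[int, int], dict[int, BaseException]]:
    outcomes = await asyncio.gather(
        *(flaky(v) for v in values),
        return_exceptions=True,   # exceptions come back as ordinary list items
    )
    successes: dict[int, int] = {}
    failures: dict[int, BaseException] = {}
    # gather preserves input order, so zip pairs each input with its outcome.
    for value, outcome in zip(values, outcomes):
        if isinstance(outcome, BaseException):
            failures[value] = outcome
        else:
            successes[value] = outcome
    return successes, failures


if __name__ == "__main__":
    ok, failed = asyncio.run(split_results([1, 2, 3, 4]))
    print("ok:", ok)
    print("failed:", failed)
```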

gather with timeouts

Wrap gather in asyncio.wait_for to enforce an overall deadline:

Python
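import asyncio
from fastapi import HTTPException

async def classify_with_deadline(texts: list[str]) -> list[str]:
    try:
        results = await asyncio.wait_for(
            asyncio.gather(*[classify_text(t) for t in texts]),
            timeout=15.0,
        )
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Classification timed out")
    return list(results)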
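When the deadline expires, wait_for cancels the gather, which in turn cancels every call still in flight. The sketch below illustrates this with hypothetical names (`slow_call`, `run_with_deadline`): each call records its own cancellation and then re-raises, so any cleanup code runs before the timeout surfaces.

```python
import asyncio


async def slow_call(name: str, cancelled: list[str]) -> str:
    try:
        await asyncio.sleep(5)    # far longer than the deadline below
        return name
    except asyncio.CancelledError:
        cancelled.append(name)    # cleanup hook: note that we were cancelled
        raise                     # always re-raise cancellation


async def run_with_deadline() -> tuple[bool, list[str]]:
    cancelled: list[str] = []
    timed_out = False
    try:
        await asyncio.wait_for(
            asyncio.gather(
                slow_call("a", cancelled),
                slow_call("b", cancelled),
            ),
            timeout=0.1,
        )
    except asyncio.TimeoutError:
        timed_out = True
    return timed_out, sorted(cancelled)


if __name__ == "__main__":
    print(asyncio.run(run_with_deadline()))
```

Cancellation stops the local coroutine; whether the remote service also stops working on the request depends on that service.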

Blocking Code in Async Context: asyncio.to_thread()

Sometimes you must call a synchronous library that you cannot swap out — a legacy database driver, a PDF parser, a local ML model that doesn't have an async API.

asyncio.to_thread() runs the blocking call in a thread pool, allowing the event loop to keep processing other requests while it waits.

Python
import asyncio
import pdfplumber   # synchronous PDF library
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

async def extract_pdf_text(path: str) -> str:
    def _extract():
        with pdfplumber.open(path) as pdf:
            return "\n".join(page.extract_text() or "" for page in pdf.pages)

    # Run the blocking _extract() in a thread so the event loop stays free
    text = await asyncio.to_thread(_extract)
    return text

@app.post("/analyse-pdf")
async def analyse_pdf(pdf_path: str) -> dict:
    text = await extract_pdf_text(pdf_path)
    summary = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Summarise the document in 3 bullet points."},
            {"role": "user", "content": text[:8000]},
        ],
    )
    return {"summary": summary.choices[0].message.content}

Full Example: Async Route for Azure OpenAI Chat

This example uses the Azure OpenAI endpoint instead of the public OpenAI API — common in enterprise and healthcare AI deployments.

Python
# routers/chat.py
import os
import asyncio
from functools import lru_cache
from fastapi import APIRouter, HTTPException, Depends
from openai import AsyncAzureOpenAI
from pydantic import BaseModel, Field

router = APIRouter(prefix="/chat", tags=["chat"])


# --- Client factory (cached, so one client is created and reused across requests) ---

@lru_cache
def get_azure_client() -> AsyncAzureOpenAI:
    return AsyncAzureOpenAI(
        api_key=os.environ["AZURE_OPENAI_API_KEY"],
        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        api_version="2024-12-01-preview",
    )


# --- Models ---

class Message(BaseModel):
    role: str = Field(..., pattern="^(system|user|assistant)$")
    content: str = Field(..., min_length=1)

class ChatRequest(BaseModel):
    messages: list[Message]
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(default=1024, ge=1, le=4096)
    deployment: str = Field(default="gpt-4o")

class ChatResponse(BaseModel):
    content: str
    finish_reason: str
    prompt_tokens: int
    completion_tokens: int


# --- Route ---

@router.post("/", response_model=ChatResponse)
async def chat_completion(
    req: ChatRequest,
    client: AsyncAzureOpenAI = Depends(get_azure_client),
) -> ChatResponse:
    """
    Send a multi-turn conversation to Azure OpenAI and return the assistant reply.
    """
    try:
        response = await asyncio.wait_for(
            client.chat.completions.create(
                model=req.deployment,
                messages=[m.model_dump() for m in req.messages],
                temperature=req.temperature,
                max_tokens=req.max_tokens,
            ),
            timeout=60.0,
        )
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="LLM request timed out after 60 seconds")
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"LLM error: {exc}") from exc

    choice = response.choices[0]
    usage = response.usage

    return ChatResponse(
        content=choice.message.content or "",
        finish_reason=choice.finish_reason,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
    )

Mount the router in main.py:

Python
from fastapi import FastAPI
from routers.chat import router as chat_router

app = FastAPI()
app.include_router(chat_router)

Debugging Async Code

Enable asyncio debug mode

Python
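import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

async def enable_asyncio_debug() -> None:
    # Call this from code already running on the event loop (for example a
    # startup hook) so the setting applies to the loop that serves requests.
    # Setting the environment variable PYTHONASYNCIODEBUG=1 has the same effect.
    asyncio.get_running_loop().set_debug(True)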

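With debug mode on, asyncio logs a warning whenever a single callback or task step holds the event loop for longer than loop.slow_callback_duration (100 ms by default). This is very useful for finding accidental blocking calls, but it adds overhead, so it is usually enabled in development or staging rather than left on in production.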
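To see what the warning looks like, the sketch below runs a deliberately blocking coroutine with debug mode enabled through `asyncio.run(..., debug=True)` and captures what the `asyncio` logger emits. `ListHandler`, `blocks_the_loop` and `capture_debug_warnings` are hypothetical helper names. The exact message text is an implementation detail of CPython and may differ between versions.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocks_the_loop() -> None:
    time.sleep(0.3)   # deliberately blocking, well over the 100 ms threshold


def capture_debug_warnings() -> list[str]:
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(blocks_the_loop(), debug=True)   # debug mode for this run only
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for message in capture_debug_warnings():
        print(message)
```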

Measure coroutine latency

Python
import time
import asyncio

async def timed(coro, label: str):
    start = time.perf_counter()
    result = await coro
    elapsed = time.perf_counter() - start
    print(f"{label} took {elapsed:.3f}s")
    return result

# Usage (inside an async function or route handler)
result = await timed(classify_text("hello world"), "classify")

Key Takeaways

  • async def routes allow await — use them for all I/O-heavy handlers (LLM calls, DB queries, HTTP requests)
  • Plain def routes are automatically run in a thread pool by FastAPI — safe for synchronous blocking libraries
  • Never call time.sleep() or synchronous blocking code inside an async def handler — use asyncio.to_thread() instead
  • asyncio.gather() parallelises multiple awaitable calls, reducing total latency to approximately the slowest single call
  • Wrap long-running calls with asyncio.wait_for() to enforce timeouts and avoid hanging requests
  • Azure OpenAI uses AsyncAzureOpenAI — the same async patterns apply as with the standard OpenAI client

Next lesson: Pydantic v2 request and response models — the type-safe data layer that underpins all of FastAPI's validation and schema generation.