Interview: FastAPI and Async Python Questions

12 interview Q&A pairs covering FastAPI and async Python for AI engineering roles. Topics include async vs sync, Pydantic validation, streaming, dependency injection, health checks, and a RAG system design question.

Asma Hafeez Khan · May 15, 2026 · 12 min read
Tags: fastapi, python, async, interview, system-design, rag

How to Use This Guide

These questions appear in senior Python/AI engineering interviews. Each answer is written at the depth expected at interview — detailed enough to show genuine understanding, concise enough to deliver in under two minutes. Study the code snippets: interviewers often ask you to write or critique code on a whiteboard or shared editor.


Q1. What is the difference between async def and def in a FastAPI route? When would you use each?

Answer:

FastAPI supports both synchronous and asynchronous route handlers.

async def marks a coroutine — it can use await to pause execution while waiting for I/O (database queries, HTTP calls, file reads) without blocking the thread. The event loop switches to other coroutines while this one waits. Use it whenever your handler performs I/O.

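Plain def marks a synchronous function. FastAPI automatically runs it in a thread pool (via anyio.to_thread.run_sync) so that slow synchronous operations don't block the event loop. Use it when calling synchronous-only libraries (some ML libraries, legacy ORMs). It also keeps short CPU-bound work off the event loop, although heavy pure-Python computation still contends for the GIL and is better moved to a process pool or a separate worker.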

The critical mistake to avoid: declaring async def but calling blocking synchronous code inside it.

Python
import asyncio
import time

from fastapi import FastAPI

app = FastAPI()

# WRONG: blocks the event loop for every other request
@app.get("/bad")
async def bad():
    time.sleep(5)       # blocks the entire event loop
    return {}

# CORRECT — blocking sync function, FastAPI thread-pools it
@app.get("/correct-sync")
def correct_sync():
    time.sleep(5)       # runs in thread pool — event loop stays free
    return {}

# CORRECT — async with blocking call offloaded
@app.get("/correct-async")
async def correct_async():
    await asyncio.to_thread(time.sleep, 5)
    return {}

In an AI service, almost all routes should be async def because they await LLM API calls, database queries, or Redis lookups.
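
To make the cost of a blocking call concrete, the sketch below times three concurrent coroutines outside FastAPI, first with time.sleep called directly and then offloaded with asyncio.to_thread. The names blocking_handler, offloaded_handler, and time_concurrent are illustrative and not part of any framework.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def blocking_handler() -> None:
    # Blocks the event loop: no other coroutine can run during the sleep.
    time.sleep(0.2)


async def offloaded_handler() -> None:
    # Runs the blocking call in a worker thread; the event loop stays free.
    await asyncio.to_thread(time.sleep, 0.2)


async def time_concurrent(handler: Callable[[], Awaitable[None]], n: int = 3) -> float:
    """Run n copies of handler concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start
```

With three copies, the blocking version should take roughly the sum of the sleeps, while the offloaded version should take roughly one sleep, because the waits overlap in worker threads.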


Q2. How does FastAPI's dependency injection work? Give a real example.

Answer:

FastAPI's Depends(callable) tells the framework to call callable and pass its return value to the parameter. Dependencies are resolved before the handler runs, cached per request, and support cleanup via generator functions.

Python
from fastapi import FastAPI, Depends, HTTPException, Header
import jwt

app = FastAPI()
SECRET = "my-secret"

class User:
    def __init__(self, id: str, email: str, roles: list[str] | None = None):
        self.id = id
        self.email = email
        self.roles = roles or []  # e.g. ["admin"]; empty when the token carries none

# Dependency 1: extract and verify JWT
async def get_current_user(authorization: str | None = Header(default=None)) -> User:
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing token")
    try:
        payload = jwt.decode(authorization[7:], SECRET, algorithms=["HS256"])
        # Assumes the token carries an optional "roles" claim
        return User(id=payload["sub"], email=payload["email"], roles=payload.get("roles", []))
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# Dependency 2: require admin role (depends on Dependency 1)
def require_admin(user: User = Depends(get_current_user)) -> User:
    if "admin" not in user.roles:
        raise HTTPException(status_code=403, detail="Admin only")
    return user

# Route uses the chained dependency
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, admin: User = Depends(require_admin)):
    return {"deleted": user_id, "by": admin.email}

In tests, override any dependency cleanly:

Python
app.dependency_overrides[get_current_user] = lambda: User(id="test", email="t@t.com")

Q3. What is Pydantic v2 and how does it improve validation over v1?

Answer:

Pydantic v2 rewrote the validation engine in Rust (via pydantic-core), making validation 5–50x faster than v1. The API also became more explicit and consistent.

Key differences:

| Feature | v1 | v2 |
|---------|----|----|
| Validator decorator | @validator | @field_validator |
| Root validator | @root_validator | @model_validator(mode="after") |
| Config | inner Config class | ConfigDict |
| Serialisation | .dict(), .json() | .model_dump(), .model_dump_json() |
| Strict mode | Limited | First-class with model_config = ConfigDict(strict=True) |

Python
from pydantic import BaseModel, Field, field_validator, model_validator, ConfigDict

class ChatRequest(BaseModel):
    model_config = ConfigDict(extra="forbid")  # reject unknown fields

    messages: list[dict] = Field(..., min_length=1)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)

    @field_validator("messages")
    @classmethod
    def non_empty_content(cls, v):
        for msg in v:
            if not msg.get("content", "").strip():
                raise ValueError("All messages must have non-empty content")
        return v

FastAPI uses Pydantic models to validate request bodies, serialise responses, and generate OpenAPI schemas — all from the same class definition.
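
The table lists @model_validator(mode="after") as the replacement for root validators. As a small sketch of cross-field validation with it, assuming Pydantic v2 is installed; the GenerationParams model and its fields are hypothetical and not taken from any API:

```python
from pydantic import BaseModel, Field, ValidationError, model_validator


class GenerationParams(BaseModel):
    min_tokens: int = Field(default=1, ge=1)
    max_tokens: int = Field(default=256, ge=1)

    @model_validator(mode="after")
    def check_token_range(self) -> "GenerationParams":
        # Runs after field validation, so both values are already validated ints.
        if self.min_tokens > self.max_tokens:
            raise ValueError("min_tokens must not exceed max_tokens")
        return self


def is_valid(**kwargs: int) -> bool:
    """Return True if the arguments form a valid GenerationParams."""
    try:
        GenerationParams(**kwargs)
        return True
    except ValidationError:
        return False
```

In mode="after" the validator is an instance method that receives the fully built model, which is why it can compare two fields directly.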


Q4. How would you stream LLM tokens to a browser using FastAPI?

Answer:

Use StreamingResponse with an AsyncGenerator that yields SSE-formatted bytes:

Python
import json
from typing import AsyncGenerator
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

async def token_stream(prompt: str) -> AsyncGenerator[bytes, None]:
    # stream=True makes create() return an async iterator of completion chunks
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        for choice in chunk.choices:
            if choice.delta.content:
                data = json.dumps({"token": choice.delta.content})
                yield f"data: {data}\n\n".encode()
    yield b"data: [DONE]\n\n"

@app.get("/stream")
async def stream(prompt: str) -> StreamingResponse:
    return StreamingResponse(
        token_stream(prompt),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

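On the browser side, EventSource works for a simple GET endpoint like the one above, but it only supports GET and cannot send a JSON body. For a production endpoint that accepts a POST with a JSON payload, use fetch with a ReadableStream reader instead.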

Key points: once streaming starts, the HTTP 200 is committed — errors must be sent as JSON error events in the stream body. Set X-Accel-Buffering: no to prevent nginx buffering.
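
Because the status code cannot change once streaming has begun, one way to surface failures is to wrap the token source and convert an exception into a final error event. The sketch below is framework-independent; sse_event and safe_sse are hypothetical helper names, and the shape of the error payload is an assumption rather than a standard.

```python
import asyncio
import json
from typing import AsyncIterator


def sse_event(payload: dict) -> bytes:
    """Encode one Server-Sent Events message: a data line plus a blank line."""
    return f"data: {json.dumps(payload)}\n\n".encode()


async def safe_sse(tokens: AsyncIterator[str]) -> AsyncIterator[bytes]:
    """Forward tokens as SSE events; report an upstream failure in-band."""
    try:
        async for token in tokens:
            yield sse_event({"token": token})
    except Exception as exc:
        # The HTTP 200 is already sent, so the error travels in the stream body.
        yield sse_event({"error": type(exc).__name__})
    yield b"data: [DONE]\n\n"
```

The resulting generator could be passed to StreamingResponse in place of the raw token stream, so the client always receives a terminating event.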


Q5. What is the lifespan pattern in FastAPI and why should you use it?

Answer:

The lifespan pattern uses @asynccontextmanager to run code at startup and shutdown:

Python
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg

DATABASE_URL = os.environ["DATABASE_URL"]  # assumed to be provided via the environment

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    app.state.db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=20)
    app.state.ready = True

    yield   # Requests are handled here

    # Shutdown
    await app.state.db_pool.close()

app = FastAPI(lifespan=lifespan)

Use it instead of the deprecated @app.on_event("startup") and @app.on_event("shutdown") decorators.

Why it matters for AI services: loading a local embedding model (e.g. SentenceTransformer) takes 5–30 seconds. Doing this in lifespan means the model is ready before the first request arrives. Resources stored in app.state persist for the process lifetime and are accessible in route handlers via request.app.state.
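
The mechanics can be seen without a server. The sketch below uses only the standard library, with SimpleNamespace standing in for the FastAPI app and a hypothetical FakePool standing in for a connection pool or model. It also wraps the yield in try/finally, a defensive choice so the shutdown code runs even if serving fails.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import AsyncIterator


class FakePool:
    """Hypothetical stand-in for a connection pool or loaded model."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def lifespan(app: SimpleNamespace) -> AsyncIterator[None]:
    app.state.pool = FakePool()       # startup: runs once, before any request
    try:
        yield                         # requests are served while suspended here
    finally:
        await app.state.pool.close()  # shutdown: runs even if serving failed


async def serve_once(app: SimpleNamespace) -> bool:
    """Mimic the server: enter the lifespan, 'serve', then exit it."""
    async with lifespan(app):
        return app.state.pool.closed  # still open while requests are served
```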


Q6. How do you write health check endpoints for a FastAPI AI service?

Answer:

Three probes serve different purposes:

  • /health (liveness) — always returns 200 if the process runs; failure causes restart
  • /health/ready (readiness) — checks all dependencies; failure stops traffic routing
  • /health/started (startup) — used during init; gives LLM services time to load models
Python
import asyncio

from fastapi import Request
from fastapi.responses import JSONResponse

@app.get("/health")
async def liveness():
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness(request: Request):
    try:
        await asyncio.wait_for(request.app.state.db_pool.fetchval("SELECT 1"), timeout=2.0)
        db_ok = True
    except Exception:
        db_ok = False

    # getattr avoids an AttributeError if the model was never attached to app.state
    model_ok = getattr(request.app.state, "embedding_model", None) is not None
    all_ok = db_ok and model_ok

    return JSONResponse(
        status_code=200 if all_ok else 503,
        content={"status": "ready" if all_ok else "not_ready",
                 "checks": {"database": db_ok, "model": model_ok}},
    )

Critical rule: always wrap external dependency checks with asyncio.wait_for() and a short timeout. A hanging health check is worse than a failing one.
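
When there are several dependencies, the timeout rule can be applied uniformly by wrapping each probe and running them concurrently, so the slowest probe bounds the total time. A sketch using only asyncio; run_check and readiness_report are hypothetical helpers, and each probe is assumed to be a zero-argument async callable that raises on failure.

```python
import asyncio
from typing import Awaitable, Callable

Probe = Callable[[], Awaitable[object]]


async def run_check(probe: Probe, timeout: float = 2.0) -> bool:
    """Return True if the probe finishes within the timeout without raising."""
    try:
        await asyncio.wait_for(probe(), timeout=timeout)
        return True
    except Exception:  # includes the timeout error raised by wait_for
        return False


async def readiness_report(
    probes: dict[str, Probe], timeout: float = 2.0
) -> tuple[int, dict[str, bool]]:
    """Run all probes concurrently; return an HTTP status and per-check results."""
    names = list(probes)
    results = await asyncio.gather(
        *(run_check(probes[name], timeout) for name in names)
    )
    checks = dict(zip(names, results))
    return (200 if all(checks.values()) else 503), checks
```

A readiness route could return the status code and the checks dictionary directly in a JSONResponse.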


Q7. What are BackgroundTasks and when should you use Celery instead?

Answer:

BackgroundTasks runs a function after the HTTP response is sent, without the client waiting:

Python
@app.post("/chat")
async def chat(req: ChatRequest, background_tasks: BackgroundTasks):
    response = await llm_client.complete(req.messages)
    background_tasks.add_task(log_llm_call, req, response)  # runs after response
    return response

Use BackgroundTasks for: audit logging, metrics, cache invalidation — cheap, fire-and-forget, non-critical.

Limitations: not persistent (task lost on crash), no retries, no scheduling, no monitoring.

Use Celery (or Azure Service Bus) when:

  • The task must survive a process crash (persistent job queue)
  • You need automatic retries with exponential back-off
  • You need to schedule tasks at a specific time
  • You need a dead-letter queue for failed tasks
  • Multiple services need to consume the same work items
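
To see what BackgroundTasks leaves out, the sketch below adds retries with exponential back-off by hand. retry_with_backoff is a hypothetical helper, not a FastAPI or Celery API. It also runs in-process, so the work is still lost if the process dies, which is the gap a persistent queue closes.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    task: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Await task(); on failure wait base_delay * 2**attempt, then retry."""
    for attempt in range(attempts):
        try:
            return await task()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

The sleep parameter is injectable only so the delays can be observed in tests without actually waiting.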

Q8. How do you handle path parameters, query parameters, and request bodies in FastAPI?

Answer:

FastAPI determines the source by convention:

  • Name appears in the path string (/drugs/{drug_id}) → path parameter
  • Plain Python type, no BaseModel → query parameter
  • Pydantic BaseModel type → request body (JSON)
  • Default is Header(), Query(), etc. → that source
Python
from fastapi import Path, Query, Body
from pydantic import BaseModel, Field

class DrugUpdate(BaseModel):
    name: str | None = None
    active: bool | None = None

@app.patch("/drugs/{drug_id}")
async def update_drug(
    drug_id: int = Path(..., ge=1),          # from URL /drugs/42
    include_inactive: bool = Query(False),    # from ?include_inactive=true
    updates: DrugUpdate = Body(...),          # from JSON body
):
    ...

Use Field(ge=1, le=100) for numeric constraints, min_length / max_length for strings, and pattern=r"..." for regex validation.
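
As an illustration of those constraints on a request body, assuming Pydantic v2, the sketch below defines a hypothetical DrugCreate model. Its field names and code format are invented for the example. A small helper reports which fields failed.

```python
from pydantic import BaseModel, Field, ValidationError


class DrugCreate(BaseModel):
    name: str = Field(min_length=2, max_length=80)   # string length bounds
    dose_mg: int = Field(ge=1, le=100)               # numeric range
    code: str = Field(pattern=r"^[A-Z]{3}-\d{4}$")   # regex, e.g. "ABC-1234"


def invalid_fields(**kwargs: object) -> set[str]:
    """Return the names of the fields that failed validation, if any."""
    try:
        DrugCreate(**kwargs)
        return set()
    except ValidationError as exc:
        return {str(error["loc"][0]) for error in exc.errors()}
```

When FastAPI validates a body against such a model, a failure results in a 422 response that lists the same per-field errors.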


Q9. How would you test a FastAPI route that calls OpenAI?

Answer:

Use app.dependency_overrides to inject a mock client without hitting the real API:

Python
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, MagicMock
from main import app
from dependencies import get_openai_client

def test_chat_endpoint():
    mock_response = MagicMock()
    mock_response.choices[0].message.content = "Test answer"
    mock_response.choices[0].finish_reason = "stop"
    mock_response.usage.prompt_tokens = 10
    mock_response.usage.completion_tokens = 20
    mock_response.model = "gpt-4o"

    mock_client = MagicMock()
    mock_client.chat.completions.create = AsyncMock(return_value=mock_response)

    app.dependency_overrides[get_openai_client] = lambda: mock_client

    with TestClient(app) as client:
        resp = client.post("/chat", json={
            "messages": [{"role": "user", "content": "Hello"}]
        })

    app.dependency_overrides.clear()

    assert resp.status_code == 200
    assert "Test answer" in resp.json()["content"]

Key points: TestClient used as a context manager runs the full lifespan. AsyncMock is needed for async def methods. Clear dependency_overrides after the test to avoid leaking state.
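
The AsyncMock point can be checked in isolation from FastAPI. In the sketch below, answer is a hypothetical function under test and build_mock_client mirrors the mock setup used in the test above. Awaiting the call works with AsyncMock, whereas a plain MagicMock return value is not awaitable.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def answer(client, prompt: str) -> str:
    """Code under test: awaits the client's create() method."""
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content


def build_mock_client(text: str) -> MagicMock:
    response = MagicMock()
    response.choices[0].message.content = text
    client = MagicMock()
    # AsyncMock returns an awaitable when called; a plain MagicMock does not.
    client.chat.completions.create = AsyncMock(return_value=response)
    return client
```

AsyncMock also records awaits, so a test can call assert_awaited_once() on the mocked method to confirm the client was actually used.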


Q10. What is asyncio.gather() and when does it help in AI services?

Answer:

asyncio.gather() schedules multiple coroutines concurrently on the same event loop. They all start immediately; you await all of them together. Total time is approximately the slowest single call rather than the sum.

Python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def classify(text: str) -> str:
    r = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Sentiment (positive/negative/neutral): {text}"}],
        max_tokens=5,
    )
    return r.choices[0].message.content.strip()

@app.post("/classify-batch")
async def classify_batch(texts: list[str]) -> list[str]:
    # Serial: total_time = sum(latencies), e.g. 10 x 500 ms = 5 seconds
    # Parallel: total_time ≈ max(latency), e.g. ~500 ms for all 10
    results = await asyncio.gather(*[classify(t) for t in texts])
    return list(results)

In AI services, gather is useful when you need to:

  • Classify or embed a batch of inputs in parallel
  • Fetch context from multiple sources (DB + Redis + vector store) simultaneously
  • Call multiple LLMs and use the first result (this case needs asyncio.wait with return_when=asyncio.FIRST_COMPLETED rather than gather)

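By default, the first exception propagates to the caller while the remaining awaitables keep running in the background. Pass return_exceptions=True to get exceptions back as ordinary results, so one failed call does not discard the rest of the batch.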
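
A sketch of error-tolerant batching with asyncio.gather follows. It also caps the number of in-flight calls with a semaphore. That cap is an addition not discussed above, included on the assumption that an upstream API enforces rate limits. gather_limited is a hypothetical helper name.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def gather_limited(
    items: Sequence[str],
    worker: Callable[[str], Awaitable[str]],
    limit: int = 5,
) -> list:
    """Run worker on every item, at most `limit` at a time.

    Results keep the input order; a failed item yields its exception object.
    """
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: str) -> str:
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(
        *(guarded(item) for item in items), return_exceptions=True
    )
```

Callers can then separate successes from failures with isinstance(result, Exception) instead of losing the whole batch to one error.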


Q11. Why run FastAPI with a single uvicorn worker per container instead of multiple workers?

Answer:

In container orchestration (Kubernetes, Azure Container Apps), horizontal scaling is done by running multiple container replicas, not by running multiple worker processes per container. This has several advantages:

  • Resource isolation — each replica gets exactly the CPU/memory it is allocated
  • Independent health — a crashed replica is restarted individually without affecting others
  • Independent scaling — the orchestrator can add/remove replicas based on HTTP traffic or CPU
  • Simpler lifespan — each process has one event loop; shared state (DB pool, loaded model) is not duplicated across workers

With multiple uvicorn workers per container (--workers 4):

  • Each worker is a separate process with its own memory space
  • Resources initialised in the lifespan (like a 2 GB embedding model) are duplicated four times
  • app.state is not shared between workers — a model loaded by worker 1 is invisible to worker 2

The exception: if you run on a large bare-metal VM without an orchestrator, multiple workers make sense. But in ACA or Kubernetes, keep it at one worker and let the platform scale replicas.


Q12. System Design: Design a FastAPI Service for RAG

Question: Design a production FastAPI service that answers natural-language questions about a drug formulary using Retrieval-Augmented Generation (RAG). Walk through the architecture, key endpoints, data flow, and how you would handle latency and failures.

Answer:

Architecture

Client
  │  POST /rag/query  {"question": "...", "user_id": "..."}
  ▼
FastAPI Service (Azure Container Apps)
  │
  ├── Pydantic validation (RAGRequest model)
  │
  ├── [Parallel with asyncio.gather]
  │     ├── Embed question → text-embedding-3-small (Azure OpenAI)
  │     └── Fetch user context → PostgreSQL (user preferences, history)
  │
  ├── Vector search → Azure AI Search or pgvector
  │   (top 5 drug chunks most similar to question embedding)
  │
  ├── Build prompt: system + retrieved context + question
  │
  ├── LLM call → Azure OpenAI gpt-4o
  │   (with streaming if client requests it)
  │
  ├── BackgroundTask: log to audit_llm_calls table
  │
  └── Return RAGResponse {"answer": "...", "sources": [...]}
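
The data flow in the diagram can be sketched as a single coroutine. Every collaborator here (embed, fetch_user_context, vector_search, generate) is a hypothetical async callable injected by the caller, not a real SDK function, and the prompt wording is a placeholder.

```python
import asyncio
from typing import Awaitable, Callable


async def answer_question(
    question: str,
    user_id: str,
    *,
    embed: Callable[[str], Awaitable[list[float]]],
    fetch_user_context: Callable[[str], Awaitable[dict]],
    vector_search: Callable[[list[float], int], Awaitable[list[str]]],
    generate: Callable[[str], Awaitable[str]],
    top_k: int = 5,
) -> dict:
    # Steps with no dependency on each other run concurrently.
    embedding, user_context = await asyncio.gather(
        embed(question), fetch_user_context(user_id)
    )
    # Retrieval needs the embedding, so it runs afterwards.
    chunks = await vector_search(embedding, top_k)
    prompt = (
        "Answer using only the context below.\n"
        f"User context: {user_context}\n"
        "Context:\n" + "\n".join(chunks) + f"\nQuestion: {question}"
    )
    answer = await generate(prompt)
    return {"answer": answer, "sources": chunks}
```

Injecting the collaborators keeps the orchestration testable with fakes and maps naturally onto FastAPI dependencies.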

Key Endpoints

Python
POST /rag/query        # Standard: return full answer
POST /rag/query/stream # Streaming: SSE token stream
GET  /health           # Liveness
GET  /health/ready     # Readiness: checks DB, Redis, embedding model, Azure OpenAI
GET  /health/started   # Startup: waits for embedding model to load

Latency Optimisations

  1. Parallel embed + context fetch: use asyncio.gather so embedding and DB lookup run simultaneously
  2. Embedding model warm-up: load in lifespan, not on first request
  3. Result caching: Redis cache keyed by a stable digest of the normalised question (for example SHA-256, not Python's per-process hash()), TTL 15 minutes for common queries
  4. Streaming: return tokens as they arrive, so perceived latency drops to roughly the time to first token
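
For the cache key, note that Python's built-in hash() is salted per process for strings, so it cannot be shared across replicas. A sketch of a stable key using hashlib follows. The "rag:answer:" prefix, the normalisation rules, and the inclusion of top_k are assumptions made for illustration.

```python
import hashlib


def cache_key(question: str, top_k: int = 5) -> str:
    """Build a deterministic Redis key for a question.

    Lower-casing and collapsing whitespace lets trivially different
    phrasings of the same question share one cache entry.
    """
    normalised = " ".join(question.lower().split())
    digest = hashlib.sha256(f"{top_k}:{normalised}".encode("utf-8")).hexdigest()
    return f"rag:answer:{digest}"
```

Including top_k in the digest keeps answers built from different retrieval depths from overwriting each other.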

Failure Handling

| Failure | Response |
|---------|----------|
| Azure OpenAI timeout | Return 504 or stream error event |
| Vector store unavailable | Return 503 with diagnostic from /health/ready |
| No relevant chunks found | Return answer with note: "No matching documents found" |
| Embedding service down | Fall back to keyword search in PostgreSQL |
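
The keyword-search fallback in the last row can be sketched as a thin wrapper around the two retrieval strategies. Both semantic_search and keyword_search are hypothetical async callables supplied by the caller, and the 3-second timeout is an arbitrary illustrative default.

```python
import asyncio
from typing import Awaitable, Callable


async def retrieve_with_fallback(
    question: str,
    *,
    semantic_search: Callable[[str], Awaitable[list[str]]],
    keyword_search: Callable[[str], Awaitable[list[str]]],
    timeout: float = 3.0,
) -> tuple[list[str], str]:
    """Return (chunks, strategy), where strategy is 'semantic' or 'keyword'."""
    try:
        chunks = await asyncio.wait_for(semantic_search(question), timeout=timeout)
        return chunks, "semantic"
    except Exception:
        # Embedding or vector search failed or timed out: degrade, don't fail.
        return await keyword_search(question), "keyword"
```

Returning the strategy alongside the chunks lets the response and the logs record that a degraded path was used.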

Models (simplified)

Python
class RAGRequest(BaseModel):
    question: str = Field(..., min_length=5, max_length=1000)
    user_id: str
    top_k: int = Field(default=5, ge=1, le=20)
    stream: bool = False

class Source(BaseModel):
    drug_name: str
    section: str
    score: float

class RAGResponse(BaseModel):
    answer: str
    sources: list[Source]
    cached: bool
    latency_ms: float

Observability

  • Structured JSON logging with user_id, question_hash, retrieved_chunks, latency_ms, tokens_used
  • Azure Application Insights for distributed tracing
  • BackgroundTask writes every call to rag_audit_log for compliance
  • /health/ready checks all four dependencies: DB, Redis, embedding model, Azure OpenAI

This design is horizontally scalable (stateless API, external state in DB/Redis/vector store), observable, and recovers gracefully from dependency failures.


Summary of Topics Covered

| Question | Topic |
|----------|-------|
| Q1 | async def vs def: when to use each |
| Q2 | Dependency injection with Depends() |
| Q3 | Pydantic v2 validation and v1 migration |
| Q4 | LLM token streaming with StreamingResponse |
| Q5 | Application lifespan for startup/shutdown |
| Q6 | Liveness, readiness, and startup health probes |
| Q7 | BackgroundTasks vs Celery |
| Q8 | Path, query, and body parameter handling |
| Q9 | Testing with dependency_overrides |
| Q10 | asyncio.gather for parallel LLM calls |
| Q11 | Single worker per container in orchestration |
| Q12 | System design: RAG service architecture |
