Python & FastAPI Interview Questions — Junior to Senior
Junior Level
Q1: What is FastAPI and what makes it different from Flask?
FastAPI is an async-first Python web framework built on Starlette and Pydantic. Key differences from Flask:
- Async by default: route handlers can be async def, whereas Flask is WSGI (sync)
- Automatic validation: request bodies, query params, and path params are validated by Pydantic automatically
- Auto-generated docs: OpenAPI (Swagger) and ReDoc docs are generated from type hints, with no extra config
- Type safety: function signatures are the schema, so no separate schema definition is needed

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
    name: str
    price: float
    in_stock: bool = True

@app.post("/items", response_model=Item, status_code=201)
async def create_item(item: Item):
    return item  # FastAPI validates input and serialises output automatically

Q2: What is Pydantic and why does FastAPI use it?
Pydantic is a data validation library using Python type hints. FastAPI uses it to:
- Parse and validate request bodies, query params, and path params
- Serialise response models to JSON
- Generate the OpenAPI schema from model definitions

from pydantic import BaseModel, Field, EmailStr, field_validator

class UserCreate(BaseModel):
    name: str = Field(min_length=1, max_length=100)
    email: EmailStr  # requires the optional email-validator package
    age: int = Field(ge=18, le=120)

    @field_validator("name")
    @classmethod
    def name_must_not_contain_numbers(cls, v: str) -> str:
        if any(c.isdigit() for c in v):
            raise ValueError("Name must not contain digits")
        return v.strip()

Pydantic V2 (supported since FastAPI 0.100) runs its core validation in Rust; the Pydantic project reports it as roughly 5–50x faster than V1.
Q3: What is the difference between async def and def in FastAPI route handlers?
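- async def: FastAPI runs the handler directly on the event loop. Use it when the I/O inside is awaited through async libraries (database, HTTP calls, Redis); each await yields control, so the loop is not blocked.
- def: FastAPI runs the handler in a thread pool so that it cannot block the event loop. Use it for synchronous (blocking) I/O. CPU-bound work also keeps the loop responsive this way, but the GIL limits how much of it runs in parallel.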
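The difference can be observed without FastAPI at all. The following is a minimal stdlib-only sketch, not FastAPI's implementation: `blocking_handler`, `threaded_handler`, and `measure` are hypothetical names, and `asyncio.to_thread` stands in for the thread pool that FastAPI uses for plain def handlers.

```python
import asyncio
import time


async def blocking_handler() -> None:
    # Synchronous sleep inside a coroutine: the event loop is held for 0.2 s.
    time.sleep(0.2)


async def threaded_handler() -> None:
    # The same blocking call, pushed to a worker thread so the loop stays free.
    await asyncio.to_thread(time.sleep, 0.2)


async def measure(handler) -> float:
    """Run three copies of `handler` concurrently and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(handler(), handler(), handler())
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(measure(blocking_handler))
threaded_elapsed = asyncio.run(measure(threaded_handler))
print(f"blocking: {blocking_elapsed:.2f}s, threaded: {threaded_elapsed:.2f}s")
```

The three blocking handlers run one after another because none of them ever yields to the loop, while the threaded versions overlap.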

import time

# Correct: async def for awaited I/O
@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, id)

# Correct: plain def for blocking or CPU-heavy work
@app.get("/compute")
def heavy_compute(n: int):
    return {"result": sum(range(n))}  # FastAPI runs this in the thread pool

# Wrong: blocking call inside async def
@app.get("/bad")
async def bad_handler():
    time.sleep(5)  # blocks the event loop, stalling every other request

Q4: How do you define path parameters, query parameters, and request bodies?

from fastapi import FastAPI, Query, Path
from pydantic import BaseModel

app = FastAPI()

class OrderCreate(BaseModel):
    product_id: int
    quantity: int

@app.get("/orders/{order_id}")
async def get_order(
    order_id: int = Path(gt=0),                   # path parameter, must be > 0
    include_lines: bool = Query(default=False),   # query param: ?include_lines=true
    status: str | None = Query(default=None),     # optional query param
):
    return {"order_id": order_id, "include_lines": include_lines}

@app.post("/orders")
async def create_order(order: OrderCreate):  # request body parsed from JSON
    return order

Q5: How does dependency injection work in FastAPI?
FastAPI's Depends system injects dependencies declared in function signatures. Dependencies can be async, can have their own dependencies (nested), and can yield for setup/teardown.
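A dependency that yields behaves much like a context manager: the code before the yield is setup, and the code after it is teardown. The ordering can be sketched with the standard library alone; `fake_session`, `handler`, and `events` are hypothetical names, and this illustrates the pattern rather than FastAPI's internals.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def fake_session():
    events.append("open")       # setup: runs before the handler body
    try:
        yield "session"
    finally:
        events.append("close")  # teardown: runs afterwards, even on error


async def handler() -> str:
    async with fake_session() as session:
        events.append(f"handle with {session}")
        return "response"


result = asyncio.run(handler())
print(events)
```

Wrapping the yield in try/finally is what guarantees the teardown also runs when the handler raises.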

from collections.abc import AsyncIterator

from fastapi import Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

# async_session_factory, oauth2_scheme, verify_token and User are defined elsewhere in the app

async def get_db() -> AsyncIterator[AsyncSession]:
    async with async_session_factory() as session:
        yield session  # code after the yield runs as teardown once the request has been handled

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    user = await verify_token(token, db)
    if not user:
        raise HTTPException(status_code=401)
    return user

@app.get("/me")
async def read_me(current_user: User = Depends(get_current_user)):
    return current_user

Dependencies are cached per request by default: get_db is called only once per request, even if several dependencies in the same chain depend on it.
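Per-request caching amounts to memoising each provider function for the lifetime of one request. The sketch below is a simplified stdlib-only model of that idea, not FastAPI's resolver; `resolve_request`, `solve`, and `calls` are hypothetical names.

```python
calls = {"get_db": 0}


def get_db() -> str:
    calls["get_db"] += 1
    return "session"


def get_current_user(db: str) -> str:
    return f"user via {db}"


def resolve_request() -> dict:
    """Resolve one request's dependencies, memoising each provider by function."""
    cache: dict = {}

    def solve(provider):
        if provider not in cache:
            cache[provider] = provider()
        return cache[provider]

    # get_db is needed twice: once by get_current_user, once by the handler.
    user = get_current_user(solve(get_db))
    return {"user": user, "db": solve(get_db)}


resolve_request()
calls_after_one_request = calls["get_db"]
resolve_request()
calls_after_two_requests = calls["get_db"]
```

The cache is rebuilt for every request, so the provider runs once per request rather than once per process. In FastAPI itself, passing use_cache=False to Depends opts a dependency out of this behaviour.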
Q6: How do you return different HTTP status codes?

from fastapi import FastAPI, HTTPException, status

app = FastAPI()

# ItemCreate, ItemResponse and item_service are defined elsewhere in the app

@app.post("/items", status_code=status.HTTP_201_CREATED)
async def create_item(item: ItemCreate) -> ItemResponse:
    saved_item = await item_service.create(item)  # assumed service method
    return saved_item

@app.delete("/items/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_item(id: int):
    await item_service.delete(id)
    # return nothing for 204

@app.get("/items/{id}")
async def get_item(id: int) -> ItemResponse:
    item = await item_service.get(id)
    if not item:
        raise HTTPException(status_code=404, detail=f"Item {id} not found")
    return item

Q7: What is a response_model and when do you use it?
response_model tells FastAPI what to serialise the return value as. It strips any fields not in the model (useful for hiding password hashes), validates the output, and drives the OpenAPI docs.

class UserInDB(BaseModel):
    id: int
    email: str
    hashed_password: str  # internal field

class UserPublic(BaseModel):
    id: int
    email: str
    # no hashed_password

@app.get("/users/{id}", response_model=UserPublic)
async def get_user(id: int) -> UserInDB:
    user = await db.get_user(id)
    return user  # FastAPI strips hashed_password before sending the response

Mid Level
Q8: How do you handle global exception handling in FastAPI?

from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

app = FastAPI()

class DomainException(Exception):
    def __init__(self, message: str, code: str):
        super().__init__(message)
        self.message = message
        self.code = code

@app.exception_handler(DomainException)
async def domain_exception_handler(request: Request, exc: DomainException):
    return JSONResponse(
        status_code=422,
        content={"error": exc.code, "message": exc.message},
    )

# Override FastAPI's default 422 validation error response
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return JSONResponse(
        status_code=422,
        content={"error": "validation_failed", "detail": exc.errors()},
    )

Q9: How do you write middleware in FastAPI?

import time
import uuid

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()

@app.middleware("http")
async def correlation_id_middleware(request: Request, call_next):
    correlation_id = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
    start = time.perf_counter()
    response = await call_next(request)
    duration_ms = (time.perf_counter() - start) * 1000
    response.headers["X-Correlation-ID"] = correlation_id
    response.headers["X-Response-Time"] = f"{duration_ms:.1f}ms"
    return response

# Or use Starlette's BaseHTTPMiddleware for class-based middleware
class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # check_rate_limit is an app-defined helper that checks the limit in Redis
        if not await check_rate_limit(request.client.host):
            return JSONResponse({"error": "rate_limited"}, status_code=429)
        return await call_next(request)

app.add_middleware(RateLimitMiddleware)

Q10: What are background tasks in FastAPI and when should you use them?
BackgroundTasks runs code after the response is sent — good for fire-and-forget work (sending an email, logging an event) that the client doesn't need to wait for.

from fastapi import BackgroundTasks

def send_welcome_email(email: str, name: str):
    # Runs after the response is sent
    email_service.send(to=email, subject="Welcome!", body=f"Hi {name}!")

@app.post("/users", status_code=201)
async def create_user(user: UserCreate, background_tasks: BackgroundTasks):
    created = await user_service.create(user)
    background_tasks.add_task(send_welcome_email, created.email, created.name)
    return created  # response sent immediately; email queued in the background

BackgroundTasks is in-process only. For anything that must survive a server restart, use a proper task queue (Celery, ARQ, or whatever your stack offers as the equivalent of MassTransit).
Q11: How do you test a FastAPI application?

import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from app.main import app
from app.database import get_db, Base

@pytest.fixture
async def db_session():
    engine = create_async_engine("postgresql+asyncpg://test:test@localhost/test_db")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with AsyncSession(engine) as session:
        yield session
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()  # release pooled connections between tests

@pytest.fixture
async def client(db_session):
    # Route every get_db dependency to the test session
    app.dependency_overrides[get_db] = lambda: db_session
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as client:
        yield client
    app.dependency_overrides.clear()

async def test_create_user(client: AsyncClient):
    response = await client.post("/users", json={"name": "Alice", "email": "alice@test.com"})
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "alice@test.com"
    assert "id" in data

Use pytest-asyncio with asyncio_mode = auto in pytest.ini (or asyncio_mode = "auto" under [tool.pytest.ini_options] in pyproject.toml) so that async tests and fixtures run without extra decorators.
Q12: How do you structure a FastAPI application for production?
app/
├── main.py # FastAPI app factory, lifespan, middleware, router includes
├── api/
│ ├── v1/
│ │ ├── users.py # APIRouter for /users
│ │ ├── orders.py # APIRouter for /orders
│ │ └── __init__.py
│ └── deps.py # Shared dependencies (get_db, get_current_user)
├── core/
│ ├── config.py # Settings via pydantic-settings
│ ├── security.py # JWT logic
│ └── logging.py # structlog setup
├── domain/
│ ├── users/
│ │ ├── models.py # SQLAlchemy models
│ │ ├── schemas.py # Pydantic request/response models
│ │ ├── service.py # Business logic
│ │ └── repository.py
│ └── orders/
├── db/
│ ├── session.py # AsyncSession factory
│ └── base.py # Base model
└── tests/

# main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlalchemy import text

# db_engine (a SQLAlchemy AsyncEngine), redis_client, users_router and orders_router
# are assumed to be imported from the app's own modules

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: fail fast if the database or Redis is unreachable
    async with db_engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
    await redis_client.ping()
    yield
    # Shutdown
    await db_engine.dispose()
    await redis_client.aclose()

def create_app() -> FastAPI:
    app = FastAPI(lifespan=lifespan, title="API", version="1.0.0")
    app.include_router(users_router, prefix="/api/v1/users", tags=["users"])
    app.include_router(orders_router, prefix="/api/v1/orders", tags=["orders"])
    return app

app = create_app()

Senior Level
Q13: How do you implement proper async database access in FastAPI?

from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,  # recycle connections after 30 min
    echo=False,
)

async_session_factory = async_sessionmaker(
    engine,
    expire_on_commit=False,  # avoid lazy loads after commit
    class_=AsyncSession,
)

async def get_db() -> AsyncIterator[AsyncSession]:
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()  # commit only if the handler did not raise
        except Exception:
            await session.rollback()
            raise

psycopg2 is synchronous and cannot back an async engine, so use an async driver such as asyncpg for PostgreSQL. asyncpg's own benchmarks report it as several times faster than other Python drivers, in part because it speaks the PostgreSQL binary protocol directly.
Q14: How do you implement idempotent API endpoints?
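The core rule is that the operation behind a given key runs at most once, and later requests with the same key replay the stored result. Here is a minimal in-memory sketch of that rule, assuming a single process; `InMemoryIdempotencyStore`, `run_once`, and `charge` are hypothetical names, and the single lock serialises all keys for simplicity.

```python
import asyncio


class InMemoryIdempotencyStore:
    """Hypothetical single-process stand-in for a shared store such as Redis."""

    def __init__(self) -> None:
        self._results: dict[str, dict] = {}
        self._lock = asyncio.Lock()

    async def run_once(self, key: str, operation) -> tuple[dict, bool]:
        """Return (result, replayed); `operation` runs at most once per key."""
        async with self._lock:
            if key in self._results:
                return self._results[key], True
            result = await operation()
            self._results[key] = result
            return result, False


charges: list[int] = []


async def charge() -> dict:
    charges.append(100)      # side effect that must not be repeated
    await asyncio.sleep(0)   # yield to the loop, as a real payment call would
    return {"payment_id": len(charges), "amount": 100}


async def main():
    store = InMemoryIdempotencyStore()
    # Two concurrent requests carrying the same idempotency key.
    return await asyncio.gather(
        store.run_once("key-1", charge),
        store.run_once("key-1", charge),
    )


first, second = asyncio.run(main())
```

Holding the lock across both the lookup and the operation is what prevents two concurrent requests from both charging. Across several processes the same guarantee needs an atomic operation in the shared store.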
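
import json

from fastapi import Depends, Header
from fastapi.responses import JSONResponse
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession

# app, PaymentCreate, payment_service, get_db and get_redis are defined elsewhere in the app

@app.post("/payments", status_code=201)
async def create_payment(
    payment: PaymentCreate,
    idempotency_key: str = Header(alias="Idempotency-Key"),
    db: AsyncSession = Depends(get_db),
    redis: Redis = Depends(get_redis),
):
    cache_key = f"idempotency:{idempotency_key}"

    # Replay the stored response if this key was already processed
    cached = await redis.get(cache_key)
    if cached:
        return JSONResponse(content=json.loads(cached), status_code=200)

    # Process the payment. Two concurrent requests with the same key can both
    # reach this point; closing that gap needs an atomic claim on the key.
    result = await payment_service.charge(payment)

    # Cache the result with a 24h TTL
    await redis.setex(cache_key, 86400, result.model_dump_json())
    return result

Q15: How do you implement rate limiting in FastAPI?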
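A sliding-window limiter keeps the timestamps of recent requests per client and rejects a request once the window already holds the limit. The following is a minimal in-process sketch of the algorithm, with no Redis involved; `InMemorySlidingWindow` and `allow` are hypothetical names, and the clock is passed in so the behaviour is deterministic.

```python
from collections import deque


class InMemorySlidingWindow:
    """Hypothetical single-process limiter; `now` is injected to keep it testable."""

    def __init__(self, limit: int, window_seconds: float) -> None:
        self._limit = limit
        self._window = window_seconds
        self._hits: dict[str, deque] = {}

    def allow(self, key: str, now: float) -> bool:
        hits = self._hits.setdefault(key, deque())
        # Drop timestamps that have slid out of the window.
        while hits and hits[0] <= now - self._window:
            hits.popleft()
        if len(hits) >= self._limit:
            return False  # rejected requests are not recorded
        hits.append(now)
        return True


limiter = InMemorySlidingWindow(limit=2, window_seconds=60)
decisions = [
    limiter.allow("ip", now=0.0),   # first request in the window
    limiter.allow("ip", now=10.0),  # second request in the window
    limiter.allow("ip", now=20.0),  # third request while the window is full
    limiter.allow("ip", now=61.0),  # the entry from t=0 has expired
]
print(decisions)
```

A Redis sorted set plays the role of the deque when the limit has to hold across several worker processes.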

import time
import uuid

from fastapi import Request
from fastapi.responses import JSONResponse
from redis.asyncio import Redis

class SlidingWindowRateLimiter:
    def __init__(self, redis: Redis, limit: int, window_seconds: int):
        self._redis = redis
        self._limit = limit
        self._window = window_seconds

    async def is_allowed(self, key: str) -> bool:
        now = time.time()
        window_start = now - self._window
        pipeline = self._redis.pipeline()
        # Remove entries outside the window
        pipeline.zremrangebyscore(key, 0, window_start)
        # Count requests already in the window
        pipeline.zcard(key)
        # Add the current request; the uuid keeps members unique within one timestamp
        pipeline.zadd(key, {f"{now}:{uuid.uuid4()}": now})
        # Let idle keys expire
        pipeline.expire(key, self._window)
        _, count, *_ = await pipeline.execute()
        return count < self._limit

# redis is the app's shared redis.asyncio client, created elsewhere
rate_limiter = SlidingWindowRateLimiter(redis, limit=100, window_seconds=60)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_ip = request.client.host
    if not await rate_limiter.is_allowed(f"rate:{client_ip}"):
        # Return the response directly: an HTTPException raised in middleware
        # bypasses FastAPI's exception handlers and surfaces as a 500
        return JSONResponse({"detail": "Rate limit exceeded"}, status_code=429)
    return await call_next(request)

Q16: What Python async gotchas should you know for FastAPI?
Blocking I/O in async context — the silent killer:

# Wrong: blocks the event loop for every request
import requests  # sync HTTP library

@app.get("/bad")
async def bad():
    response = requests.get("https://api.example.com/data")  # blocks
    return response.json()

# Correct: use httpx with async
import httpx

@app.get("/good")
async def good():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

Shared mutable state between requests:
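The hazard appears when a read-modify-write on shared state spans an await, because other requests can run in between. Here is a minimal stdlib-only sketch of the lost update and the lock that prevents it; `counter`, `unsafe_increment`, and `safe_increment` are hypothetical names.

```python
import asyncio

counter = {"value": 0}


async def unsafe_increment() -> None:
    current = counter["value"]
    await asyncio.sleep(0)          # other tasks run here and read the same value
    counter["value"] = current + 1  # overwrites their updates


async def safe_increment(lock: asyncio.Lock) -> None:
    async with lock:                # the whole read-modify-write is one critical section
        current = counter["value"]
        await asyncio.sleep(0)
        counter["value"] = current + 1


async def run(n: int) -> tuple[int, int]:
    counter["value"] = 0
    await asyncio.gather(*(unsafe_increment() for _ in range(n)))
    unsafe_total = counter["value"]

    counter["value"] = 0
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(lock) for _ in range(n)))
    return unsafe_total, counter["value"]


unsafe_total, safe_total = asyncio.run(run(10))
print(unsafe_total, safe_total)
```

An asyncio.Lock only protects state inside one process. With several workers, the shared state has to live in an external store such as Redis.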
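
import asyncio

# Wrong: module-level state is shared by every request in this process
cache = []

@app.get("/items")
async def get_items():
    cache.append("something")  # grows without bound; unsafe once an update spans an await
    return cache

# Correct: keep shared state in Redis, or guard in-process state with asyncio.Lock
lock = asyncio.Lock()

@app.get("/items")
async def get_items():
    async with lock:
        cache.append("something")
        return list(cache)  # return a snapshot taken under the lock

Not awaiting coroutines: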
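Calling an async function only builds a coroutine object; its body does not start until the object is awaited. This can be sketched with the standard library alone, using a hypothetical `send_email` that records each address it is asked to send to.

```python
import asyncio
import inspect

sent: list[str] = []


async def send_email(address: str) -> str:
    sent.append(address)
    return "ok"


async def main() -> tuple[bool, int, str, int]:
    pending = send_email("a@example.com")   # no await: the body has not run yet
    was_coroutine = inspect.iscoroutine(pending)
    sent_before = len(sent)
    result = await pending                   # now the body runs
    return was_coroutine, sent_before, result, len(sent)


was_coroutine, sent_before, result, sent_after = asyncio.run(main())
```

If the coroutine object is dropped without ever being awaited, Python emits only a RuntimeWarning, which is easy to miss in logs.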

# Wrong: this creates a coroutine object but never runs it
result = send_email(user.email)  # no await: the email is never sent, and Python emits only a RuntimeWarning

# Correct
result = await send_email(user.email)

Q17: How do you handle environment-specific configuration?

from functools import lru_cache

from fastapi import Depends
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    database_url: str
    redis_url: str = "redis://localhost:6379"
    secret_key: str
    debug: bool = False
    allowed_origins: list[str] = ["http://localhost:3000"]
    log_level: str = "INFO"

@lru_cache
def get_settings() -> Settings:
    return Settings()

# In routes
@app.get("/health")
async def health(settings: Settings = Depends(get_settings)):
    return {"env": "debug" if settings.debug else "production"}

The @lru_cache decorator ensures Settings() is instantiated only once, so env vars and the .env file are read on the first call and reused afterwards. In tests, call get_settings.cache_clear() to force a reload with a different configuration.
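The caching behaviour can be checked with the standard library alone. In this sketch, `DemoSettings`, `get_demo_settings`, and the `DEMO_LOG_LEVEL` variable are hypothetical stand-ins for a pydantic-settings class and its environment.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for a pydantic-settings class."""
    log_level: str


@lru_cache
def get_demo_settings() -> DemoSettings:
    # The environment is read only when the cache is empty.
    return DemoSettings(log_level=os.environ.get("DEMO_LOG_LEVEL", "INFO"))


os.environ["DEMO_LOG_LEVEL"] = "DEBUG"
first = get_demo_settings()

os.environ["DEMO_LOG_LEVEL"] = "ERROR"
cached = get_demo_settings()      # same instance; the new value is not seen

get_demo_settings.cache_clear()
reloaded = get_demo_settings()    # cache emptied, so the environment is re-read
```

The same property means that a changed environment variable goes unnoticed in a running process until the cache is cleared or the process restarts.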