Python & FastAPI · Lesson 1 of 1
Python & FastAPI: Production APIs
Why FastAPI?
FastAPI is the fastest-growing Python web framework. It's used at Microsoft, Uber, Netflix, and hundreds of startups. What makes it special:
- Performance: on par with Node.js and Go — built on async Python (ASGI)
- Auto documentation: Swagger UI and ReDoc generated from your code
- Type safety: built-in validation via Pydantic — catch bugs before they ship
- Developer experience: auto-completion, inline validation errors, zero boilerplate
If you know Python, you can build a production API in under an hour with FastAPI.
Python Essentials for FastAPI
Before FastAPI, you need to understand these Python concepts.
Type hints
Python
# Annotations on parameters and on the return value
def greet(name: str, age: int) -> str:
    """Return a one-line greeting that mentions the given name and age."""
    template = "Hello {}, you are {}"
    return template.format(name, age)
from typing import Optional, List, Dict, Union
def get_user(user_id: int, include_orders: bool = False) -> Optional[dict]:
    """Signature-only stub: shows a defaulted flag and an Optional return type.

    The body is intentionally omitted in this example, so the call yields None.
    """
    return None
# Modern union syntax (Python 3.10+)
def process(value: int | str | None) -> str:
...Dataclasses and Pydantic models
Python
from dataclasses import dataclass
from datetime import datetime
@dataclass
class User:
id: int
name: str
email: str
created_at: datetime = field(default_factory=datetime.utcnow)List/dict comprehensions
Python
# List comprehension: builds a new list from an iterable in a single expression
squares = [x**2 for x in range(10)]
# The trailing `if` filters items; `numbers` is assumed to be defined earlier by the reader
evens = [n for n in numbers if n % 2 == 0]
# Dict comprehension: `users` is assumed to be an iterable of objects exposing .id and .email
email_map = {user.id: user.email for user in users}
# Generator (lazy, memory efficient)
total = sum(item.price * item.qty for item in cart_items)
async / await
Python
import asyncio
async def fetch_user(user_id: int) -> dict:
    """Return a hard-coded user record after a short awaited pause."""
    simulated_latency = 0.1  # simulates I/O
    await asyncio.sleep(simulated_latency)
    record = dict(id=user_id, name="Alice")
    return record
async def main() -> list[dict]:
    """Fetch three users concurrently and return them.

    asyncio.gather starts all three coroutines together and returns their
    results as a list in the same order as the arguments.
    """
    # Run concurrently
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    # Hand the result back instead of discarding it, so callers can use it.
    return users
asyncio.run(main())
Decorators
Python
import functools
import time
def timer(func):
    """Decorator that prints how long each call to *func* took.

    The wrapped function's return value is passed through unchanged, and
    functools.wraps keeps its __name__ and docstring intact.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock; time.time() can
        # jump when the system clock is adjusted and skew the measurement.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.2f}s")
        return result

    return wrapper
@timer
def slow_function():
time.sleep(1)Project Setup
Bash
# Create virtual environment
python -m venv venv
source venv/bin/activate     # Linux/Mac
# venv\Scripts\activate      # Windows: run this instead of the line above
# Install dependencies
# Extras are quoted so shells such as zsh do not treat [...] as a glob pattern.
# asyncpg is the driver named in the postgresql+asyncpg:// database URL;
# python-multipart is required by FastAPI to parse the OAuth2 login form.
pip install fastapi "uvicorn[standard]" sqlalchemy asyncpg alembic pydantic-settings
pip install "python-jose[cryptography]" "passlib[bcrypt]" python-multipart httpx
# Create project structure
mkdir -p app/{models,schemas,routers,services,core}
touch app/{__init__,main,database}.py
orderflow/
├── app/
│ ├── main.py # FastAPI app, startup
│ ├── database.py # SQLAlchemy engine
│ ├── models/ # SQLAlchemy ORM models
│ ├── schemas/ # Pydantic request/response models
│ ├── routers/ # Route handlers
│ ├── services/ # Business logic
│ └── core/
│ ├── config.py # Settings from environment
│ ├── security.py # JWT, hashing
│ └── deps.py # Dependency injection
├── alembic/ # Database migrations
├── tests/
└── requirements.txt
FastAPI Application
Python
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routers import orders, auth, customers
from app.database import create_tables
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan hook passed to FastAPI: code before `yield` runs at startup, code after it at shutdown."""
    # Startup
    await create_tables()
    # Control returns to the framework here while the application serves requests
    yield
    # Shutdown (cleanup) — nothing to release in this example
# Application object; title/description/version feed the generated API docs
app = FastAPI(
    lifespan=lifespan,
    title="OrderFlow API",
    description="Order management REST API",
    version="1.0.0",
)

# CORS: only the listed origins may call the API from a browser
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://learnixo.io", "http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

# Each feature router is mounted under its own versioned prefix
app.include_router(auth.router, tags=["auth"], prefix="/api/v1/auth")
app.include_router(customers.router, tags=["customers"], prefix="/api/v1/customers")
app.include_router(orders.router, tags=["orders"], prefix="/api/v1/orders")
@app.get("/health")
async def health_check():
return {"status": "healthy"}Pydantic Models (Schemas)
Pydantic validates and serializes data automatically.
Python
# app/schemas/order.py
from pydantic import BaseModel, field_validator, model_validator
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import List
class OrderStatus(str, Enum):
    """Closed set of order states.

    Mixing in `str` makes every member compare equal to its lowercase value.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
class OrderItemCreate(BaseModel):
    """Request payload for a single line item of a new order."""

    product_id: int
    quantity: int

    @field_validator("quantity")
    @classmethod
    def quantity_must_be_positive(cls, v: int) -> int:
        """Accept only quantities greater than zero; anything else is a validation error."""
        if v > 0:
            return v
        raise ValueError("Quantity must be positive")
class OrderCreate(BaseModel):
    """Request payload for creating an order."""

    customer_id: int
    items: List[OrderItemCreate]
    notes: str | None = None

    @model_validator(mode="after")
    def items_not_empty(self) -> "OrderCreate":
        """Runs after field validation and rejects an order without any items."""
        if len(self.items) == 0:
            raise ValueError("Order must have at least one item")
        return self
class OrderItemResponse(BaseModel):
    """Response shape for one line item of an order."""

    # allows ORM model → schema
    model_config = {"from_attributes": True}

    id: int
    product_id: int
    product_name: str
    quantity: int
    unit_price: Decimal
class OrderResponse(BaseModel):
    """Response shape for a full order, including its line items."""

    # Lets the schema be populated from object attributes, not only from dicts
    model_config = {"from_attributes": True}

    id: int
    customer_id: int
    customer_name: str
    status: OrderStatus
    total: Decimal
    items: List[OrderItemResponse]
    created_at: datetime
class PagedResponse(BaseModel):
items: List[OrderResponse]
page: int
page_size: int
total: int
total_pages: intSQLAlchemy Models
Python
# app/database.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

from app.core.config import settings

# Module-level engine shared by the whole process; echo follows the debug setting.
engine = create_async_engine(settings.database_url, echo=settings.debug)
# expire_on_commit=False keeps already-loaded attributes readable after a commit.
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


class Base(DeclarativeBase):
    """Declarative base class that all ORM models inherit from."""


async def create_tables() -> None:
    """Create every table registered on Base.metadata.

    app/main.py imports this function for its startup hook. Only models that
    have been imported by the time it runs are registered on the metadata.
    """
    async with engine.begin() as conn:
        # create_all is a synchronous API, so it is run through run_sync.
        await conn.run_sync(Base.metadata.create_all)


async def get_db() -> AsyncIterator[AsyncSession]:  # dependency
    """Yield one session per use and close it when the caller is done.

    This is an async generator, hence the AsyncIterator return annotation.
    """
    async with AsyncSessionLocal() as session:
        yield session
# app/models/order.py
from datetime import datetime
from decimal import Decimal

from sqlalchemy import String, Integer, Numeric, ForeignKey, DateTime, Enum as SAEnum
from sqlalchemy.orm import relationship, Mapped, mapped_column

from app.database import Base
class Customer(Base):
    """ORM model mapped to the `customers` table."""

    __tablename__ = "customers"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    # unique=True: no two customers may share an email address
    email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
    tier: Mapped[str] = mapped_column(String(20), default="free")
    # The callable is passed uncalled, so it is evaluated per insert
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    is_active: Mapped[bool] = mapped_column(default=True)

    # Mirrors Order.customer through back_populates
    orders: Mapped[list["Order"]] = relationship("Order", back_populates="customer")
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
customer_id: Mapped[int] = mapped_column(ForeignKey("customers.id"))
status: Mapped[str] = mapped_column(String(20), default="pending")
total: Mapped[float] = mapped_column(Numeric(10, 2), default=0)
notes: Mapped[str | None] = mapped_column(String(500), nullable=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
customer: Mapped["Customer"] = relationship("Customer", back_populates="orders")
items: Mapped[list["OrderItem"]] = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")Router with Dependency Injection
Python
# app/routers/orders.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.order import OrderCreate, OrderResponse, PagedResponse
from app.services.order_service import OrderService
from app.core.deps import get_current_user, require_role
from app.models.user import User
router = APIRouter()
@router.get("/", response_model=PagedResponse)
async def list_orders(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    status: str | None = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of orders.

    Paging values are range-checked by Query; the optional status filter and
    the caller's id are passed straight through to the service.
    """
    orders_page = await OrderService(db).get_paged(page, page_size, status, current_user.id)
    return orders_page
@router.get("/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a single order, or respond 404 when the service finds nothing."""
    found = await OrderService(db).get_by_id(order_id, current_user.id)
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Order {order_id} not found",
    )
@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
async def create_order(
    payload: OrderCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create an order from the validated payload and respond with 201 Created."""
    created = await OrderService(db).create(payload, current_user.id)
    return created
@router.delete("/{order_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_order(
order_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
service = OrderService(db)
deleted = await service.delete(order_id)
if not deleted:
raise HTTPException(status_code=404, detail="Order not found")JWT Authentication
Python
# app/core/security.py
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
# Single hashing context for the module, configured for bcrypt
pwd_context = CryptContext(deprecated="auto", schemes=["bcrypt"])


def verify_password(plain: str, hashed: str) -> bool:
    """Return whether the plaintext password matches the stored hash."""
    is_match = pwd_context.verify(plain, hashed)
    return is_match


def hash_password(password: str) -> str:
    """Return the hash of the given plaintext password."""
    digest = pwd_context.hash(password)
    return digest
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Return a signed JWT containing *data* plus an `exp` claim.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Token lifetime. When None, settings.jwt_expire_minutes
            is used. An explicit zero delta is honoured rather than being
            replaced by the default.
    """
    # Local import: this module's header only imports datetime and timedelta.
    from datetime import timezone

    # `is None` rather than `or`: timedelta(0) is falsy and would otherwise be
    # silently swapped for the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.jwt_expire_minutes)

    to_encode = data.copy()
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    # Sign with the configured algorithm so encode and settings cannot drift apart.
    return jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_algorithm)
def decode_token(token: str) -> dict:
    """Verify *token* and return its claims.

    Only the algorithm configured in settings.jwt_algorithm is accepted, so
    the value stays in step with the settings instead of a hard-coded string.
    Verification failures propagate from jwt.decode to the caller.
    """
    return jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm])
# app/core/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.core.security import decode_token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    """Resolve the bearer token to a user, or raise 401.

    Every failure mode (bad token, missing or non-numeric `sub`, unknown user)
    produces the same 401 response.
    """
    # Local import: JWTError is caught below but is not imported at the top of
    # this module.
    from jose import JWTError

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = decode_token(token)
    except JWTError as exc:
        raise credentials_exception from exc

    subject = payload.get("sub")
    if subject is None:
        raise credentials_exception

    # The login route stores the id as a string (`str(user.id)`), so convert it
    # back to an int before the lookup; a malformed value is treated as invalid.
    try:
        user_id = int(subject)
    except (TypeError, ValueError) as exc:
        raise credentials_exception from exc

    # NOTE(review): get_user_by_id is not imported in this snippet — confirm
    # where it lives and import it alongside the other dependencies.
    user = await get_user_by_id(db, user_id)
    if user is None:
        raise credentials_exception
    return user
def require_role(role: str):
    """Build a dependency that admits only users whose `role` equals *role*.

    The returned coroutine depends on get_current_user, hands the user back
    on a match, and raises 403 otherwise.
    """

    async def checker(user = Depends(get_current_user)):
        if user.role == role:
            return user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return checker
# app/routers/auth.py
@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """Exchange form credentials for a bearer token.

    Responds 401 when authenticate_user returns a falsy result. On success the
    token carries the user id (as a string) in `sub` and the user's role.
    """
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        # Same WWW-Authenticate challenge as the 401 raised by get_current_user,
        # so clients see a consistent response for failed authentication.
        raise HTTPException(
            status_code=401,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token({"sub": str(user.id), "role": user.role})
    return {"access_token": token, "token_type": "bearer"}
Settings and Configuration
Python
# app/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application settings; model_config points at a UTF-8 `.env` file."""

    model_config = SettingsConfigDict(env_file_encoding="utf-8", env_file=".env")

    # --- Database ---
    database_url: str = "postgresql+asyncpg://user:pass@localhost/orderflow"

    # --- JWT ---
    # The default secret is a placeholder, as its text says
    jwt_secret: str = "change-me-in-production"
    jwt_algorithm: str = "HS256"
    jwt_expire_minutes: int = 60

    # --- App ---
    debug: bool = False
    environment: str = "development"

    # --- Email ---
    smtp_host: str = "smtp.gmail.com"
    smtp_port: int = 587
    smtp_user: str = ""
    smtp_password: str = ""
settings = Settings()
Bash
# .env (never commit this)
DATABASE_URL=postgresql+asyncpg://postgres:password@localhost/orderflow
JWT_SECRET=super-secret-key-minimum-32-characters
ENVIRONMENT=production
DEBUG=false
Global Error Handling
Python
# app/main.py
from fastapi import Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Answer request-validation failures with a 422 that lists each error."""
    body = {
        "detail": "Validation error",
        "errors": exc.errors(),
    }
    return JSONResponse(content=body, status_code=422)
# Registered on Starlette's HTTPException (the base class of FastAPI's), so
# errors raised by Starlette itself go through the same handler. Imported here
# because this snippet does not otherwise import an HTTPException.
from starlette.exceptions import HTTPException as StarletteHTTPException


@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
    """Render HTTP errors as `{"detail": ...}` JSON with the original status.

    Headers attached to the exception (for example the WWW-Authenticate
    challenge on 401 responses) are forwarded instead of being dropped.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        headers=getattr(exc, "headers", None),
    )
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
import logging
logging.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"},
)Background Tasks
Python
from fastapi import BackgroundTasks
# The path is "/" because the orders router is already mounted under
# /api/v1/orders; "/orders" would expose the route at /api/v1/orders/orders.
@router.post("/", response_model=OrderResponse, status_code=201)
async def create_order(
    payload: OrderCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create an order and queue a confirmation email.

    The email is registered as a background task, so it does not delay the
    201 response that carries the new order.
    """
    order = await OrderService(db).create(payload, current_user.id)
    # Fire and forget — doesn't delay the response
    background_tasks.add_task(send_confirmation_email, order.id, current_user.email)
    return order
async def send_confirmation_email(order_id: int, email: str):
# runs after the response is sent
await email_service.send(email, f"Order #{order_id} confirmed")Database Migrations with Alembic
Bash
# Initialize Alembic
# The async template generates an env.py that runs migrations through an async
# engine, which is what the postgresql+asyncpg:// URL in this project needs.
alembic init -t async alembic
# Create migration
alembic revision --autogenerate -m "create_orders_table"
# Run migrations
alembic upgrade head
# Rollback one step
alembic downgrade -1
Python
# alembic/env.py
from app.database import Base
from app.models import * # import all models so Alembic finds them
target_metadata = Base.metadata
Testing
Python
# tests/test_orders.py
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def client():
    """Yield an httpx AsyncClient that talks to the app in-process through ASGITransport."""
    # NOTE(review): an async fixture under plain @pytest.fixture presumably relies
    # on pytest-asyncio's auto mode — confirm against the project's pytest config.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as c:
        yield c
@pytest.fixture
async def auth_headers(client):
    """Log in with the test credentials and return an Authorization header dict."""
    credentials = {
        "username": "test@example.com",
        "password": "testpassword",
    }
    # Sent as form data (`data=`), not JSON
    response = await client.post("/api/v1/auth/login", data=credentials)
    access_token = response.json()["access_token"]
    return {"Authorization": "Bearer " + access_token}
@pytest.mark.asyncio
async def test_create_order(client, auth_headers):
    """Creating an order returns 201 and the new order starts as pending."""
    # The route is declared as "/" under the /api/v1/orders prefix, so its
    # canonical URL ends with a slash. Without it the app answers with a 307
    # redirect, which httpx does not follow by default.
    response = await client.post(
        "/api/v1/orders/",
        json={"customer_id": 1, "items": [{"product_id": 1, "quantity": 2}]},
        headers=auth_headers,
    )
    assert response.status_code == 201
    data = response.json()
    assert data["status"] == "pending"
@pytest.mark.asyncio
async def test_get_order_not_found(client, auth_headers):
    """Requesting an order id that does not exist yields 404."""
    missing_order_url = "/api/v1/orders/99999"
    response = await client.get(missing_order_url, headers=auth_headers)
    assert response.status_code == 404
Running in Production
DOCKERFILE
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Bash
# Run locally
uvicorn app.main:app --reload --port 8000
# View auto-generated docs
open http://localhost:8000/docs # Swagger UI
open http://localhost:8000/redoc # ReDoc
What to Learn Next
- Python Interview Questions: 100 Q&As from beginner to senior
- Docker & Kubernetes: containerise and deploy this API
- AI Systems Engineering: add LLM capabilities to your FastAPI service