FastAPI for AI Engineers · Lesson 8 of 12
Lifespan Events: Startup and Shutdown
Why Lifespan Matters
Some resources are expensive to create and should be initialised once when the server starts, not on every request:
- Database connection pools — opening connections takes 10–100 ms each; a pool keeps connections ready
- Redis connections — similar to DB pools
- Embedding models loaded in-process — loading a sentence-transformer or a local LLM can take 5–30 seconds
- HTTP client sessions — re-using a session gives you connection pooling and keep-alive
- Configuration loaded from Azure Key Vault — a remote fetch you only want to do once
Equally important: when the server shuts down gracefully, you want to close those connections cleanly rather than leaving open sockets or unflushed buffers.
The Old Way: on_event (Deprecated)
Earlier FastAPI releases used event decorators:
# OLD: do not use
from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def startup():
    app.state.db_pool = await create_pool()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db_pool.close()

These decorators still work but are deprecated. The lifespan pattern replaces them.
The Lifespan Pattern (Current Standard)
Use @asynccontextmanager from the standard library to define a single async context manager that covers both startup and shutdown:
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- STARTUP ---
    print("Starting up...")
    app.state.db_pool = await create_db_pool()
    app.state.redis = await create_redis()

    yield  # Application runs here: requests are handled

    # --- SHUTDOWN ---
    print("Shutting down...")
    await app.state.db_pool.close()
    await app.state.redis.aclose()

app = FastAPI(lifespan=lifespan)

Everything before yield is startup; everything after yield is shutdown. The yield is where the application lives: requests are served during that window.
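To see that ordering without starting a server, the following minimal sketch drives a lifespan-style context manager by hand. It uses only the standard library. The `fake_app` object and the `events` list are hypothetical stand-ins for a FastAPI app and its log output, not part of FastAPI itself.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

events: list[str] = []  # hypothetical record of what ran, in order


@asynccontextmanager
async def lifespan(app):
    # Startup: runs once, before the body of the `async with` below
    events.append("startup")
    app.state.resource = "ready"
    yield
    # Shutdown: runs once, after the body of the `async with` exits
    events.append("shutdown")
    app.state.resource = None


async def main() -> None:
    # Stand-in for a FastAPI app: any object with a `state` namespace
    fake_app = SimpleNamespace(state=SimpleNamespace())
    async with lifespan(fake_app):
        # The window in which a real server would be handling requests
        events.append(f"serving (resource={fake_app.state.resource})")


asyncio.run(main())
print(events)  # ['startup', 'serving (resource=ready)', 'shutdown']
```

A real server does the equivalent of the `async with` in `main()`: it enters the context manager once at boot and leaves it once at graceful shutdown.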
Initialising a Database Pool
import asyncpg
from contextlib import asynccontextmanager
from fastapi import FastAPI

DATABASE_URL = "postgresql://user:pass@localhost/mydb"

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create pool on startup
    app.state.db_pool = await asyncpg.create_pool(
        DATABASE_URL,
        min_size=2,
        max_size=20,
        command_timeout=30,
    )
    print("DB pool created: min=2, max=20")

    yield

    # Close pool on shutdown
    await app.state.db_pool.close()
    print("DB pool closed")

app = FastAPI(lifespan=lifespan)

Access the pool in a dependency:
from fastapi import Request, Depends
import asyncpg

async def get_db(request: Request):
    # Borrow a connection for this request; it returns to the pool afterwards
    async with request.app.state.db_pool.acquire() as conn:
        yield conn

Initialising Redis
import redis.asyncio as aioredis
from contextlib import asynccontextmanager
from fastapi import FastAPI

REDIS_URL = "redis://localhost:6379"

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.redis = aioredis.from_url(
        REDIS_URL,
        encoding="utf-8",
        decode_responses=True,
        max_connections=50,
    )
    # from_url() does not connect eagerly; ping() verifies the server is reachable
    await app.state.redis.ping()
    print("Redis connected")

    yield

    await app.state.redis.aclose()
    print("Redis disconnected")

app = FastAPI(lifespan=lifespan)

Why Lifespan Matters for LLM Services (Model Loading Is Slow)
Loading a local embedding model takes several seconds — sometimes over 30 seconds for large models. If you load the model on the first request, that user experiences a multi-second delay. If you load it in the lifespan, the model is warm before any request arrives.
from sentence_transformers import SentenceTransformer
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio

EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

@asynccontextmanager
async def lifespan(app: FastAPI):
    print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}")
    # SentenceTransformer is a synchronous library, so load it in a worker thread
    model = await asyncio.to_thread(SentenceTransformer, EMBEDDING_MODEL_NAME)
    app.state.embedding_model = model
    print("Embedding model loaded and ready")

    yield

    # No explicit cleanup needed for in-memory models
    del app.state.embedding_model
    print("Embedding model unloaded")

app = FastAPI(lifespan=lifespan)

Use asyncio.to_thread for model loading because SentenceTransformer() is synchronous and blocking. Running it directly in the async lifespan would block the event loop.
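The effect of offloading can be observed with a small standard-library experiment. This is a sketch under stated assumptions: `slow_load` is a hypothetical stand-in for a blocking constructor such as a model load, and `heartbeat` is a hypothetical background task that stands in for any other work the event loop should be doing at the same time.

```python
import asyncio
import time
from contextlib import suppress


def slow_load() -> str:
    # Hypothetical stand-in for a blocking call such as a model constructor
    time.sleep(0.3)
    return "model"


async def heartbeat(ticks: list[float]) -> None:
    # Records a timestamp every 10 ms, but only while the event loop is free
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(offload: bool) -> int:
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    before = len(ticks)
    if offload:
        await asyncio.to_thread(slow_load)  # event loop keeps running
    else:
        slow_load()  # event loop is frozen for the whole call
    during = len(ticks) - before
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    return during


async def main() -> tuple[int, int]:
    blocked = await count_ticks(offload=False)
    offloaded = await count_ticks(offload=True)
    return blocked, offloaded


blocked, offloaded = asyncio.run(main())
print(f"ticks while blocking: {blocked}, ticks while offloaded: {offloaded}")
```

The direct call records no heartbeat ticks at all, while the offloaded call lets the heartbeat keep running. The same reasoning applies to `model.encode` inside request handlers.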
State Sharing via app.state
app.state is a plain namespace — you can assign any attribute to it:
app.state.db_pool = pool # asyncpg pool
app.state.redis = redis_client # redis client
app.state.embedding_model = model
app.state.settings = settings # config object
app.state.feature_flags = flags

In route handlers, access state via the Request object or via a dependency:
# Via Request
import asyncio
from fastapi import Request

@app.get("/embed")
async def embed(text: str, request: Request) -> dict:
    model = request.app.state.embedding_model
    # encode() is synchronous, so offload it to keep the event loop free
    vector = await asyncio.to_thread(model.encode, text)
    return {"embedding": vector.tolist()}

# Via a dependency (cleaner and testable)
from fastapi import Depends

def get_embedding_model(request: Request):
    return request.app.state.embedding_model

@app.get("/embed")
async def embed(
    text: str,
    model = Depends(get_embedding_model),
) -> dict:
    vector = await asyncio.to_thread(model.encode, text)
    return {"embedding": vector.tolist()}

Complete Example: Startup Loads Embedding Model, Shutdown Closes DB Pool
Here is a fuller lifespan for an AI service that uses a local embedding model, a PostgreSQL database, and an optional Redis cache:
# main.py
import asyncio
import os
from contextlib import asynccontextmanager
import asyncpg
import redis.asyncio as aioredis
from fastapi import FastAPI, Request, Depends
from sentence_transformers import SentenceTransformer
from routers import chat, embeddings, health
# --- Configuration ---
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/aidb")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
# --- Lifespan ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    print("=== Startup ===")

    # 1. Create database connection pool (fatal if it fails)
    try:
        app.state.db_pool = await asyncpg.create_pool(
            DATABASE_URL,
            min_size=2,
            max_size=20,
            command_timeout=30,
            statement_cache_size=100,
        )
        print("[startup] DB pool ready (min=2, max=20)")
    except Exception as exc:
        print(f"[startup] FATAL: Could not connect to database: {exc}")
        raise

    # 2. Connect to Redis (degraded mode if it fails)
    try:
        app.state.redis = aioredis.from_url(
            REDIS_URL,
            encoding="utf-8",
            decode_responses=True,
        )
        await app.state.redis.ping()
        print("[startup] Redis connected")
    except Exception as exc:
        print(f"[startup] WARNING: Redis unavailable: {exc}")
        app.state.redis = None

    # 3. Load embedding model (synchronous, so run it in a worker thread)
    try:
        print(f"[startup] Loading embedding model: {EMBEDDING_MODEL}")
        model = await asyncio.to_thread(SentenceTransformer, EMBEDDING_MODEL)
        app.state.embedding_model = model
        print("[startup] Embedding model loaded")
    except Exception as exc:
        print(f"[startup] WARNING: Could not load embedding model: {exc}")
        app.state.embedding_model = None

    # 4. Mark service as ready
    app.state.ready = True
    print("=== Startup complete: accepting requests ===")

    yield  # ← Application serves requests here

    print("=== Shutdown ===")

    # Close DB pool
    if hasattr(app.state, "db_pool") and app.state.db_pool:
        await app.state.db_pool.close()
        print("[shutdown] DB pool closed")

    # Close Redis
    if hasattr(app.state, "redis") and app.state.redis:
        await app.state.redis.aclose()
        print("[shutdown] Redis closed")

    # Unload embedding model
    if hasattr(app.state, "embedding_model"):
        del app.state.embedding_model
        print("[shutdown] Embedding model unloaded")

    print("=== Shutdown complete ===")
# --- App factory ---
app = FastAPI(
    title="AI Platform Service",
    version="1.0.0",
    lifespan=lifespan,
)

app.include_router(chat.router)
app.include_router(embeddings.router)
app.include_router(health.router)

# routers/embeddings.py
import asyncio
from fastapi import APIRouter, Request, Depends, HTTPException
from pydantic import BaseModel, Field

router = APIRouter(prefix="/embeddings", tags=["embeddings"])

class EmbedRequest(BaseModel):
    texts: list[str] = Field(..., min_length=1, max_length=100)

class EmbedResponse(BaseModel):
    embeddings: list[list[float]]
    model: str
    dimensions: int

def get_embedding_model(request: Request):
    model = getattr(request.app.state, "embedding_model", None)
    if model is None:
        raise HTTPException(
            status_code=503,
            detail="Embedding model is not available.",
        )
    return model

@router.post("/", response_model=EmbedResponse)
async def embed(
    req: EmbedRequest,
    model=Depends(get_embedding_model),
) -> EmbedResponse:
    # encode() is synchronous, so offload it to a worker thread
    vectors = await asyncio.to_thread(model.encode, req.texts)
    return EmbedResponse(
        embeddings=[v.tolist() for v in vectors],
        model=model.model_card_data.model_id if hasattr(model, "model_card_data") else "unknown",
        dimensions=len(vectors[0]),
    )

Testing with Lifespan
Use TestClient as a context manager — it triggers the full lifespan:
from fastapi.testclient import TestClient
from main import app

def test_embed_endpoint():
    with TestClient(app) as client:
        # Lifespan runs on __enter__
        resp = client.post("/embeddings/", json={"texts": ["hello world"]})
        assert resp.status_code == 200
        body = resp.json()
        assert len(body["embeddings"]) == 1
        assert len(body["embeddings"][0]) > 0
    # Lifespan cleanup runs on __exit__

For unit tests that should not trigger the real lifespan, override the lifespan:
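from contextlib import asynccontextmanager
from unittest.mock import MagicMock

import numpy as np
from main import app

# Avoid a test_ prefix so pytest does not collect this helper as a test
@asynccontextmanager
async def fake_lifespan(app):
    app.state.embedding_model = MagicMock()
    # The route calls .tolist() on each row, so return an array rather than a plain list
    app.state.embedding_model.encode.return_value = np.array([[0.1, 0.2, 0.3]])
    # The response model expects the model id to be a string
    app.state.embedding_model.model_card_data.model_id = "mock-model"
    app.state.db_pool = MagicMock()
    app.state.redis = None
    app.state.ready = True
    yield

app.router.lifespan_context = fake_lifespan

Key Takeaways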
- The @asynccontextmanager lifespan pattern replaces the deprecated @app.on_event("startup") / @app.on_event("shutdown") decorators
- Code before yield runs at startup; code after yield runs at shutdown
- Assign initialised resources to app.state; they persist for the lifetime of the process
- Load local ML models with asyncio.to_thread() to avoid blocking the event loop during startup
- Wrap startup in try/except: a fatal failure (no DB connection) should raise and halt startup; a degraded failure (no Redis) can log a warning and continue
- Access app.state in handlers via request.app.state or via a Depends() factory that reads from request.app.state
- TestClient used as a context manager (with TestClient(app) as client) triggers the full lifespan, which is important for integration tests
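One caveat about the try/except takeaway: when shutdown steps run one after another, a step that raises causes the later steps to be skipped. A possible way to harden this, sketched here with the standard library's AsyncExitStack, is to register each cleanup as soon as its resource exists. `FakeResource` and `fake_app` are hypothetical stand-ins for a pool or client and for the FastAPI app; they are not asyncpg, Redis, or FastAPI classes.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from types import SimpleNamespace

closed: list[str] = []  # hypothetical record of which resources were closed


class FakeResource:
    """Hypothetical stand-in for a pool or client with an async close()."""

    def __init__(self, name: str, fail_on_close: bool = False) -> None:
        self.name = name
        self.fail_on_close = fail_on_close

    async def close(self) -> None:
        closed.append(self.name)
        if self.fail_on_close:
            raise RuntimeError(f"{self.name} failed to close")


@asynccontextmanager
async def lifespan(app):
    async with AsyncExitStack() as stack:
        # Register each cleanup right after the resource is created
        app.state.db_pool = FakeResource("db_pool")
        stack.push_async_callback(app.state.db_pool.close)

        app.state.redis = FakeResource("redis", fail_on_close=True)
        stack.push_async_callback(app.state.redis.close)

        yield
    # Leaving the stack runs the callbacks in reverse order of registration;
    # a failing callback does not stop the remaining ones from running


async def main() -> str:
    fake_app = SimpleNamespace(state=SimpleNamespace())
    try:
        async with lifespan(fake_app):
            pass  # requests would be served here
    except RuntimeError as exc:
        return str(exc)
    return "no error"


error = asyncio.run(main())
print(closed, error)  # ['redis', 'db_pool'] redis failed to close
```

The Redis stand-in fails on close, yet the pool is still closed afterwards, and the error surfaces once all callbacks have run. The same stack also unwinds resources that were already created if a later startup step raises.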
Next lesson: Health Check Endpoints — liveness, readiness, and startup probes for Kubernetes and Azure Container Apps.