Application Lifespan: Startup and Shutdown
Use FastAPI's asynccontextmanager lifespan pattern to initialise DB pools, Redis, and embedding models at startup, then clean up on shutdown. Covers app.state for resource sharing.
Why Lifespan Matters
Some resources are expensive to create and should be initialised once when the server starts, not on every request:
- Database connection pools: opening connections takes 10 to 100 ms each; a pool keeps connections ready
- Redis connections: similar to DB pools
- Embedding models loaded in-process: loading a sentence-transformer or a local LLM can take 5 to 30 seconds
- HTTP client sessions: re-using a session gives you connection pooling and keep-alive
- Configuration loaded from Azure Key Vault: a remote fetch you only want to do once
Equally important: when the server shuts down gracefully, you want to close those connections cleanly rather than leaving open sockets or unflushed buffers.
The Old Way: on_event (Deprecated)
Older FastAPI code registered startup and shutdown handlers with event decorators:

    # OLD: do not use
    from fastapi import FastAPI

    app = FastAPI()

    @app.on_event("startup")
    async def startup():
        app.state.db_pool = await create_pool()

    @app.on_event("shutdown")
    async def shutdown():
        await app.state.db_pool.close()

These decorators still work but are deprecated. The lifespan pattern replaces them. Do not mix the two: once you pass a lifespan to FastAPI, handlers registered with on_event are no longer called.
The Lifespan Pattern (Current Standard)
Use @asynccontextmanager from the standard library to define a single async context manager that covers both startup and shutdown:

    from contextlib import asynccontextmanager
    from fastapi import FastAPI

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # --- STARTUP ---
        print("Starting up...")
        # create_db_pool() and create_redis() are placeholders for your own factories
        app.state.db_pool = await create_db_pool()
        app.state.redis = await create_redis()

        yield  # Application runs here; requests are handled

        # --- SHUTDOWN ---
        print("Shutting down...")
        await app.state.db_pool.close()
        await app.state.redis.aclose()

    app = FastAPI(lifespan=lifespan)

Everything before yield is startup; everything after yield is shutdown. The yield is where the application lives: requests are served during that window.
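To see the ordering without running a server, the same pattern can be exercised with only the standard library. This is a minimal sketch, not FastAPI itself: the dict `app` and the string "fake-pool" are hypothetical stand-ins for the application object and a real pool, and the try/finally is an addition that makes the shutdown code run even if the body raises.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan(app: dict):
    # Startup: runs once, before the body of the `async with` block
    events.append("startup")
    app["db_pool"] = "fake-pool"  # hypothetical stand-in for a real pool
    try:
        yield
    finally:
        # Shutdown: runs once, after the body exits (also on error)
        events.append("shutdown")
        app.pop("db_pool", None)


async def main() -> None:
    app: dict = {}
    async with lifespan(app):
        # A real server would handle requests during this window
        events.append(f"serving with {app['db_pool']}")


asyncio.run(main())
print(events)
```

The recorded events show startup first, the serving window second, and shutdown last, which is the same sequence the server drives when it enters and exits the lifespan.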
Initialising a Database Pool

    import asyncpg
    from contextlib import asynccontextmanager
    from fastapi import FastAPI

    DATABASE_URL = "postgresql://user:pass@localhost/mydb"

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Create pool on startup
        app.state.db_pool = await asyncpg.create_pool(
            DATABASE_URL,
            min_size=2,
            max_size=20,
            command_timeout=30,
        )
        print("DB pool created: min=2, max=20")

        yield

        # Close pool on shutdown
        await app.state.db_pool.close()
        print("DB pool closed")

    app = FastAPI(lifespan=lifespan)

Access the pool in a dependency:

    from fastapi import Request

    async def get_db(request: Request):
        # Borrow a connection for this request; it returns to the pool afterwards
        async with request.app.state.db_pool.acquire() as conn:
            yield conn

Initialising Redis

    import redis.asyncio as aioredis
    from contextlib import asynccontextmanager
    from fastapi import FastAPI

    REDIS_URL = "redis://localhost:6379"

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        app.state.redis = aioredis.from_url(
            REDIS_URL,
            encoding="utf-8",
            decode_responses=True,
            max_connections=50,
        )
        # from_url() connects lazily; ping() verifies the server is reachable
        await app.state.redis.ping()
        print("Redis connected")

        yield

        await app.state.redis.aclose()
        print("Redis disconnected")

    app = FastAPI(lifespan=lifespan)

Why Lifespan Matters for LLM Services (Model Loading Is Slow)
Loading a local embedding model takes several seconds, sometimes over 30 seconds for large models. If you load the model on the first request, that user experiences a multi-second delay. If you load it in the lifespan, the model is warm before any request arrives.

    import asyncio
    from contextlib import asynccontextmanager

    from fastapi import FastAPI
    from sentence_transformers import SentenceTransformer

    EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}")
        # SentenceTransformer is a synchronous library: load it in a worker thread
        model = await asyncio.to_thread(SentenceTransformer, EMBEDDING_MODEL_NAME)
        app.state.embedding_model = model
        print("Embedding model loaded and ready")

        yield

        # In-memory models need no explicit cleanup; dropping the reference is enough
        del app.state.embedding_model
        print("Embedding model unloaded")

    app = FastAPI(lifespan=lifespan)

Use asyncio.to_thread for model loading because SentenceTransformer() is synchronous and blocking. Running it directly in the async lifespan would block the event loop.
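The difference is easy to observe with a stand-in for the slow call. In this sketch, `slow_load` is a hypothetical placeholder that sleeps for 0.3 seconds instead of loading a real model, and `heartbeat` is an illustrative coroutine that counts how often the event loop gets a chance to run while the load is in progress.

```python
import asyncio
import time


def slow_load() -> str:
    # Hypothetical stand-in for SentenceTransformer(...): blocks its thread
    time.sleep(0.3)
    return "model"


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Records a timestamp roughly every 10 ms while the event loop is free
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(blocking: bool) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if blocking:
        slow_load()  # runs on the event loop thread: nothing else can run
    else:
        await asyncio.to_thread(slow_load)  # runs in a worker thread
    stop.set()
    await task
    return len(ticks)


blocked_ticks = asyncio.run(count_ticks(blocking=True))
threaded_ticks = asyncio.run(count_ticks(blocking=False))
print(f"direct call: {blocked_ticks} ticks, to_thread: {threaded_ticks} ticks")
```

The direct call starves the heartbeat for the whole load, while the to_thread version lets it keep ticking. The same reasoning applies to model.encode() inside request handlers.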
State Sharing via app.state
app.state is a plain namespace object, so you can assign any attribute to it:

    app.state.db_pool = pool              # asyncpg pool
    app.state.redis = redis_client        # redis client
    app.state.embedding_model = model
    app.state.settings = settings         # config object
    app.state.feature_flags = flags

In route handlers, access state via the Request object or via a dependency. The two handlers below are alternatives for the same route; register only one of them:

    import asyncio
    from fastapi import Depends, Request

    # Option 1: via Request
    @app.get("/embed")
    async def embed_via_request(text: str, request: Request) -> dict:
        model = request.app.state.embedding_model
        # encode() is synchronous, so run it in a worker thread
        vector = await asyncio.to_thread(model.encode, text)
        return {"embedding": vector.tolist()}

    # Option 2: via a dependency (cleaner and testable)
    def get_embedding_model(request: Request):
        return request.app.state.embedding_model

    @app.get("/embed")
    async def embed(
        text: str,
        model=Depends(get_embedding_model),
    ) -> dict:
        vector = await asyncio.to_thread(model.encode, text)
        return {"embedding": vector.tolist()}

Complete Example: Startup Loads Embedding Model, Shutdown Closes DB Pool
Here is a production-quality lifespan for an AI service that uses a local embedding model and a PostgreSQL database:

    # main.py
    import asyncio
    import os
    from contextlib import asynccontextmanager

    import asyncpg
    import redis.asyncio as aioredis
    from fastapi import FastAPI, Request, Depends
    from sentence_transformers import SentenceTransformer

    from routers import chat, embeddings, health

    # --- Configuration ---
    DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/aidb")
    REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
    EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2")

    # --- Lifespan ---
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        print("=== Startup ===")

        # 1. Create database connection pool (fatal if it fails)
        try:
            app.state.db_pool = await asyncpg.create_pool(
                DATABASE_URL,
                min_size=2,
                max_size=20,
                command_timeout=30,
                statement_cache_size=100,
            )
            print("[startup] DB pool ready (min=2, max=20)")
        except Exception as exc:
            print(f"[startup] FATAL: Could not connect to database: {exc}")
            raise

        # 2. Connect to Redis (degraded mode if it fails)
        try:
            app.state.redis = aioredis.from_url(
                REDIS_URL,
                encoding="utf-8",
                decode_responses=True,
            )
            await app.state.redis.ping()
            print("[startup] Redis connected")
        except Exception as exc:
            print(f"[startup] WARNING: Redis unavailable: {exc}")
            app.state.redis = None

        # 3. Load embedding model (synchronous, so run in a worker thread)
        try:
            print(f"[startup] Loading embedding model: {EMBEDDING_MODEL}")
            model = await asyncio.to_thread(SentenceTransformer, EMBEDDING_MODEL)
            app.state.embedding_model = model
            print("[startup] Embedding model loaded")
        except Exception as exc:
            print(f"[startup] WARNING: Could not load embedding model: {exc}")
            app.state.embedding_model = None

        # 4. Mark service as ready
        app.state.ready = True
        print("=== Startup complete: accepting requests ===")

        yield  # Application serves requests here

        print("=== Shutdown ===")

        # Close DB pool
        if hasattr(app.state, "db_pool") and app.state.db_pool:
            await app.state.db_pool.close()
            print("[shutdown] DB pool closed")

        # Close Redis
        if hasattr(app.state, "redis") and app.state.redis:
            await app.state.redis.aclose()
            print("[shutdown] Redis closed")

        # Unload embedding model
        if hasattr(app.state, "embedding_model"):
            del app.state.embedding_model
            print("[shutdown] Embedding model unloaded")

        print("=== Shutdown complete ===")

    # --- App factory ---
    app = FastAPI(
        title="AI Platform Service",
        version="1.0.0",
        lifespan=lifespan,
    )

    app.include_router(chat.router)
    app.include_router(embeddings.router)
    app.include_router(health.router)

    # routers/embeddings.py
    import asyncio

    from fastapi import APIRouter, Request, Depends, HTTPException
    from pydantic import BaseModel, Field

    router = APIRouter(prefix="/embeddings", tags=["embeddings"])

    class EmbedRequest(BaseModel):
        texts: list[str] = Field(..., min_length=1, max_length=100)

    class EmbedResponse(BaseModel):
        embeddings: list[list[float]]
        model: str
        dimensions: int

    def get_embedding_model(request: Request):
        # Return 503 when startup continued without a model (degraded mode)
        model = getattr(request.app.state, "embedding_model", None)
        if model is None:
            raise HTTPException(
                status_code=503,
                detail="Embedding model is not available.",
            )
        return model

    @router.post("/", response_model=EmbedResponse)
    async def embed(
        req: EmbedRequest,
        model=Depends(get_embedding_model),
    ) -> EmbedResponse:
        # encode() is synchronous, so offload it to a worker thread
        vectors = await asyncio.to_thread(model.encode, req.texts)
        return EmbedResponse(
            embeddings=[v.tolist() for v in vectors],
            model=model.model_card_data.model_id if hasattr(model, "model_card_data") else "unknown",
            dimensions=len(vectors[0]),
        )

Testing with Lifespan
Use TestClient as a context manager. Doing so triggers the full lifespan:

    from fastapi.testclient import TestClient
    from main import app

    def test_embed_endpoint():
        with TestClient(app) as client:
            # Lifespan startup runs on __enter__
            resp = client.post("/embeddings/", json={"texts": ["hello world"]})
            assert resp.status_code == 200
            body = resp.json()
            assert len(body["embeddings"]) == 1
            assert len(body["embeddings"][0]) > 0
        # Lifespan cleanup runs on __exit__

For unit tests that should not trigger the real lifespan, override the lifespan:
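
    from contextlib import asynccontextmanager
    from unittest.mock import MagicMock

    import numpy as np

    from main import app

    # Not prefixed with test_, so pytest does not collect it as a test
    @asynccontextmanager
    async def mock_lifespan(app):
        model = MagicMock()
        # The real encode() returns a NumPy array; the handler calls .tolist() on each row
        model.encode.return_value = np.array([[0.1, 0.2, 0.3]])
        # The handler reads this attribute into a str field of the response
        model.model_card_data.model_id = "mock-model"
        app.state.embedding_model = model
        app.state.db_pool = MagicMock()
        app.state.redis = None
        app.state.ready = True
        yield

    app.router.lifespan_context = mock_lifespan

Key Takeaways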
- The @asynccontextmanager lifespan pattern replaces the deprecated @app.on_event("startup") / @app.on_event("shutdown") decorators
- Code before yield runs at startup; code after yield runs at shutdown
- Assign initialised resources to app.state; they persist for the lifetime of the process
- Load local ML models with asyncio.to_thread() to avoid blocking the event loop during startup
- Wrap startup in try/except: a fatal failure (no DB connection) should raise and halt startup; a degraded failure (no Redis) can log a warning and continue
- Access app.state in handlers via request.app.state or via a Depends() factory that reads from request.app.state
- TestClient used as a context manager (with TestClient(app) as client) triggers the full lifespan, which matters for integration tests
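One consequence of the fatal-failure rule is worth spelling out: when a startup step raises, the code after yield never runs, so resources opened by earlier steps stay open unless something else closes them. A minimal sketch of one way to handle this uses contextlib.AsyncExitStack from the standard library. `FakeResource`, `connect`, and the `fail_model` flag are hypothetical stand-ins for real pools and clients, and a plain dict stands in for app.state.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

closed: list[str] = []


class FakeResource:
    """Hypothetical stand-in for a pool or client with an async close()."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def close(self) -> None:
        closed.append(self.name)


async def connect(name: str, fail: bool = False) -> FakeResource:
    if fail:
        raise ConnectionError(f"{name} unavailable")
    return FakeResource(name)


@asynccontextmanager
async def lifespan(state: dict, fail_model: bool = False):
    async with AsyncExitStack() as stack:
        state["db_pool"] = await connect("db_pool")
        stack.push_async_callback(state["db_pool"].close)
        state["redis"] = await connect("redis")
        stack.push_async_callback(state["redis"].close)
        # If this fatal step raises, the stack still closes redis, then db_pool
        state["model"] = await connect("model", fail=fail_model)
        stack.push_async_callback(state["model"].close)
        yield
    # Leaving the stack runs the callbacks in reverse registration order


async def run(fail_model: bool) -> list[str]:
    closed.clear()
    try:
        async with lifespan({}, fail_model=fail_model):
            pass  # requests would be served here
    except ConnectionError as exc:
        print(f"startup failed: {exc}")
    return list(closed)


clean_run = asyncio.run(run(fail_model=False))
failed_run = asyncio.run(run(fail_model=True))
print(clean_run, failed_run)
```

Registering each close callback immediately after the resource is created means a single code path covers both normal shutdown and a startup that fails partway through.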
Next lesson: Health Check Endpoints (liveness, readiness, and startup probes for Kubernetes and Azure Container Apps).