Learnixo

FastAPI for AI Engineers · Lesson 8 of 12

Lifespan Events: Startup and Shutdown

Why Lifespan Matters

Some resources are expensive to create and should be initialised once when the server starts, not on every request:

  • Database connection pools — opening connections takes 10–100 ms each; a pool keeps connections ready
  • Redis connections — similar to DB pools
  • Embedding models loaded in-process — loading a sentence-transformer or a local LLM can take 5–30 seconds
  • HTTP client sessions — re-using a session gives you connection pooling and keep-alive
  • Configuration loaded from Azure Key Vault — a remote fetch you only want to do once

Equally important: when the server shuts down gracefully, you want to close those connections cleanly rather than leaving open sockets or unflushed buffers.

The Old Way: on_event (Deprecated)

Older FastAPI code used event decorators:

Python
# OLD: do not use
from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def startup():
    app.state.db_pool = await create_pool()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db_pool.close()

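These decorators still work but are deprecated. The lifespan pattern replaces them, and the two cannot be mixed: when you pass a lifespan to FastAPI, the on_event handlers are no longer called.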

The Lifespan Pattern (Current Standard)

Use @asynccontextmanager from the standard library to define a single async context manager that covers both startup and shutdown:

Python
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- STARTUP ---
    print("Starting up...")
    app.state.db_pool = await create_db_pool()
    app.state.redis = await create_redis()

    yield   # Application runs here: requests are handled

    # --- SHUTDOWN ---
    print("Shutting down...")
    await app.state.db_pool.close()
    await app.state.redis.aclose()


app = FastAPI(lifespan=lifespan)

Everything before yield is startup; everything after yield is shutdown. The yield is where the application lives — requests are served during that window.
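
To make the ordering concrete, here is a minimal sketch that uses only the standard library. The `serve` function is a hypothetical stand-in for the server: it enters the lifespan context, pretends to handle requests, and leaves. The `events` list exists only to record the order in which things happen.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan(app: object):
    events.append("startup")       # runs once, before any request
    yield                          # the application runs while suspended here
    events.append("shutdown")      # runs once, after the server stops


async def serve() -> None:
    # Hypothetical stand-in for the server: enter the context,
    # handle requests, then leave the context.
    async with lifespan(app=None):
        events.append("handling requests")


asyncio.run(serve())
print(events)  # ['startup', 'handling requests', 'shutdown']
```

FastAPI drives the real lifespan in the same way: it enters the context manager when the server starts and exits it when the server stops.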

Initialising a Database Pool

Python
import asyncpg
from contextlib import asynccontextmanager
from fastapi import FastAPI

DATABASE_URL = "postgresql://user:pass@localhost/mydb"

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create pool on startup
    app.state.db_pool = await asyncpg.create_pool(
        DATABASE_URL,
        min_size=2,
        max_size=20,
        command_timeout=30,
    )
    print("DB pool created: min=2, max=20")

    yield

    # Close pool on shutdown
    await app.state.db_pool.close()
    print("DB pool closed")


app = FastAPI(lifespan=lifespan)

Access the pool in a dependency:

Python
from fastapi import Request, Depends
import asyncpg

async def get_db(request: Request):
    async with request.app.state.db_pool.acquire() as conn:
        yield conn
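
The dependency yields inside `async with` so that the connection goes back to the pool once the handler has finished. The sketch below illustrates that behaviour with the standard library only. `FakePool` is a hypothetical stand-in for the asyncpg pool, and `handle_request` only approximates how a framework treats a generator dependency by wrapping it as a context manager.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Hypothetical stand-in for an asyncpg pool; counts checked-out connections."""

    def __init__(self) -> None:
        self.in_use = 0

    @asynccontextmanager
    async def acquire(self):
        self.in_use += 1
        try:
            yield "conn"
        finally:
            self.in_use -= 1   # connection returned to the pool


pool = FakePool()


async def get_db():
    # Same shape as the dependency above, reading from a module-level pool
    async with pool.acquire() as conn:
        yield conn


async def handle_request() -> int:
    # Approximation of the framework: run the generator up to its yield,
    # call the handler, then resume the generator so cleanup runs.
    dependency = asynccontextmanager(get_db)
    async with dependency() as conn:
        assert conn == "conn"
        return pool.in_use     # value captured while the connection is held


in_use_during_request = asyncio.run(handle_request())
print(in_use_during_request, pool.in_use)  # 1 0
```

One connection is checked out while the handler runs and none afterwards, which is the property the `get_db` dependency relies on.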

Initialising Redis

Python
import redis.asyncio as aioredis
from contextlib import asynccontextmanager
from fastapi import FastAPI

REDIS_URL = "redis://localhost:6379"

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.redis = aioredis.from_url(
        REDIS_URL,
        encoding="utf-8",
        decode_responses=True,
        max_connections=50,
    )
    await app.state.redis.ping()
    print("Redis connected")

    yield

    await app.state.redis.aclose()
    print("Redis disconnected")


app = FastAPI(lifespan=lifespan)
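
When one lifespan creates several resources, as in the earlier example that opens a database pool and then Redis, a failure in a later step means the code after `yield` never runs, so anything already opened stays open. One way to handle this, sketched below under stated assumptions, is `contextlib.AsyncExitStack`: register each cleanup as soon as its resource exists. `FakeResource`, `create_db_pool`, and `create_redis` are hypothetical stand-ins, and `create_redis` is made to fail on purpose.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

closed: list[str] = []


class FakeResource:
    """Hypothetical stand-in for a pool or client with an async close()."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def close(self) -> None:
        closed.append(self.name)


async def create_db_pool() -> FakeResource:
    return FakeResource("db_pool")


async def create_redis() -> FakeResource:
    raise ConnectionError("redis is down")   # simulated startup failure


@asynccontextmanager
async def lifespan(app: object):
    async with AsyncExitStack() as stack:
        pool = await create_db_pool()
        stack.push_async_callback(pool.close)    # registered immediately
        redis = await create_redis()             # raises before yield
        stack.push_async_callback(redis.close)
        yield
        # On normal shutdown the stack runs the callbacks in reverse order.


async def main() -> str:
    try:
        async with lifespan(app=None):
            pass
    except ConnectionError as exc:
        return f"startup failed: {exc}"
    return "started"


result = asyncio.run(main())
print(result, closed)  # startup failed: redis is down ['db_pool']
```

The pool is closed even though startup never reached `yield`. With the real clients you would register `pool.close` and `redis.aclose` in the same way.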

Why Lifespan Matters for LLM Services (Model Loading Is Slow)

Loading a local embedding model takes several seconds — sometimes over 30 seconds for large models. If you load the model on the first request, that user experiences a multi-second delay. If you load it in the lifespan, the model is warm before any request arrives.

Python
from sentence_transformers import SentenceTransformer
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio

EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

@asynccontextmanager
async def lifespan(app: FastAPI):
    print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}")

    # SentenceTransformer is a synchronous library: load it in a worker thread
    model = await asyncio.to_thread(SentenceTransformer, EMBEDDING_MODEL_NAME)
    app.state.embedding_model = model

    print("Embedding model loaded and ready")

    yield

    # An in-memory model has no connection to close;
    # dropping the reference is all the cleanup it needs
    del app.state.embedding_model
    print("Embedding model unloaded")


app = FastAPI(lifespan=lifespan)

Use asyncio.to_thread for model loading because SentenceTransformer() is synchronous and blocking. Running it directly in the async lifespan would block the event loop.
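
The effect is easy to observe with a small standard-library sketch. Here `load_model` is a hypothetical stand-in that sleeps instead of loading real weights, and `heartbeat` is a coroutine that records a tick whenever the event loop gets a chance to run it.

```python
import asyncio
import time


def load_model(name: str) -> str:
    # Hypothetical stand-in for a slow, synchronous model load
    time.sleep(0.3)
    return f"model:{name}"


async def heartbeat(ticks: list[float]) -> None:
    # Records a timestamp each time the event loop schedules this task
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main() -> tuple[str, int]:
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread, so the loop keeps ticking
    model = await asyncio.to_thread(load_model, "demo")
    beat.cancel()
    return model, len(ticks)


model, tick_count = asyncio.run(main())
print(model, tick_count)
```

The heartbeat keeps ticking while the load is in progress. If you replace the `to_thread` call with a direct `load_model("demo")`, the heartbeat task never gets to run during the load.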

State Sharing via app.state

app.state is a plain namespace — you can assign any attribute to it:

Python
app.state.db_pool = pool        # asyncpg pool
app.state.redis = redis_client  # redis client
app.state.embedding_model = model
app.state.settings = settings   # config object
app.state.feature_flags = flags

In route handlers, access state via the Request object or via a dependency:

Python
# Option 1: via Request
import asyncio

from fastapi import Depends, Request

@app.get("/embed")
async def embed(text: str, request: Request) -> dict:
    model = request.app.state.embedding_model
    # encode() is synchronous, so run it in a worker thread
    vector = await asyncio.to_thread(model.encode, text)
    return {"embedding": vector.tolist()}

# Option 2: via a dependency (cleaner and testable).
# This is an alternative to Option 1; register only one handler for /embed.
def get_embedding_model(request: Request):
    return request.app.state.embedding_model

@app.get("/embed")
async def embed(
    text: str,
    model = Depends(get_embedding_model),
) -> dict:
    vector = await asyncio.to_thread(model.encode, text)
    return {"embedding": vector.tolist()}
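
Part of what makes the dependency version testable is that `get_embedding_model` is a plain function: it can be called directly with any object that has an `app.state.embedding_model` attribute. The sketch below assumes a hypothetical `FakeModel` and builds a stand-in request from `types.SimpleNamespace`, so it needs neither FastAPI nor a real model.

```python
from types import SimpleNamespace


def get_embedding_model(request):
    # Same body as the dependency above; the Request annotation is
    # omitted so this sketch runs without FastAPI installed.
    return request.app.state.embedding_model


class FakeModel:
    """Hypothetical stand-in for the embedding model."""

    def encode(self, text: str) -> list[float]:
        return [float(len(text))]


# Stand-in for a request: only the attributes the dependency reads
fake_request = SimpleNamespace(
    app=SimpleNamespace(state=SimpleNamespace(embedding_model=FakeModel()))
)

model = get_embedding_model(fake_request)
print(model.encode("hello"))  # [5.0]
```

In a full application test, FastAPI's `app.dependency_overrides` mapping can swap the dependency for a fake in a similar way.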

Complete Example: Startup Loads Embedding Model, Shutdown Closes DB Pool

Here is a production-quality lifespan for an AI service that uses a local embedding model and a PostgreSQL database:

Python
# main.py
import asyncio
import os
from contextlib import asynccontextmanager

import asyncpg
import redis.asyncio as aioredis
from fastapi import FastAPI, Request, Depends
from sentence_transformers import SentenceTransformer

from routers import chat, embeddings, health

# --- Configuration ---

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/aidb")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2")


# --- Lifespan ---

@asynccontextmanager
async def lifespan(app: FastAPI):
    print("=== Startup ===")

    # 1. Create database connection pool
    try:
        app.state.db_pool = await asyncpg.create_pool(
            DATABASE_URL,
            min_size=2,
            max_size=20,
            command_timeout=30,
            statement_cache_size=100,
        )
        print("[startup] DB pool ready (min=2, max=20)")
    except Exception as exc:
        print(f"[startup] FATAL: Could not connect to database: {exc}")
        raise

    # 2. Connect to Redis
    try:
        app.state.redis = aioredis.from_url(
            REDIS_URL,
            encoding="utf-8",
            decode_responses=True,
        )
        await app.state.redis.ping()
        print("[startup] Redis connected")
    except Exception as exc:
        print(f"[startup] WARNING: Redis unavailable: {exc}")
        app.state.redis = None

    # 3. Load embedding model (synchronous: run in a worker thread)
    try:
        print(f"[startup] Loading embedding model: {EMBEDDING_MODEL}")
        model = await asyncio.to_thread(SentenceTransformer, EMBEDDING_MODEL)
        app.state.embedding_model = model
        print("[startup] Embedding model loaded")
    except Exception as exc:
        print(f"[startup] WARNING: Could not load embedding model: {exc}")
        app.state.embedding_model = None

    # 4. Mark service as ready
    app.state.ready = True
    print("=== Startup complete — accepting requests ===")

    yield   # Application serves requests here

    print("=== Shutdown ===")

    # Close DB pool
    if hasattr(app.state, "db_pool") and app.state.db_pool:
        await app.state.db_pool.close()
        print("[shutdown] DB pool closed")

    # Close Redis
    if hasattr(app.state, "redis") and app.state.redis:
        await app.state.redis.aclose()
        print("[shutdown] Redis closed")

    # Unload embedding model
    if hasattr(app.state, "embedding_model"):
        del app.state.embedding_model
        print("[shutdown] Embedding model unloaded")

    print("=== Shutdown complete ===")


# --- App factory ---

app = FastAPI(
    title="AI Platform Service",
    version="1.0.0",
    lifespan=lifespan,
)

app.include_router(chat.router)
app.include_router(embeddings.router)
app.include_router(health.router)

Python
# routers/embeddings.py
import asyncio
from fastapi import APIRouter, Request, Depends, HTTPException
from pydantic import BaseModel, Field

router = APIRouter(prefix="/embeddings", tags=["embeddings"])


class EmbedRequest(BaseModel):
    texts: list[str] = Field(..., min_length=1, max_length=100)

class EmbedResponse(BaseModel):
    embeddings: list[list[float]]
    model: str
    dimensions: int


def get_embedding_model(request: Request):
    model = getattr(request.app.state, "embedding_model", None)
    if model is None:
        raise HTTPException(
            status_code=503,
            detail="Embedding model is not available.",
        )
    return model


@router.post("/", response_model=EmbedResponse)
async def embed(
    req: EmbedRequest,
    model=Depends(get_embedding_model),
) -> EmbedResponse:
    # encode() is synchronous: offload it to a worker thread
    vectors = await asyncio.to_thread(model.encode, req.texts)
    return EmbedResponse(
        embeddings=[v.tolist() for v in vectors],
        # Fall back to "unknown" if the attribute is missing or model_id is None
        model=getattr(getattr(model, "model_card_data", None), "model_id", None) or "unknown",
        dimensions=len(vectors[0]),
    )
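
The lifespan in main.py treats the database as fatal and Redis as optional. If more resources are added, that policy can be factored into two small helpers. The following is a minimal sketch using only the standard library. `init_required`, `init_optional`, `make_pool`, and `make_redis` are hypothetical names, and `make_redis` fails on purpose to show the degraded path.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger("startup")


async def init_required(name: str, factory: Callable[[], Awaitable[T]]) -> T:
    """Create a resource the service cannot run without; re-raise on failure."""
    try:
        return await factory()
    except Exception:
        logger.exception("FATAL: %s failed to initialise", name)
        raise


async def init_optional(name: str, factory: Callable[[], Awaitable[T]]) -> Optional[T]:
    """Create a nice-to-have resource; log a warning and return None on failure."""
    try:
        return await factory()
    except Exception as exc:
        logger.warning("%s unavailable, continuing without it: %s", name, exc)
        return None


async def make_pool() -> str:
    return "pool"                                   # stand-in for a real pool


async def make_redis() -> str:
    raise ConnectionError("connection refused")     # simulated outage


async def startup() -> dict:
    return {
        "db_pool": await init_required("database", make_pool),
        "redis": await init_optional("redis", make_redis),
    }


state = asyncio.run(startup())
print(state)  # {'db_pool': 'pool', 'redis': None}
```

Handlers that use an optional resource then need to check for `None`, as `get_embedding_model` does for the model.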

Testing with Lifespan

Use TestClient as a context manager — it triggers the full lifespan:

Python
from fastapi.testclient import TestClient
from main import app

def test_embed_endpoint():
    with TestClient(app) as client:
        # Lifespan runs on __enter__
        resp = client.post("/embeddings/", json={"texts": ["hello world"]})
        assert resp.status_code == 200
        body = resp.json()
        assert len(body["embeddings"]) == 1
        assert len(body["embeddings"][0]) > 0
    # Lifespan cleanup runs on __exit__

For unit tests that should not trigger the real lifespan, override the lifespan:

Python
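from contextlib import asynccontextmanager
from unittest.mock import MagicMock

import numpy as np

from main import app

# Not named test_*, so pytest does not try to collect it as a test
@asynccontextmanager
async def mock_lifespan(app):
    model = MagicMock()
    # The handler calls .tolist() on each row, so return a NumPy array
    model.encode.return_value = np.array([[0.1, 0.2, 0.3]])
    # The response model expects a string here, not a MagicMock
    model.model_card_data.model_id = "mock-model"
    app.state.embedding_model = model
    app.state.db_pool = MagicMock()
    app.state.redis = None
    app.state.ready = True
    yield

app.router.lifespan_context = mock_lifespan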

Key Takeaways

  • The @asynccontextmanager lifespan pattern replaces the deprecated @app.on_event("startup") / @app.on_event("shutdown") decorators
  • Code before yield runs at startup; code after yield runs at shutdown
  • Assign initialised resources to app.state — they persist for the lifetime of the process
  • Load local ML models in asyncio.to_thread() to avoid blocking the event loop during startup
  • Wrap startup in try/except — a fatal failure (no DB connection) should raise and halt startup; a degraded failure (no Redis) can log a warning and continue
  • Access app.state in handlers via request.app.state or via a Depends() factory that reads from request.app.state
  • TestClient used as a context manager (with TestClient(app) as client) triggers the full lifespan — important for integration tests

Next lesson: Health Check Endpoints — liveness, readiness, and startup probes for Kubernetes and Azure Container Apps.