Learnixo

FastAPI for AI Engineers · Lesson 9 of 12

Health Checks: Database, Redis, and LLM Status

Why Health Checks Matter

Kubernetes, Azure Container Apps, and most cloud orchestrators use health check probes to decide whether to:

  • Route traffic to a pod (readiness probe)
  • Restart a pod that has become unresponsive (liveness probe)
  • Wait before routing to a pod that is still initialising (startup probe)

Without well-designed health endpoints, the platform cannot distinguish between "service is starting up" and "service has crashed", or between "service is healthy" and "service cannot reach the database".

Three Probe Types

Liveness (/health)

The liveness probe answers: "Is this process alive?"

A liveness failure causes the orchestrator to kill and restart the container. It should only fail for states that a restart would fix — like a deadlock or an unrecoverable error. It should not fail because the database is down (that's a readiness failure).

A minimal liveness probe just returns 200:

Python
@app.get("/health")
async def liveness() -> dict:
    return {"status": "alive"}
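A liveness probe can go one step further when the service has a background worker whose stall a restart would fix. The sketch below is one possible shape for that, not part of the lesson's implementation: `Heartbeat`, `max_age_seconds`, and the assumption that a worker calls `beat()` on every loop iteration are all hypothetical.

```python
import time
from typing import Optional, Tuple


class Heartbeat:
    """Tracks when a background worker last reported progress (hypothetical helper)."""

    def __init__(self, max_age_seconds: float = 30.0) -> None:
        self.max_age_seconds = max_age_seconds
        self._last_beat = time.monotonic()

    def beat(self) -> None:
        """Called by the worker each time it completes a unit of work."""
        self._last_beat = time.monotonic()

    def is_stale(self, now: Optional[float] = None) -> bool:
        """True when no beat arrived within max_age_seconds."""
        current = time.monotonic() if now is None else now
        return (current - self._last_beat) > self.max_age_seconds


def liveness_payload(heartbeat: Heartbeat, now: Optional[float] = None) -> Tuple[int, dict]:
    """Return (status_code, body); 503 only when the worker looks stuck."""
    if heartbeat.is_stale(now):
        return 503, {"status": "stuck", "detail": "Worker heartbeat is stale"}
    return 200, {"status": "alive"}
```

A route could then return `JSONResponse(status_code=code, content=body)` from the pair. Dependency outages still belong in the readiness probe, because a restart does not fix them.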

Readiness (/health/ready)

The readiness probe answers: "Is this service ready to handle requests?"

A readiness failure causes the orchestrator to stop routing traffic to this instance, without restarting it. It should check all dependencies that the service needs to serve requests correctly:

  • Database is reachable and queries succeed
  • Cache (Redis) is reachable
  • External APIs (OpenAI endpoint) are reachable
  • Model is loaded (for local ML models)

Startup (/health/started)

The startup probe answers: "Has this service finished initialising?"

It runs first: until the startup probe succeeds, the liveness and readiness probes are not run. If it keeps failing past its failure threshold, the orchestrator restarts the container. The startup probe gives slow-starting containers (like an LLM service loading a large model) time to initialise without being killed prematurely.
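For the startup probe to return anything useful, something has to record when initialisation has finished. One way to sketch that is a lifespan handler that starts the slow work in a background task and sets a flag at the end. The names `load_embedding_model` and `initialise` are hypothetical, and the `ready` flag on `app.state` is an assumption. If the slow work ran inline in the lifespan handler instead, the server would typically not accept requests until it finished, so the probe would see refused connections rather than a 503.

```python
import asyncio
import contextlib
from contextlib import asynccontextmanager


async def load_embedding_model() -> object:
    """Stand-in for a slow model load (hypothetical; swap in the real loader)."""
    await asyncio.sleep(0.01)
    return object()


async def initialise(app) -> None:
    """Do the slow startup work, then mark the service as started."""
    app.state.embedding_model = await load_embedding_model()
    app.state.ready = True  # a startup route can now return 200


@asynccontextmanager
async def lifespan(app):
    app.state.ready = False
    # Run initialisation in the background so the server can answer
    # the startup probe with 503 while the model is still loading.
    task = asyncio.create_task(initialise(app))
    try:
        yield
    finally:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


# Assumed wiring: app = FastAPI(lifespan=lifespan)
```

The startup route then only needs to read `app.state.ready`. It does no I/O, so it stays fast however long the model takes to load.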

Simple Health Check Structure

Python
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health", tags=["health"])
async def liveness() -> dict:
    """Always returns 200 if the process is running."""
    return {"status": "alive", "service": "ai-platform"}

@app.get("/health/ready", tags=["health"])
async def readiness() -> JSONResponse:
    """Returns 200 if all dependencies are healthy, 503 otherwise."""
    checks = await run_dependency_checks()
    all_healthy = all(c["healthy"] for c in checks.values())

    return JSONResponse(
        status_code=200 if all_healthy else 503,
        content={
            "status": "ready" if all_healthy else "not_ready",
            "checks": checks,
        },
    )
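The route above calls `run_dependency_checks()` without showing it. A minimal sketch of such a helper follows. Here the checkers are passed in as a dict of zero-argument async callables so that the example is self-contained. That signature is an assumption, not the lesson's definition.

```python
import asyncio
from typing import Awaitable, Callable, Dict

CheckFn = Callable[[], Awaitable[dict]]


async def run_dependency_checks(checkers: Dict[str, CheckFn]) -> Dict[str, dict]:
    """Run every named checker concurrently and return {name: result}."""
    names = list(checkers)
    results = await asyncio.gather(
        *(checkers[name]() for name in names),
        return_exceptions=True,  # one failing checker must not hide the others
    )
    checks: Dict[str, dict] = {}
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            # A checker raised instead of returning a dict: report it as unhealthy
            checks[name] = {"healthy": False, "error": str(result)}
        else:
            checks[name] = result
    return checks
```

Every entry in the result has a `healthy` key, so the `all(c["healthy"] ...)` expression in the route works even if a checker raises.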

What Each Probe Should Check

| Dependency | Check | Timeout |
|-----------|-------|---------|
| PostgreSQL | SELECT 1 query | 2 seconds |
| Redis | PING | 1 second |
| OpenAI / Azure OpenAI | GET models endpoint | 3 seconds |
| Local embedding model | app.state.embedding_model is not None | No I/O |
| Service Bus | Peek 0 messages | 2 seconds |

Timeout on Dependency Checks

A health endpoint must never hang. If the database is unreachable, the TCP connection attempt may block for 30 seconds — causing the probe to fail by timeout rather than reporting a clean "unhealthy" status.

Wrap every external check with asyncio.wait_for():

Python
import asyncio

async def check_database(pool) -> dict:
    try:
        await asyncio.wait_for(
            pool.fetchval("SELECT 1"),
            timeout=2.0,
        )
        return {"healthy": True, "latency_ms": None}
    except asyncio.TimeoutError:
        return {"healthy": False, "error": "Database check timed out after 2s"}
    except Exception as exc:
        return {"healthy": False, "error": str(exc)}
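The same try/except shape repeats for every dependency, so it can be factored into one wrapper. The sketch below shows one way to do that and also records latency. The name `timed_check` and its signature are hypothetical.

```python
import asyncio
import time
from typing import Awaitable


async def timed_check(probe: Awaitable, timeout: float) -> dict:
    """Await `probe` with a hard timeout and report health plus latency."""
    start = time.perf_counter()
    try:
        await asyncio.wait_for(probe, timeout=timeout)
    except asyncio.TimeoutError:
        return {"healthy": False, "error": f"Timeout after {round(timeout * 1000)}ms"}
    except Exception as exc:
        return {"healthy": False, "error": str(exc)}
    latency = round((time.perf_counter() - start) * 1000, 1)
    return {"healthy": True, "latency_ms": latency}
```

With this in place, a database check reduces to `await timed_check(pool.fetchval("SELECT 1"), timeout=2.0)`, assuming `pool` exposes an awaitable `fetchval` as in the example above.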

Returning 503 When Not Ready

Use JSONResponse directly with status_code=503 — FastAPI will send the right status code while still returning the JSON body with details:

Python
from fastapi import Request
from fastapi.responses import JSONResponse

@app.get("/health/ready")
async def readiness(request: Request) -> JSONResponse:
    state = request.app.state
    checks = {}

    db_pool = getattr(state, "db_pool", None)
    if db_pool:
        checks["database"] = await check_database(db_pool)
    else:
        checks["database"] = {"healthy": False, "error": "Pool not initialised"}

    redis = getattr(state, "redis", None)
    if redis:
        checks["redis"] = await check_redis(redis)
    else:
        checks["redis"] = {"healthy": False, "error": "Redis not initialised"}

    is_ready = all(c["healthy"] for c in checks.values())

    return JSONResponse(
        status_code=200 if is_ready else 503,
        content={
            "status": "ready" if is_ready else "not_ready",
            "checks": checks,
        },
    )

Complete Implementation

Here is a full health router for an AI service with Redis, PostgreSQL, and Azure OpenAI checks:

Python
# routers/health.py
import asyncio
import time
import os
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
import httpx

router = APIRouter(tags=["health"])

# --- Individual dependency checkers ---

async def check_postgres(pool) -> dict:
    """Verify the DB pool can execute a query."""
    start = time.perf_counter()
    try:
        await asyncio.wait_for(pool.fetchval("SELECT 1"), timeout=2.0)
        latency = round((time.perf_counter() - start) * 1000, 1)
        return {"healthy": True, "latency_ms": latency}
    except asyncio.TimeoutError:
        return {"healthy": False, "error": "Timeout after 2000ms"}
    except Exception as exc:
        return {"healthy": False, "error": str(exc)}


async def check_redis(redis) -> dict:
    """Verify the Redis client can PING."""
    start = time.perf_counter()
    try:
        await asyncio.wait_for(redis.ping(), timeout=1.0)
        latency = round((time.perf_counter() - start) * 1000, 1)
        return {"healthy": True, "latency_ms": latency}
    except asyncio.TimeoutError:
        return {"healthy": False, "error": "Timeout after 1000ms"}
    except Exception as exc:
        return {"healthy": False, "error": str(exc)}


async def check_openai() -> dict:
    """Verify the Azure OpenAI endpoint is reachable."""
    endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT", "")
    api_key = os.environ.get("AZURE_OPENAI_API_KEY", "")

    if not endpoint or not api_key:
        return {"healthy": False, "error": "Azure OpenAI credentials not configured"}

    # List models, matching the "GET models endpoint" check in the table above
    url = f"{endpoint.rstrip('/')}/openai/models?api-version=2024-12-01-preview"
    start = time.perf_counter()

    try:
        async with httpx.AsyncClient() as client:
            resp = await asyncio.wait_for(
                client.get(
                    url,
                    headers={"api-key": api_key},
                    timeout=3.0,
                ),
                timeout=4.0,
            )
        latency = round((time.perf_counter() - start) * 1000, 1)
        if resp.status_code == 200:
            return {"healthy": True, "latency_ms": latency}
        return {
            "healthy": False,
            "error": f"HTTP {resp.status_code} from Azure OpenAI",
        }
    except asyncio.TimeoutError:
        return {"healthy": False, "error": "Azure OpenAI timeout after 4s"}
    except Exception as exc:
        return {"healthy": False, "error": str(exc)}


def check_embedding_model(state) -> dict:
    """Check whether the in-memory embedding model is loaded."""
    model = getattr(state, "embedding_model", None)
    if model is not None:
        return {"healthy": True}
    return {"healthy": False, "error": "Embedding model not loaded"}


# --- Routes ---

@router.get("/health", summary="Liveness probe")
async def liveness() -> dict:
    """
    Always returns 200 if the process is running.
    Used by Kubernetes/Container Apps as the liveness probe.
    A failure here causes the container to be restarted.
    """
    return {"status": "alive"}


@router.get("/health/started", summary="Startup probe")
async def startup_probe(request: Request) -> JSONResponse:
    """
    Returns 200 once the application has finished initialising.
    Returns 503 if still starting (model not loaded, pool not ready).
    Used as the startup probe — gives slow-starting LLM services time to init.
    """
    state = request.app.state
    ready = getattr(state, "ready", False)

    if ready:
        return JSONResponse(status_code=200, content={"status": "started"})
    return JSONResponse(
        status_code=503,
        content={"status": "starting", "detail": "Service is still initialising"},
    )


@router.get("/health/ready", summary="Readiness probe")
async def readiness(request: Request) -> JSONResponse:
    """
    Returns 200 if all dependencies are healthy.
    Returns 503 if any dependency is down — traffic will not be routed here.
    Used as the readiness probe.
    """
    state = request.app.state

    async def not_initialised(name: str) -> dict:
        # Placeholder result for a dependency that was never set up
        return {"healthy": False, "error": f"{name} not initialised"}

    db_pool = getattr(state, "db_pool", None)
    redis = getattr(state, "redis", None)

    # Run all checks concurrently for speed
    results = await asyncio.gather(
        check_postgres(db_pool) if db_pool else not_initialised("Pool"),
        check_redis(redis) if redis else not_initialised("Redis"),
        check_openai(),
        return_exceptions=True,
    )

    checks = {
        "database": results[0] if not isinstance(results[0], Exception) else {"healthy": False, "error": str(results[0])},
        "redis": results[1] if not isinstance(results[1], Exception) else {"healthy": False, "error": str(results[1])},
        "openai": results[2] if not isinstance(results[2], Exception) else {"healthy": False, "error": str(results[2])},
        "embedding_model": check_embedding_model(state),
    }

    all_healthy = all(c["healthy"] for c in checks.values())
    status_code = 200 if all_healthy else 503

    return JSONResponse(
        status_code=status_code,
        content={
            "status": "ready" if all_healthy else "not_ready",
            "checks": checks,
        },
    )

Example healthy response from /health/ready:

JSON
{
  "status": "ready",
  "checks": {
    "database": {"healthy": true, "latency_ms": 3.2},
    "redis": {"healthy": true, "latency_ms": 0.8},
    "openai": {"healthy": true, "latency_ms": 241.0},
    "embedding_model": {"healthy": true}
  }
}

Example degraded response (Redis down):

JSON
{
  "status": "not_ready",
  "checks": {
    "database": {"healthy": true, "latency_ms": 3.4},
    "redis": {"healthy": false, "error": "Timeout after 1000ms"},
    "openai": {"healthy": true, "latency_ms": 198.0},
    "embedding_model": {"healthy": true}
  }
}
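Because the body names each dependency, a monitoring script or alert hook can report exactly what is down. A small sketch, where `failing_checks` is a hypothetical helper and the body shape is assumed to match the examples above:

```python
from typing import Dict, List


def failing_checks(body: dict) -> List[str]:
    """Return the names of dependencies reported unhealthy in a /health/ready body."""
    checks: Dict[str, dict] = body.get("checks", {})
    return sorted(
        name for name, result in checks.items()
        if not result.get("healthy", False)
    )


degraded = {
    "status": "not_ready",
    "checks": {
        "database": {"healthy": True, "latency_ms": 3.4},
        "redis": {"healthy": False, "error": "Timeout after 1000ms"},
        "openai": {"healthy": True, "latency_ms": 198.0},
        "embedding_model": {"healthy": True},
    },
}
print(failing_checks(degraded))  # ['redis']
```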

Kubernetes Probe Configuration

YAML
# k8s/deployment.yaml (excerpt)
containers:
  - name: ai-service
    image: myregistry.azurecr.io/ai-service:latest
    ports:
      - containerPort: 8000
    startupProbe:
      httpGet:
        path: /health/started
        port: 8000
      failureThreshold: 30     # 30 x 10s = 5 minutes to start
      periodSeconds: 10
    livenessProbe:
      httpGet:
        path: /health
        port: 8000
      initialDelaySeconds: 0
      periodSeconds: 10
      failureThreshold: 3
    readinessProbe:
      httpGet:
        path: /health/ready
        port: 8000
      periodSeconds: 15
      timeoutSeconds: 5        # default is 1s; the concurrent checks can take up to 4s
      failureThreshold: 2
      successThreshold: 1
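The thresholds above translate into time budgets by simple multiplication, as the `30 x 10s` comment suggests. The sketch below works through the three probes. It is an approximation that ignores probe timeouts and any initial delay, and `failure_window_seconds` is a hypothetical helper name.

```python
def failure_window_seconds(failure_threshold: int, period_seconds: int) -> int:
    """Approximate seconds of consecutive failures before the orchestrator acts."""
    return failure_threshold * period_seconds


startup_budget = failure_window_seconds(30, 10)    # time allowed to finish starting
liveness_window = failure_window_seconds(3, 10)    # time unresponsive before a restart
readiness_window = failure_window_seconds(2, 15)   # time unhealthy before traffic stops

print(startup_budget, liveness_window, readiness_window)  # 300 30 30
```

If the model takes longer than five minutes to load, raising `failureThreshold` on the startup probe extends the budget without loosening the liveness probe.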

Azure Container Apps Probe Configuration

YAML
# containerapp.yaml (excerpt)
probes:
  - type: startup
    httpGet:
      path: /health/started
      port: 8000
    failureThreshold: 30
    periodSeconds: 10
  - type: liveness
    httpGet:
      path: /health
      port: 8000
    periodSeconds: 10
    failureThreshold: 3
  - type: readiness
    httpGet:
      path: /health/ready
      port: 8000
    periodSeconds: 15
    timeoutSeconds: 5          # default is 1s; the concurrent checks can take up to 4s
    failureThreshold: 2

Testing Health Endpoints

Python
# tests/test_health.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi.testclient import TestClient
from main import app

def make_healthy_state():
    state = MagicMock()
    state.db_pool = MagicMock()
    state.redis = MagicMock()
    state.redis.ping = AsyncMock(return_value=True)
    state.db_pool.fetchval = AsyncMock(return_value=1)
    state.embedding_model = MagicMock()
    state.ready = True
    return state

def test_liveness():
    with TestClient(app) as client:
        resp = client.get("/health")
    assert resp.status_code == 200
    assert resp.json()["status"] == "alive"

def test_readiness_returns_503_when_db_is_down():
    with TestClient(app) as client:
        # Simulate DB pool returning an error
        app.state.db_pool.fetchval = AsyncMock(side_effect=Exception("Connection refused"))
        app.state.redis = MagicMock()
        app.state.redis.ping = AsyncMock(return_value=True)
        app.state.embedding_model = MagicMock()

        with patch("routers.health.check_openai", return_value={"healthy": True}):
            resp = client.get("/health/ready")

    assert resp.status_code == 503
    body = resp.json()
    assert body["status"] == "not_ready"
    assert body["checks"]["database"]["healthy"] is False

Key Takeaways

  • Three probes serve different purposes: liveness (is the process alive?), readiness (are all dependencies healthy?), startup (has initialisation finished?)
  • Liveness failures trigger container restarts — only fail for unrecoverable states, not for transient dependency outages
  • Always wrap external dependency checks with asyncio.wait_for() — a hung health check is worse than a failed one
  • Return JSONResponse(status_code=503, content={...}) to send a detailed body with a non-200 status
  • Run multiple dependency checks concurrently with asyncio.gather() to keep the health endpoint fast (under 5 seconds total)
  • The startup probe is essential for LLM services that load large models — it prevents the container from being killed before it finishes initialising

Next lesson: Dockerising your FastAPI AI service for production.