FastAPI for AI Engineers · Lesson 6 of 12
Background Tasks: Async Processing Without Blocking
What Are Background Tasks?
FastAPI's BackgroundTasks lets you schedule a function to run after the HTTP response has been sent to the client. The request handler returns immediately, and the background task runs in the same event loop (or thread pool for synchronous tasks).
Use cases for background tasks in AI services:
- Writing LLM call logs to a database after returning the response
- Sending usage notification emails to users
- Invalidating a cache after data changes
- Posting telemetry or metrics to an external monitoring service
- Triggering downstream processing that the caller doesn't need to wait for
Basic Usage
Inject BackgroundTasks into your route handler as a parameter. FastAPI recognises the type and provides the instance automatically:
from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()


def write_log(message: str) -> None:
    """Simulated slow logging call that runs after the response is sent."""
    time.sleep(0.5)  # blocking is fine here: FastAPI runs sync tasks in a thread pool
    with open("requests.log", "a") as f:
        f.write(f"{time.time()}: {message}\n")


@app.post("/process")
async def process(text: str, background_tasks: BackgroundTasks) -> dict:
    # Do the main work
    result = {"processed": text.upper(), "length": len(text)}
    # Schedule the log write; it runs after this function returns
    background_tasks.add_task(write_log, f"processed: {text}")
    # Return immediately; the log write happens in the background
    return result

The caller receives the response without waiting for the 0.5-second log write, which runs only after the response has been sent.
Adding Multiple Tasks
You can add as many background tasks as you need:
# ChatRequest, ChatResponse, llm_client and the three task functions
# are assumed to be defined elsewhere in the application.
@app.post("/chat")
async def chat(req: ChatRequest, background_tasks: BackgroundTasks) -> ChatResponse:
    response = await llm_client.complete(req.messages)
    background_tasks.add_task(log_llm_call, req, response)
    background_tasks.add_task(update_usage_counter, req.user_id)
    background_tasks.add_task(invalidate_cache, f"user:{req.user_id}:history")
    return response

Tasks are executed in the order they were added.
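To make the ordering concrete, here is a toy model of a task list written with only the standard library. It is a simplified sketch, not FastAPI's or Starlette's actual source: `TaskListSketch`, `run_all` and `demo_order` are hypothetical names. It assumes the behaviour described above, namely that tasks run one at a time in insertion order, with async tasks awaited on the event loop and sync tasks handed to a worker thread.

```python
import asyncio
import inspect
from typing import Any, Callable


class TaskListSketch:
    """Toy stand-in for a background task list (hypothetical, for illustration)."""

    def __init__(self) -> None:
        self._tasks: list[tuple[Callable[..., Any], tuple, dict]] = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Only records the call; nothing runs yet.
        self._tasks.append((func, args, kwargs))

    async def run_all(self) -> None:
        # Run tasks one at a time, in the order they were added.
        for func, args, kwargs in self._tasks:
            if inspect.iscoroutinefunction(func):
                # Async task: awaited directly on the event loop.
                await func(*args, **kwargs)
            else:
                # Sync task: run in a worker thread so the loop is not blocked.
                await asyncio.to_thread(func, *args, **kwargs)


def demo_order() -> list[str]:
    order: list[str] = []

    def sync_task(name: str) -> None:
        order.append(name)

    async def async_task(name: str) -> None:
        await asyncio.sleep(0)
        order.append(name)

    tasks = TaskListSketch()
    tasks.add_task(sync_task, "log")
    tasks.add_task(async_task, "usage")
    tasks.add_task(sync_task, "cache")
    asyncio.run(tasks.run_all())
    return order
```

Because each task in this sketch finishes before the next one starts, a slow task delays everything queued behind it, and an exception raised by one task would prevent the later ones from running. Both are reasons to keep background tasks short and to catch errors inside them.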
Async Background Tasks
Background tasks can be async def as well — FastAPI awaits them in the event loop:
import httpx


async def post_audit_event(event: dict) -> None:
    """Post an audit event to an external security platform."""
    async with httpx.AsyncClient() as client:
        try:
            await client.post(
                "https://audit.internal/events",
                json=event,
                timeout=5.0,
            )
        except Exception as exc:
            # Log the failure; don't let it crash the background task runner
            print(f"Audit post failed: {exc}")


# process_dispense is assumed to be defined elsewhere in the application.
@app.post("/drugs/{drug_id}/dispense")
async def dispense_drug(
    drug_id: int,
    user_id: str,
    background_tasks: BackgroundTasks,
) -> dict:
    result = await process_dispense(drug_id, user_id)
    background_tasks.add_task(post_audit_event, {
        "action": "dispense",
        "drug_id": drug_id,
        "user_id": user_id,
        "timestamp": result["timestamp"],
    })
    return result

Dependency Injection with BackgroundTasks
You can inject BackgroundTasks into dependencies too — useful when a service layer needs to schedule tasks without coupling to the router:
from fastapi import Depends, BackgroundTasks


class AuditService:
    def __init__(self, background_tasks: BackgroundTasks):
        self.background_tasks = background_tasks

    def log_event(self, event_type: str, details: dict) -> None:
        # Schedules the write; it runs after the response is sent
        self.background_tasks.add_task(
            self._write_to_db,
            event_type,
            details,
        )

    async def _write_to_db(self, event_type: str, details: dict) -> None:
        # In production: insert into an audit_events table
        print(f"AUDIT: {event_type}: {details}")


def get_audit_service(background_tasks: BackgroundTasks) -> AuditService:
    return AuditService(background_tasks)


@app.post("/chat")
async def chat(
    req: ChatRequest,
    audit: AuditService = Depends(get_audit_service),
) -> ChatResponse:
    response = await llm_client.complete(req.messages)
    audit.log_event("llm_call", {"user_id": req.user_id, "model": req.model})
    return response

Real Example: Log Every LLM Call to DB After Returning the Response
Here is a complete, production-style implementation that logs LLM calls asynchronously after the response is sent:
# services/llm_logger.py
import sys
from datetime import datetime, timezone

import asyncpg

DATABASE_URL = "postgresql://user:pass@localhost/ai_service_db"


async def log_llm_call(
    user_id: str,
    model: str,
    prompt_tokens: int,
    completion_tokens: int,
    duration_ms: float,
    finish_reason: str,
    error: str | None = None,
) -> None:
    """
    Write an LLM call record to the audit log table.
    Runs as a background task: errors are caught and logged, never re-raised.
    """
    try:
        conn = await asyncpg.connect(DATABASE_URL)
        try:
            await conn.execute(
                """
                INSERT INTO llm_audit_log
                    (user_id, model, prompt_tokens, completion_tokens,
                     total_tokens, duration_ms, finish_reason, error, created_at)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                """,
                user_id,
                model,
                prompt_tokens,
                completion_tokens,
                prompt_tokens + completion_tokens,
                duration_ms,
                finish_reason,
                error,
                # Timezone-aware UTC timestamp, matching the TIMESTAMPTZ column
                datetime.now(timezone.utc),
            )
        finally:
            await conn.close()
    except Exception as exc:
        # Background task must not raise: log to stderr and continue
        print(f"[llm_logger] Failed to write audit log: {exc}", file=sys.stderr)

# routers/chat.py
import time

from fastapi import APIRouter, BackgroundTasks
from openai import AsyncOpenAI
from pydantic import BaseModel, Field

from services.llm_logger import log_llm_call

router = APIRouter(prefix="/chat", tags=["chat"])
client = AsyncOpenAI()


class ChatRequest(BaseModel):
    user_id: str
    messages: list[dict] = Field(..., min_length=1)
    model: str = "gpt-4o"
    max_tokens: int = Field(default=1024, ge=1, le=4096)


class ChatResponse(BaseModel):
    content: str
    model: str
    prompt_tokens: int
    completion_tokens: int


@router.post("/", response_model=ChatResponse)
async def chat(
    req: ChatRequest,
    background_tasks: BackgroundTasks,
) -> ChatResponse:
    start = time.perf_counter()
    # If this call raises, the exception propagates, the lines below never
    # run, and no log record is scheduled for the failed request.
    raw = await client.chat.completions.create(
        model=req.model,
        messages=req.messages,
        max_tokens=req.max_tokens,
    )
    duration_ms = (time.perf_counter() - start) * 1000
    choice = raw.choices[0]
    usage = raw.usage
    # Schedule the DB write; it happens after we return the response
    background_tasks.add_task(
        log_llm_call,
        user_id=req.user_id,
        model=raw.model,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
        duration_ms=duration_ms,
        finish_reason=choice.finish_reason,
    )
    return ChatResponse(
        content=choice.message.content or "",
        model=raw.model,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
    )

The SQL schema for the audit table:
CREATE TABLE llm_audit_log (
    id                BIGSERIAL PRIMARY KEY,
    user_id           TEXT NOT NULL,
    model             TEXT NOT NULL,
    prompt_tokens     INTEGER NOT NULL,
    completion_tokens INTEGER NOT NULL,
    total_tokens      INTEGER NOT NULL,
    duration_ms       DOUBLE PRECISION,
    finish_reason     TEXT,
    error             TEXT,
    created_at        TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX ON llm_audit_log (user_id, created_at DESC);
CREATE INDEX ON llm_audit_log (created_at DESC);

Limitations of BackgroundTasks
BackgroundTasks is simple and zero-configuration, but it has important limitations:
| Limitation | Detail |
|-----------|--------|
| Not persistent | If the process crashes, queued tasks are lost |
| No retry logic | A failed task is gone; you must implement retries yourself |
| Tied to the request lifecycle | Tasks run in the same worker process that handled the request |
| No scheduling | You cannot delay a task or run it at a specific time |
| No monitoring | No built-in dashboard, no task status, no dead-letter queue |
| Not distributed | Cannot spread work across multiple worker processes |
For tasks where any of these limitations matter, use a proper task queue.
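If the only gap is retries and the task is still non-critical, a small in-process helper may be enough. The sketch below is one possible approach using only the standard library; `retry_async`, its parameters, and the `flaky` demo function are hypothetical names, not part of FastAPI. It retries an async callable with exponential back-off, and it does nothing for durability: pending retries are still lost if the process dies.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    func: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Call func up to `attempts` times, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return await func()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller decide what to do
            # Waits base_delay, then 2x, then 4x, and so on
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError("attempts must be at least 1")


def demo_retry() -> tuple[str, int]:
    calls = 0

    async def flaky() -> str:
        # Fails twice, then succeeds, to exercise the retry path
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("simulated outage")
        return "written"

    result = asyncio.run(retry_async(flaky, attempts=3, base_delay=0.01))
    return result, calls
```

Inside a background task you would wrap the fragile call, for example `await retry_async(lambda: client.post(url, json=event))`, and still catch the final exception so the task itself never raises.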
When to Use Celery or Azure Service Bus Instead
Use Celery when:
- Tasks must survive process restarts
- You need retry logic with exponential back-off
- You need task scheduling (run at 3 AM daily)
- You need task result storage and status polling
- You process high volumes (thousands of tasks per minute)
# celery_app.py: minimal Celery setup
from celery import Celery

celery = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)


@celery.task(
    bind=True,
    max_retries=3,
    default_retry_delay=5,  # seconds between retries
)
def send_completion_email(self, user_email: str, content: str) -> None:
    try:
        # _send_email is assumed to be defined elsewhere
        _send_email(user_email, content)
    except Exception as exc:
        raise self.retry(exc=exc)

# In your FastAPI route: fire and forget to Celery
from celery_app import send_completion_email


@app.post("/chat")
async def chat(req: ChatRequest) -> ChatResponse:
    response = await llm_client.complete(req.messages)
    # Celery picks this up and runs it in a separate worker process
    send_completion_email.delay(req.user_email, response.content)
    return response

Use Azure Service Bus when:
- You are on Azure and want a managed queue with no infrastructure to run
- You need guaranteed delivery (messages survive queue service restarts)
- You need dead-letter queues for failed messages
- Tasks are produced by multiple services (fan-out pattern)
- You need cross-language consumers (Python producer, .NET consumer)
# Publish to Azure Service Bus from FastAPI
import json
import os

from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient

CONNECTION_STRING = os.environ["SERVICE_BUS_CONNECTION_STRING"]
QUEUE_NAME = "llm-audit-queue"


async def enqueue_audit_event(payload: dict) -> None:
    async with ServiceBusClient.from_connection_string(CONNECTION_STRING) as client:
        async with client.get_queue_sender(QUEUE_NAME) as sender:
            msg = ServiceBusMessage(json.dumps(payload))
            await sender.send_messages(msg)


@app.post("/chat")
async def chat(req: ChatRequest, background_tasks: BackgroundTasks) -> ChatResponse:
    response = await llm_client.complete(req.messages)
    background_tasks.add_task(enqueue_audit_event, {
        "user_id": req.user_id,
        "tokens": response.usage.total_tokens,
    })
    return response

Decision Guide
Is the task cheap and non-critical?
    YES → BackgroundTasks (log, metric, cache invalidation)
    NO  ↓
Must it survive process crashes?
    YES → Celery or Azure Service Bus
    NO  ↓
Are you on Azure?
    YES → Azure Service Bus
    NO  → Celery with Redis

Key Takeaways
- BackgroundTasks runs a function after the HTTP response is sent; the client does not wait for it
- Both sync and async functions can be background tasks; FastAPI handles the execution context correctly
- Background tasks are fire-and-forget: errors must be caught inside the task, not by the caller
- Inject BackgroundTasks into service layer dependencies to keep routing logic clean
- Use BackgroundTasks for audit logging, metrics, cache invalidation, and other low-stakes side effects
- Reach for Celery (Redis/RabbitMQ) or Azure Service Bus when you need durability, retries, scheduling, or distributed processing
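The takeaway about catching errors inside the task can be factored into a small decorator, so each task does not repeat its own try/except. This is a minimal sketch using only the standard library; `safe_task` and `write_metric` are hypothetical names, and the wrapper as written covers synchronous tasks only.

```python
import functools
import logging
from typing import Any, Callable

logger = logging.getLogger("background")


def safe_task(func: Callable[..., Any]) -> Callable[..., None]:
    """Wrap a sync task so any exception is logged instead of propagating."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> None:
        try:
            func(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback along with the message
            logger.exception("Background task %s failed", func.__name__)

    return wrapper


@safe_task
def write_metric(name: str, value: float) -> None:
    # Stand-in for a real metrics call; rejects obviously bad input
    if value < 0:
        raise ValueError(f"negative value for {name}")
```

A route would then schedule it as usual with `background_tasks.add_task(write_metric, "latency_ms", 12.5)`, and a failure shows up in the logs rather than as an unhandled exception in the worker.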
Next lesson: Dependency Injection — the Depends() system for injecting clients, sessions, and the authenticated user.