Learnixo

FastAPI for AI Engineers · Lesson 6 of 12

Background Tasks: Async Processing Without Blocking

What Are Background Tasks?

FastAPI's BackgroundTasks lets you schedule a function to run after the HTTP response has been sent to the client. The request handler returns immediately, and the background task runs in the same event loop (or thread pool for synchronous tasks).

Use cases for background tasks in AI services:

  • Writing LLM call logs to a database after returning the response
  • Sending usage notification emails to users
  • Invalidating a cache after data changes
  • Posting telemetry or metrics to an external monitoring service
  • Triggering downstream processing that the caller doesn't need to wait for

Basic Usage

Inject BackgroundTasks into your route handler as a parameter. FastAPI recognises the type and provides the instance automatically:

Python
from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

def write_log(message: str):
    """Simulated slow logging call — runs after response is sent."""
    time.sleep(0.5)   # synchronous  OK here because FastAPI runs it in a thread
    with open("requests.log", "a") as f:
        f.write(f"{time.time()}: {message}\n")

@app.post("/process")
async def process(text: str, background_tasks: BackgroundTasks) -> dict:
    # Do the main work
    result = {"processed": text.upper(), "length": len(text)}

    # Schedule the log write  runs after this function returns
    background_tasks.add_task(write_log, f"processed: {text}")

    # Return immediately  the log write happens in background
    return result

The caller receives the response in under a millisecond; the log write happens after, without blocking the response.

Adding Multiple Tasks

You can add as many background tasks as you need:

Python
@app.post("/chat")
async def chat(req: ChatRequest, background_tasks: BackgroundTasks) -> ChatResponse:
    response = await llm_client.complete(req.messages)

    background_tasks.add_task(log_llm_call, req, response)
    background_tasks.add_task(update_usage_counter, req.user_id)
    background_tasks.add_task(invalidate_cache, f"user:{req.user_id}:history")

    return response

Tasks are executed in the order they were added.

Async Background Tasks

Background tasks can be async def as well — FastAPI awaits them in the event loop:

Python
import asyncio
import httpx

async def post_audit_event(event: dict) -> None:
    """Post an audit event to an external security platform."""
    async with httpx.AsyncClient() as client:
        try:
            await client.post(
                "https://audit.internal/events",
                json=event,
                timeout=5.0,
            )
        except Exception as exc:
            # Log the failure  don't let it crash the background task runner
            print(f"Audit post failed: {exc}")

@app.post("/drugs/{drug_id}/dispense")
async def dispense_drug(
    drug_id: int,
    user_id: str,
    background_tasks: BackgroundTasks,
) -> dict:
    result = await process_dispense(drug_id, user_id)

    background_tasks.add_task(post_audit_event, {
        "action": "dispense",
        "drug_id": drug_id,
        "user_id": user_id,
        "timestamp": result["timestamp"],
    })

    return result

Dependency Injection with BackgroundTasks

You can inject BackgroundTasks into dependencies too — useful when a service layer needs to schedule tasks without coupling to the router:

Python
from fastapi import Depends, BackgroundTasks
from typing import Callable

class AuditService:
    def __init__(self, background_tasks: BackgroundTasks):
        self.background_tasks = background_tasks

    def log_event(self, event_type: str, details: dict) -> None:
        self.background_tasks.add_task(
            self._write_to_db,
            event_type,
            details,
        )

    async def _write_to_db(self, event_type: str, details: dict) -> None:
        # In production: insert into an audit_events table
        print(f"AUDIT: {event_type} — {details}")


def get_audit_service(background_tasks: BackgroundTasks) -> AuditService:
    return AuditService(background_tasks)


@app.post("/chat")
async def chat(
    req: ChatRequest,
    audit: AuditService = Depends(get_audit_service),
) -> ChatResponse:
    response = await llm_client.complete(req.messages)
    audit.log_event("llm_call", {"user_id": req.user_id, "model": req.model})
    return response

Real Example: Log Every LLM Call to DB After Returning the Response

Here is a complete, production-style implementation that logs LLM calls asynchronously after the response is sent:

Python
# services/llm_logger.py
import asyncio
from datetime import datetime
from typing import Any
import asyncpg

DATABASE_URL = "postgresql://user:pass@localhost/ai_service_db"


async def log_llm_call(
    user_id: str,
    model: str,
    prompt_tokens: int,
    completion_tokens: int,
    duration_ms: float,
    finish_reason: str,
    error: str | None = None,
) -> None:
    """
    Write an LLM call record to the audit log table.
    Runs as a background task — errors are caught and logged, never re-raised.
    """
    try:
        conn = await asyncpg.connect(DATABASE_URL)
        try:
            await conn.execute(
                """
                INSERT INTO llm_audit_log
                    (user_id, model, prompt_tokens, completion_tokens,
                     total_tokens, duration_ms, finish_reason, error, created_at)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                """,
                user_id,
                model,
                prompt_tokens,
                completion_tokens,
                prompt_tokens + completion_tokens,
                duration_ms,
                finish_reason,
                error,
                datetime.utcnow(),
            )
        finally:
            await conn.close()
    except Exception as exc:
        # Background task must not raise  log to stderr and continue
        import sys
        print(f"[llm_logger] Failed to write audit log: {exc}", file=sys.stderr)
Python
# routers/chat.py
import time
from fastapi import APIRouter, BackgroundTasks
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from services.llm_logger import log_llm_call

router = APIRouter(prefix="/chat", tags=["chat"])
client = AsyncOpenAI()


class ChatRequest(BaseModel):
    user_id: str
    messages: list[dict] = Field(..., min_length=1)
    model: str = "gpt-4o"
    max_tokens: int = Field(default=1024, ge=1, le=4096)


class ChatResponse(BaseModel):
    content: str
    model: str
    prompt_tokens: int
    completion_tokens: int


@router.post("/", response_model=ChatResponse)
async def chat(
    req: ChatRequest,
    background_tasks: BackgroundTasks,
) -> ChatResponse:
    start = time.perf_counter()
    error_msg: str | None = None

    try:
        raw = await client.chat.completions.create(
            model=req.model,
            messages=req.messages,
            max_tokens=req.max_tokens,
        )
    except Exception as exc:
        error_msg = str(exc)
        raise

    duration_ms = (time.perf_counter() - start) * 1000
    choice = raw.choices[0]
    usage = raw.usage

    # Schedule DB write  happens after we return the response
    background_tasks.add_task(
        log_llm_call,
        user_id=req.user_id,
        model=raw.model,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
        duration_ms=duration_ms,
        finish_reason=choice.finish_reason,
        error=error_msg,
    )

    return ChatResponse(
        content=choice.message.content or "",
        model=raw.model,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
    )

The SQL schema for the audit table:

SQL
CREATE TABLE llm_audit_log (
    id              BIGSERIAL PRIMARY KEY,
    user_id         TEXT NOT NULL,
    model           TEXT NOT NULL,
    prompt_tokens   INTEGER NOT NULL,
    completion_tokens INTEGER NOT NULL,
    total_tokens    INTEGER NOT NULL,
    duration_ms     DOUBLE PRECISION,
    finish_reason   TEXT,
    error           TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX ON llm_audit_log (user_id, created_at DESC);
CREATE INDEX ON llm_audit_log (created_at DESC);

Limitations of BackgroundTasks

BackgroundTasks is simple and zero-configuration, but it has important limitations:

| Limitation | Detail | |-----------|--------| | Not persistent | If the process crashes, queued tasks are lost | | No retry logic | A failed task is gone — you must implement retries yourself | | Tied to the request lifecycle | Tasks run in the same worker process that handled the request | | No scheduling | You cannot delay a task or run it at a specific time | | No monitoring | No built-in dashboard, no task status, no dead-letter queue | | Not distributed | Cannot spread work across multiple worker processes |

For tasks where any of these limitations matter, use a proper task queue.

When to Use Celery or Azure Service Bus Instead

Use Celery when:

  • Tasks must survive process restarts
  • You need retry logic with exponential back-off
  • You need task scheduling (run at 3 AM daily)
  • You need task result storage and status polling
  • You process high volumes (thousands of tasks per minute)
Python
# celery_app.py  minimal Celery setup
from celery import Celery

celery = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

@celery.task(
    bind=True,
    max_retries=3,
    default_retry_delay=5,
)
def send_completion_email(self, user_email: str, content: str) -> None:
    try:
        _send_email(user_email, content)
    except Exception as exc:
        raise self.retry(exc=exc)
Python
# In your FastAPI route  fire and forget to Celery
from celery_app import send_completion_email

@app.post("/chat")
async def chat(req: ChatRequest) -> ChatResponse:
    response = await llm_client.complete(req.messages)

    # Celery picks this up and runs it in a separate worker process
    send_completion_email.delay(req.user_email, response.content)

    return response

Use Azure Service Bus when:

  • You are on Azure and want a managed queue with no infrastructure to run
  • You need guaranteed delivery (messages survive queue service restarts)
  • You need dead-letter queues for failed messages
  • Tasks are produced by multiple services (fan-out pattern)
  • You need cross-language consumers (Python producer, .NET consumer)
Python
# Publish to Azure Service Bus from FastAPI
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
import json

CONNECTION_STRING = os.environ["SERVICE_BUS_CONNECTION_STRING"]
QUEUE_NAME = "llm-audit-queue"

async def enqueue_audit_event(payload: dict) -> None:
    async with ServiceBusClient.from_connection_string(CONNECTION_STRING) as client:
        async with client.get_queue_sender(QUEUE_NAME) as sender:
            msg = ServiceBusMessage(json.dumps(payload))
            await sender.send_messages(msg)

@app.post("/chat")
async def chat(req: ChatRequest, background_tasks: BackgroundTasks) -> ChatResponse:
    response = await llm_client.complete(req.messages)
    background_tasks.add_task(enqueue_audit_event, {
        "user_id": req.user_id,
        "tokens": response.usage.total_tokens,
    })
    return response

Decision Guide

Is the task cheap and non-critical?
  YES → BackgroundTasks (log, metric, cache invalidation)
  NO ↓

Must it survive process crashes?
  YES → Celery or Azure Service Bus
  NO ↓

Are you on Azure?
  YES → Azure Service Bus
  NO → Celery with Redis

Key Takeaways

  • BackgroundTasks runs a function after the HTTP response is sent — the client does not wait for it
  • Both sync and async functions can be background tasks — FastAPI handles the execution context correctly
  • Background tasks are fire-and-forget: errors must be caught inside the task, not by the caller
  • Inject BackgroundTasks into service layer dependencies to keep routing logic clean
  • Use BackgroundTasks for audit logging, metrics, cache invalidation, and other low-stakes side effects
  • Reach for Celery (Redis/RabbitMQ) or Azure Service Bus when you need durability, retries, scheduling, or distributed processing

Next lesson: Dependency Injection — the Depends() system for injecting clients, sessions, and the authenticated user.