AI Systems · Intermediate

Skill 7 — Practical LLM Integration: Streaming (SSE), Caching & Fallbacks

Wire Azure OpenAI into your FastAPI backend with production-grade patterns: Server-Sent Events streaming, prompt caching, retry on failure, and cost-aware model routing.

Asma Hafeez Khan · May 15, 2026 · 4 min read
Azure OpenAI · Streaming · SSE · Prompt Caching · LLM · FastAPI · Production

The Four Pillars of Production LLM Integration

  1. Streaming — tokens appear immediately, no waiting for full response
  2. Caching — identical queries hit the cache, not the LLM
  3. Retry with backoff — transient Azure failures don't crash the request
  4. Cost routing — route simple queries to cheaper models

Streaming with Server-Sent Events

SSE is a one-way stream from server to browser: the server pushes tokens over a single long-lived HTTP response as they arrive from Azure OpenAI. The browser's built-in EventSource API only supports GET requests, so for a POST chat endpoint the frontend reads the stream with fetch and parses the data: lines itself.

Backend: async generator

Python
# pharmabot/services/llm.py
from openai import AsyncAzureOpenAI, RateLimitError, APITimeoutError
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from pharmabot.config import settings
import asyncio

class LLMService:
    def __init__(self):
        self.client = AsyncAzureOpenAI(
            api_key=settings.azure_openai_api_key,
            azure_endpoint=settings.azure_openai_endpoint,
            api_version="2024-02-01",
            timeout=30.0,
        )

    @retry(
        retry=retry_if_exception_type((RateLimitError, APITimeoutError)),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=8),
    )
    async def _open_stream(self, messages: list[dict], temperature: float):
        # The retry wraps the call that opens the stream: rate limits and
        # timeouts surface here, before any tokens have been sent to the client.
        return await self.client.chat.completions.create(
            model=settings.azure_openai_deployment,  # gpt-4o deployment
            messages=messages,
            temperature=temperature,
            max_tokens=800,
            stream=True,
        )

    async def stream_completion(
        self, messages: list[dict], temperature: float = 0.1
    ):
        stream = await self._open_stream(messages, temperature)
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
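
For reference, here is a minimal sketch of the FastAPI route that ties this generator to the browser. ChatRequest and the module-level LLMService are illustrative stand-ins for pharmabot's real request model and dependency wiring; the essential shape is a StreamingResponse with the text/event-stream media type, each token wrapped in a data: frame, and a final data: [DONE] sentinel, which is exactly what the frontend below expects.

Python
# pharmabot/api/chat.py  (illustrative sketch, not the exact pharmabot route)
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from pharmabot.services.llm import LLMService

router = APIRouter()
llm = LLMService()   # the real app would share one instance via DI / app state

class ChatRequest(BaseModel):
    message: str
    session_id: str

@router.post("/api/chat")
async def chat(req: ChatRequest):
    async def event_stream():
        messages = [{"role": "user", "content": req.message}]
        async for token in llm.stream_completion(messages):
            # naive SSE framing; a token containing a newline would need escaping
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")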

Frontend: reading the SSE stream with fetch

TYPESCRIPT
// frontend/src/api/stream.ts
export async function streamChat(
  message: string,
  sessionId: string,
  onToken: (token: string) => void,
  onDone: () => void,
) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message, session_id: sessionId }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters intact across chunk boundaries
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";   // keep a partial trailing line for the next chunk

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data === "[DONE]") { onDone(); return; }
        onToken(data);
      }
    }
  }

  onDone();   // stream closed without an explicit [DONE]
}

Response Caching with Redis

Repeated identical queries (common for "what is ibuprofen?" type questions) should never hit the LLM:

Python
# pharmabot/services/cache.py
import hashlib, json
from pharmabot.cache import redis_client   # created with decode_responses=True, so get() returns str

CACHE_TTL = 3600   # 1 hour

def make_cache_key(messages: list[dict]) -> str:
    payload = json.dumps(messages, sort_keys=True)
    return f"llm:response:{hashlib.sha256(payload.encode()).hexdigest()}"

async def get_cached_response(messages: list[dict]) -> str | None:
    key = make_cache_key(messages)
    return await redis_client.get(key)

async def cache_response(messages: list[dict], response: str) -> None:
    key = make_cache_key(messages)
    await redis_client.setex(key, CACHE_TTL, response)

Wrap the LLM service to check the cache first:

Python
async def stream_with_cache(self, messages: list[dict]):
    cached = await get_cached_response(messages)
    if cached:
        # Replay cached response token by token (maintains streaming UX)
        for word in cached.split(" "):
            yield word + " "
            await asyncio.sleep(0.01)   # simulate streaming pace
        return

    full_response = ""
    async for token in self.stream_completion(messages):
        full_response += token
        yield token

    await cache_response(messages, full_response)
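
With caching in place, the route sketched earlier only needs one change: the generator calls the cached wrapper instead of stream_completion directly.

Python
# drop-in replacement for event_stream() in the route sketch above
async def event_stream():
    messages = [{"role": "user", "content": req.message}]
    async for token in llm.stream_with_cache(messages):
        yield f"data: {token}\n\n"
    yield "data: [DONE]\n\n"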

Cost-Aware Model Routing

Not every query needs GPT-4o. Route simple classification tasks to gpt-4o-mini:

Python
# pharmabot/services/router.py

def select_model(task: str) -> str:
    """
    Route to cheaper model for simple tasks.
    GPT-4o-mini costs ~15x less than GPT-4o.
    """
    simple_tasks = {"classify", "triage", "extract_drug_names"}
    if task in simple_tasks:
        return "gpt-4o-mini"
    return "gpt-4o"   # full model for drug info and interaction analysis

The Triage Agent (which only classifies intent) uses gpt-4o-mini. The Drug Info and Interaction Checker agents use gpt-4o for higher accuracy. This cuts costs by ~40% without quality loss.
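
As a sketch of the call site, here is how the Triage Agent might use the router. The model keyword is a hypothetical override (the stream_completion shown earlier always reads the deployment from settings), and it assumes the Azure deployments are simply named gpt-4o and gpt-4o-mini.

Python
# hypothetical Triage Agent helper; `model` is an assumed override parameter
async def classify_intent(llm, user_message: str) -> str:
    tokens = llm.stream_completion(
        messages=[
            {"role": "system", "content": "Classify the user's intent in one word."},
            {"role": "user", "content": user_message},
        ],
        model=select_model("classify"),   # routes the triage step to gpt-4o-mini
    )
    return "".join([t async for t in tokens]).strip()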


Retry Logic with Tenacity

Azure OpenAI occasionally rejects a request with 429 (rate limited) or times out under load. Without a retry, either failure crashes the user's session:

Python
from tenacity import (
    retry, stop_after_attempt, wait_exponential,
    retry_if_exception_type, before_sleep_log
)
import logging

@retry(
    retry=retry_if_exception_type((RateLimitError, APITimeoutError)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=8),
    before_sleep=before_sleep_log(logging.getLogger(__name__), logging.WARNING),
)
async def _open_stream(self, messages: list[dict], temperature: float):
    # decorate the awaitable that opens the stream, not the async generator,
    # so tenacity sees the exception before any tokens have been sent
    ...

Behaviour: first failure → exponential back-off → retry. Second failure → longer back-off (capped at 8 s) → retry. Third failure → stop_after_attempt(3) is exhausted → tenacity raises to the caller, and the FastAPI exception handler turns it into a 503 (sketched below).
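
With the decorator left at its tenacity defaults, what reaches the caller after the last attempt is a RetryError (pass reraise=True to propagate the original RateLimitError or APITimeoutError instead). Here is a minimal sketch of that handler, assuming it lives next to the FastAPI app object; note that an exception handler only covers failures raised before the streaming response starts, so an error after the first token has to be handled inside the generator itself.

Python
# pharmabot/main.py: sketch of the 503 handler mentioned above
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from tenacity import RetryError

app = FastAPI()

@app.exception_handler(RetryError)
async def llm_unavailable(request: Request, exc: RetryError):
    # reached only after all three attempts against Azure OpenAI have failed
    return JSONResponse(
        status_code=503,
        content={"detail": "The assistant is temporarily unavailable. Please try again."},
    )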


Measuring LLM Latency

Track time-to-first-token (TTFT) — the key perceived performance metric:

Python
import time
import structlog

log = structlog.get_logger()

async def stream_with_metrics(self, messages: list[dict], session_id: str):
    start = time.monotonic()
    first_token = True

    async for token in self.stream_completion(messages):
        if first_token:
            ttft = (time.monotonic() - start) * 1000
            log.info("llm.first_token", session_id=session_id, ttft_ms=round(ttft))
            first_token = False
        yield token

    total = (time.monotonic() - start) * 1000
    log.info("llm.complete", session_id=session_id, total_ms=round(total))

Typical values:

  • TTFT: 300–800ms (Azure OpenAI EU West)
  • Full response (400 tokens): 3–6 seconds

Checkpoint

Test streaming with timing:

Bash
time curl -N http://localhost:8000/api/chat \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{"message": "What is metformin used for?", "session_id": "perf-test"}'

Run it twice. The second run should be noticeably faster — that's the Redis cache serving the response. Check the structlog output for llm.first_token and llm.complete events with millisecond timings.

Enjoyed this article?

Explore the AI Systems learning path for more.
