Skill 7 — Practical LLM Integration: Streaming (SSE), Caching & Fallbacks
Wire Azure OpenAI into your FastAPI backend with production-grade patterns: Server-Sent Events streaming, Redis response caching, retries with exponential backoff, and cost-aware model routing.
The Four Pillars of Production LLM Integration
- Streaming — tokens appear immediately, no waiting for full response
- Caching — identical queries hit the cache, not the LLM
- Retry with backoff — transient Azure failures don't crash the request
- Cost routing — route simple queries to cheaper models
Streaming with Server-Sent Events
SSE is a one-way stream from server to browser: the client holds a long-lived HTTP connection open, and the server pushes tokens as they arrive from Azure OpenAI.
Backend: async generator
# pharmabot/services/llm.py
import asyncio

from openai import AsyncAzureOpenAI, RateLimitError, APITimeoutError
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from pharmabot.config import settings


class LLMService:
    def __init__(self):
        self.client = AsyncAzureOpenAI(
            api_key=settings.azure_openai_api_key,
            azure_endpoint=settings.azure_openai_endpoint,
            api_version="2024-02-01",
            timeout=30.0,
        )

    @retry(
        retry=retry_if_exception_type((RateLimitError, APITimeoutError)),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=8),
        reraise=True,
    )
    async def _create_stream(self, messages: list[dict], temperature: float):
        # The retry decorator wraps this plain coroutine rather than the
        # generator below: calling an async generator function returns
        # immediately without raising, so tenacity would never see the error.
        return await self.client.chat.completions.create(
            model=settings.azure_openai_deployment,  # gpt-4o
            messages=messages,
            temperature=temperature,
            max_tokens=800,
            stream=True,
        )

    async def stream_completion(
        self, messages: list[dict], temperature: float = 0.1
    ):
        stream = await self._create_stream(messages, temperature)
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
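The generator still needs an HTTP endpoint that frames each token as an SSE event. A minimal sketch of that wiring, assuming a ChatRequest schema and a router module (hypothetical names; the /api/chat path matches the frontend and checkpoint below):
# pharmabot/api/chat.py (hypothetical wiring)
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from pharmabot.services.llm import LLMService

router = APIRouter()
llm = LLMService()

class ChatRequest(BaseModel):  # assumed request schema
    message: str
    session_id: str  # would key per-session history in a fuller version

@router.post("/api/chat")
async def chat(req: ChatRequest):
    async def event_stream():
        messages = [{"role": "user", "content": req.message}]
        async for token in llm.stream_completion(messages):
            # SSE frames are "data: <payload>" plus a blank line; JSON-encode
            # the payload in production so tokens with newlines survive framing
            yield f"data: {token}\n\n"
        yield "data: [DONE]\n\n"  # sentinel the frontend watches for
    return StreamingResponse(event_stream(), media_type="text/event-stream")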
Frontend: consuming the stream
EventSource only supports GET requests, so the client reads the POST response body directly and parses the SSE frames itself:
// frontend/src/api/stream.ts
export async function streamChat(
  message: string,
  sessionId: string,
  onToken: (token: string) => void,
  onDone: () => void,
) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message, session_id: sessionId }),
  });
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // { stream: true } keeps multi-byte characters intact across chunks
    buffer += decoder.decode(value, { stream: true });
    // an SSE frame can be split across network chunks: keep the last
    // (possibly partial) line in the buffer for the next read
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";
    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data === "[DONE]") { onDone(); return; }
        onToken(data);
      }
    }
  }
  onDone();
}
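In the chat component, call streamChat with the user's message and append each token to the assistant's message as it arrives, so the reply renders word by word instead of blocking on the full completion.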
Response Caching with Redis
Repeated identical queries (common for "what is ibuprofen?"-style questions) should never hit the LLM:
# pharmabot/services/cache.py
import hashlib
import json

from pharmabot.cache import redis_client

CACHE_TTL = 3600  # 1 hour

def make_cache_key(messages: list[dict]) -> str:
    # sort_keys makes the hash stable regardless of dict key order
    payload = json.dumps(messages, sort_keys=True)
    return f"llm:response:{hashlib.sha256(payload.encode()).hexdigest()}"

async def get_cached_response(messages: list[dict]) -> str | None:
    key = make_cache_key(messages)
    return await redis_client.get(key)

async def cache_response(messages: list[dict], response: str) -> None:
    key = make_cache_key(messages)
    await redis_client.setex(key, CACHE_TTL, response)
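These helpers assume a shared async Redis client in pharmabot/cache.py. A minimal sketch of that module, with decode_responses=True so get() returns str rather than bytes (the connection URL is an assumption; read it from settings in practice):
# pharmabot/cache.py (hypothetical module behind the import above)
import redis.asyncio as redis

redis_client = redis.from_url(
    "redis://localhost:6379",  # assumed; point at your Redis instance
    decode_responses=True,  # get() returns str, matching the type hints above
)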
Wrap the LLM service to check the cache first:
# in LLMService; uses the cache helpers above
async def stream_with_cache(self, messages: list[dict]):
    cached = await get_cached_response(messages)
    if cached:
        # Replay cached response token by token (maintains streaming UX)
        for word in cached.split(" "):
            yield word + " "
            await asyncio.sleep(0.01)  # simulate streaming pace
        return
    full_response = ""
    async for token in self.stream_completion(messages):
        full_response += token
        yield token
    await cache_response(messages, full_response)
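In the endpoint, stream_with_cache is a drop-in replacement for stream_completion: the SSE wiring stays the same, and cache hits replay from Redis instead of waiting on Azure.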
Cost-Aware Model Routing
Not every query needs GPT-4o. Route simple classification tasks to gpt-4o-mini:
# pharmabot/services/router.py
def select_model(task: str) -> str:
    """
    Route to the cheaper model for simple tasks.
    GPT-4o-mini costs ~15x less than GPT-4o.
    """
    simple_tasks = {"classify", "triage", "extract_drug_names"}
    if task in simple_tasks:
        return "gpt-4o-mini"
    return "gpt-4o"  # full model for drug info and interaction analysis
The Triage Agent (which only classifies intent) uses gpt-4o-mini. The Drug Info and Interaction Checker agents use gpt-4o for higher accuracy. This cuts costs by ~40% without quality loss.
Retry Logic with Tenacity
Azure OpenAI occasionally rate-limits requests (429) or times out under load. Without retries, these transient failures crash the user's session:
from tenacity import (
    retry, stop_after_attempt, wait_exponential,
    retry_if_exception_type, before_sleep_log,
)
import logging

@retry(
    retry=retry_if_exception_type((RateLimitError, APITimeoutError)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=8),
    reraise=True,  # surface the original error, not tenacity's RetryError
    before_sleep=before_sleep_log(logging.getLogger(__name__), logging.WARNING),
)
async def _create_stream(self, messages: list[dict], temperature: float):
    ...
Behaviour: attempt 1 fails → wait 1s → attempt 2 fails → wait 2s → attempt 3 fails → give up and re-raise to the caller (handled by a FastAPI exception handler as a 503).
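A minimal sketch of that exception handler, assuming reraise=True above so FastAPI sees the original exception (the pharmabot/main.py location is an assumption):
# pharmabot/main.py (hypothetical handler wiring)
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from openai import APITimeoutError, RateLimitError

app = FastAPI()

@app.exception_handler(RateLimitError)
@app.exception_handler(APITimeoutError)
async def llm_unavailable(request: Request, exc: Exception) -> JSONResponse:
    # all retries exhausted: tell the client to try again shortly
    return JSONResponse(
        status_code=503,
        content={"detail": "The assistant is temporarily unavailable."},
    )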
Measuring LLM Latency
Track time-to-first-token (TTFT) — the key perceived performance metric:
import time

import structlog

log = structlog.get_logger()

async def stream_with_metrics(self, messages: list[dict], session_id: str):
    start = time.monotonic()
    first_token = True
    async for token in self.stream_completion(messages):
        if first_token:
            ttft = (time.monotonic() - start) * 1000
            log.info("llm.first_token", session_id=session_id, ttft_ms=round(ttft))
            first_token = False
        yield token
    total = (time.monotonic() - start) * 1000
    log.info("llm.complete", session_id=session_id, total_ms=round(total))
Typical values:
- TTFT: 300–800ms (Azure OpenAI, West Europe)
- Full response (400 tokens): 3–6 seconds
Checkpoint
Test streaming with timing:
time curl -N http://localhost:8000/api/chat \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{"message": "What is metformin used for?", "session_id": "perf-test"}'
Run it twice. The second run should be noticeably faster — that's the Redis cache serving the response. Check the structlog output for llm.first_token and llm.complete events with millisecond timings.