FastAPI for AI Engineers · Lesson 5 of 12
SSE Streaming: Stream LLM Tokens to the Browser
What Are Server-Sent Events?
Server-Sent Events (SSE) is a standard HTTP mechanism for pushing data from server to client over a single long-lived connection. The server sends a series of events; the client receives them incrementally.
Unlike WebSockets, SSE is:
- Unidirectional: the server sends, the client only listens
- HTTP-native: it uses a plain GET or POST request, and the response carries Content-Type: text/event-stream
- Reconnect-aware: the browser's EventSource client retries automatically on disconnect
- Firewall-friendly: it's just HTTP, not a protocol upgrade
For LLM applications, SSE is ideal: the model generates tokens sequentially and you want to ship each token to the browser as soon as it arrives, rather than waiting for the entire completion.
The SSE Wire Format
SSE is a plain-text format. Each event consists of one or more field lines, and a blank line (\n\n) ends the event:
data: Hello\n\n
data: world\n\n
data: [DONE]\n\n

Each event can carry multiple fields:
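id: 42
event: token
data: {"text": "Hello", "index": 0}

id: 43
event: token
data: {"text": " world", "index": 1}

event: done
data: [DONE]

For LLM streaming, the simplest approach is to send only data: lines with JSON payloads and to use data: [DONE] as the terminator, the same convention OpenAI uses. Unnamed events also arrive in the browser's onmessage handler, whereas an event with an event: field is delivered only to a listener registered for that name.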
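The field layout above can be captured in a small formatter. This is a minimal sketch, not part of FastAPI: format_sse and its parameters are hypothetical names introduced here. It emits id and event only when they are given, and it splits a multi-line payload into one data: line per line, since a raw newline inside a payload would otherwise end the field early.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None, event_id: Optional[str] = None) -> bytes:
    """Serialize one SSE event (hypothetical helper, not a FastAPI API)."""
    # Strings are sent as-is; anything else is JSON-encoded.
    text = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Normalize line endings, then emit one data: line per payload line.
    for part in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        lines.append(f"data: {part}")
    # The trailing blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")
```

With a helper like this, a generator could yield format_sse({"token": "Hi"}) instead of building the frame by hand with an f-string.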
StreamingResponse + AsyncGenerator
FastAPI's StreamingResponse takes an async generator and writes its yielded bytes directly to the HTTP connection, flushing after each chunk.
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def count_up() -> AsyncGenerator[bytes, None]:
    for i in range(5):
        yield f"data: {i}\n\n".encode()


@app.get("/stream/count")
async def stream_count() -> StreamingResponse:
    return StreamingResponse(
        count_up(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
            "Connection": "keep-alive",
        },
    )

Test it in the terminal:
curl -N http://localhost:8000/stream/count
# data: 0
# data: 1
# data: 2
# data: 3
# data: 4

The -N flag (--no-buffer) disables curl's output buffering, so you see events as they arrive.
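curl shows the raw frames, which is enough for a visual check. To assert on the stream in a script, the frames have to be reassembled first, because a network read can end partway through an event or even partway through a multi-byte character. The sketch below shows one way to do that. SSEParser is a hypothetical helper introduced here; it handles only data: lines and comments, and it assumes the \n line endings used by the examples in this lesson.

```python
import codecs
from typing import List


class SSEParser:
    """Incremental parser for data-only SSE streams (hypothetical helper)."""

    def __init__(self) -> None:
        # The incremental decoder holds back incomplete multi-byte characters.
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""

    def feed(self, chunk: bytes) -> List[str]:
        """Add raw bytes and return the data payload of every completed event."""
        self._buffer += self._decoder.decode(chunk)
        # Events end with a blank line; the last piece may still be incomplete.
        *complete, self._buffer = self._buffer.split("\n\n")
        payloads = []
        for raw_event in complete:
            data_lines = []
            for line in raw_event.split("\n"):
                if line.startswith("data:"):
                    value = line[len("data:"):]
                    # One optional space after the colon is not part of the value.
                    data_lines.append(value[1:] if value.startswith(" ") else value)
                # Comment lines (starting with ":") and other fields are skipped.
            if data_lines:
                payloads.append("\n".join(data_lines))
        return payloads
```

Each call to feed takes whatever bytes the HTTP client returned and yields only the events that are complete so far; anything left over waits in the buffer for the next call.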
Streaming Azure OpenAI Completions
The OpenAI Python SDK accepts stream=True on chat.completions.create, which returns an async iterator of ChatCompletionChunk objects. Each chunk carries zero or more choices, and each choice carries a token delta.
# routers/stream.py
import json
import os
from typing import AsyncGenerator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from openai import AsyncAzureOpenAI
from pydantic import BaseModel, Field

router = APIRouter(prefix="/chat", tags=["streaming"])

client = AsyncAzureOpenAI(
    api_key=os.environ.get("AZURE_OPENAI_API_KEY", ""),
    azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
    api_version="2024-12-01-preview",
)


class StreamRequest(BaseModel):
    messages: list[dict] = Field(..., min_length=1)
    deployment: str = "gpt-4o"
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(default=1024, ge=1, le=4096)


async def token_generator(req: StreamRequest) -> AsyncGenerator[bytes, None]:
    """
    Yield SSE-formatted bytes for each token delta from Azure OpenAI.
    Send data: [DONE] when the stream ends.
    """
    try:
        # stream=True makes create() return an async iterator of ChatCompletionChunk
        stream = await client.chat.completions.create(
            model=req.deployment,
            messages=req.messages,
            temperature=req.temperature,
            max_tokens=req.max_tokens,
            stream=True,
        )
        async for chunk in stream:
            # Each chunk may have zero or more choices
            for choice in chunk.choices:
                delta = choice.delta
                if delta.content:
                    payload = json.dumps({"token": delta.content})
                    yield f"data: {payload}\n\n".encode()
                if choice.finish_reason:
                    finish_payload = json.dumps({"finish_reason": choice.finish_reason})
                    yield f"data: {finish_payload}\n\n".encode()
        yield b"data: [DONE]\n\n"
    except Exception as exc:
        # The 200 status is already sent, so report the failure in-band
        error_payload = json.dumps({"error": str(exc)})
        yield f"data: {error_payload}\n\n".encode()
        yield b"data: [DONE]\n\n"


@router.post("/stream")
async def stream_chat(req: StreamRequest) -> StreamingResponse:
    """
    Stream chat completion tokens as Server-Sent Events.
    Connect with fetch + ReadableStream on the client (EventSource cannot send POST).
    """
    return StreamingResponse(
        token_generator(req),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )

Frontend Consumption
Option 1: EventSource API
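EventSource is the browser's built-in SSE client. It only supports GET requests, so you'd pass the prompt as a query parameter, which limits its length. The example below therefore assumes a GET variant of the endpoint that reads prompt from the query string; the POST route defined earlier would reject this request. For production, use Option 2 (fetch).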
<!DOCTYPE html>
<html>
<body>
  <div id="output"></div>
  <script>
    const output = document.getElementById("output");
    const source = new EventSource("/chat/stream?prompt=Hello");

    source.onmessage = (event) => {
      if (event.data === "[DONE]") {
        source.close();
        return;
      }
      try {
        const parsed = JSON.parse(event.data);
        if (parsed.token) {
          output.textContent += parsed.token;
        }
      } catch {
        console.warn("Non-JSON SSE data:", event.data);
      }
    };

    source.onerror = () => {
      console.error("SSE connection error");
      source.close();
    };
  </script>
</body>
</html>

Option 2: fetch with ReadableStream (Recommended)
fetch with streaming supports POST and full request bodies — the right choice for production AI chat interfaces.
async function streamChat(messages) {
  const output = document.getElementById("output");
  output.textContent = "";

  const response = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages, deployment: "gpt-4o" }),
  });
  if (!response.ok) {
    throw new Error(`HTTP error: ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // A single read may contain several SSE events or end partway through one,
    // so accumulate text and process only events closed by a blank line.
    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split("\n\n");
    buffer = events.pop(); // keep the trailing partial event for the next read

    for (const event of events) {
      if (!event.startsWith("data: ")) continue;
      const data = event.slice(6).trim();
      if (data === "[DONE]") return;
      try {
        const parsed = JSON.parse(data);
        if (parsed.token) {
          output.textContent += parsed.token;
        }
        if (parsed.error) {
          console.error("Server error during stream:", parsed.error);
          return;
        }
      } catch {
        // Non-JSON payload: ignore or log
      }
    }
  }
}

// Usage
streamChat([
  { role: "user", content: "Explain ibuprofen in simple terms." }
]);

React hook for streaming
// useStreamingChat.ts
import { useState, useCallback } from "react";

interface Message {
  role: "user" | "assistant" | "system";
  content: string;
}

export function useStreamingChat() {
  const [response, setResponse] = useState("");
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);

  const sendMessage = useCallback(async (messages: Message[]) => {
    setLoading(true);
    setError(null);
    setResponse("");
    try {
      const res = await fetch("/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ messages }),
      });
      if (!res.ok) throw new Error(`HTTP ${res.status}`);
      if (!res.body) throw new Error("No response body");

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Buffer across reads: an event can be split between two chunks
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split("\n\n");
        buffer = events.pop() ?? "";

        for (const event of events) {
          if (!event.startsWith("data: ")) continue;
          const data = event.slice(6).trim();
          if (data === "[DONE]") { reader.cancel(); return; }
          const parsed = JSON.parse(data);
          if (parsed.token) setResponse((prev) => prev + parsed.token);
          if (parsed.error) throw new Error(parsed.error);
        }
      }
    } catch (err) {
      setError(err instanceof Error ? err.message : "Unknown error");
    } finally {
      setLoading(false);
    }
  }, []);

  return { response, loading, error, sendMessage };
}

Error Handling Mid-Stream
Once you start sending a StreamingResponse, the HTTP status code (200) is already committed. You cannot change it to 500 mid-stream. The correct pattern is to:
- Catch exceptions inside the generator
- Yield an error event
- Yield [DONE] to signal the end of the stream
- Let the client parse the error event and display a message
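The steps above can be sketched independently of any LLM client. In this minimal example, with_error_event is a hypothetical wrapper and flaky_source is a stand-in for a model stream that fails after one token; neither comes from FastAPI or the OpenAI SDK.

```python
import asyncio
import json
from typing import AsyncIterator, List


async def with_error_event(source: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
    """Relay SSE frames from `source`; report a failure in-band, then terminate."""
    try:
        async for frame in source:
            yield frame
    except Exception as exc:
        # The 200 status has already been sent, so the error travels as an event.
        error_payload = json.dumps({"error": str(exc)})
        yield f"data: {error_payload}\n\n".encode()
    # Always end with the terminator so the client knows the stream is over.
    yield b"data: [DONE]\n\n"


async def flaky_source() -> AsyncIterator[bytes]:
    """Stand-in for an LLM stream that fails after one token (assumption)."""
    yield b'data: {"token": "Hel"}\n\n'
    raise RuntimeError("upstream connection lost")


async def collect(source: AsyncIterator[bytes]) -> List[bytes]:
    return [frame async for frame in source]


frames = asyncio.run(collect(with_error_event(flaky_source())))
for frame in frames:
    print(frame)
```

The client sees one token, then an error event, then the terminator. Because asyncio.CancelledError is not a subclass of Exception, a client disconnect still propagates instead of being reported as an error event.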
The generator below also retries once, but only while nothing has been sent to the client, since retrying after partial output would duplicate tokens:
import asyncio


async def token_generator_with_retry(req: StreamRequest) -> AsyncGenerator[bytes, None]:
    max_attempts = 2
    attempt = 0
    sent_any = False  # once tokens are on the wire, a retry would repeat them
    while attempt < max_attempts:
        try:
            stream = await client.chat.completions.create(
                model=req.deployment,
                messages=req.messages,
                temperature=req.temperature,
                max_tokens=req.max_tokens,
                stream=True,
            )
            async for chunk in stream:
                for choice in chunk.choices:
                    if choice.delta.content:
                        payload = json.dumps({"token": choice.delta.content})
                        sent_any = True
                        yield f"data: {payload}\n\n".encode()
            break  # Success: exit retry loop
        except Exception as exc:
            attempt += 1
            if sent_any or attempt >= max_attempts:
                error_payload = json.dumps({
                    "error": f"Stream failed after {attempt} attempt(s): {exc}"
                })
                yield f"data: {error_payload}\n\n".encode()
                break
            # Brief pause before retry
            await asyncio.sleep(0.5)
    yield b"data: [DONE]\n\n"

Keepalive Comments
SSE connections may time out behind proxies or load balancers if no data is sent for a while. Send periodic SSE comment lines (lines starting with :) to keep the connection alive — they are ignored by the client:
import asyncio


async def token_generator_with_keepalive(req: StreamRequest) -> AsyncGenerator[bytes, None]:
    keepalive_interval = 15  # seconds

    async def generate() -> AsyncGenerator[bytes, None]:
        stream = await client.chat.completions.create(
            model=req.deployment,
            messages=req.messages,
            stream=True,
        )
        async for chunk in stream:
            for choice in chunk.choices:
                if choice.delta.content:
                    payload = json.dumps({"token": choice.delta.content})
                    yield f"data: {payload}\n\n".encode()
        yield b"data: [DONE]\n\n"

    gen = generate()
    # Fetch the next frame in a task so the wait can time out
    # without cancelling the upstream read.
    pending = asyncio.ensure_future(gen.__anext__())
    try:
        while True:
            done, _ = await asyncio.wait({pending}, timeout=keepalive_interval)
            if not done:
                # Nothing arrived within the interval: send a comment line
                yield b": keepalive\n\n"
                continue
            try:
                frame = pending.result()
            except StopAsyncIteration:
                break
            yield frame
            pending = asyncio.ensure_future(gen.__anext__())
    finally:
        pending.cancel()  # no-op if the task has already finished

Complete Streaming Endpoint
Putting it all together, here is a more complete streaming endpoint with a lazily created client, disconnect handling, and in-band error reporting:
# routers/stream_complete.py
import asyncio
import json
import os
from typing import AsyncGenerator

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from openai import AsyncAzureOpenAI
from pydantic import BaseModel, Field

router = APIRouter(prefix="/stream", tags=["stream"])

_client: AsyncAzureOpenAI | None = None


def get_client() -> AsyncAzureOpenAI:
    global _client
    if _client is None:
        _client = AsyncAzureOpenAI(
            api_key=os.environ["AZURE_OPENAI_API_KEY"],
            azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            api_version="2024-12-01-preview",
        )
    return _client


class StreamChatRequest(BaseModel):
    messages: list[dict] = Field(..., min_length=1)
    deployment: str = Field(default="gpt-4o")
    max_tokens: int = Field(default=1024, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)


async def chat_token_stream(
    request: Request,
    req: StreamChatRequest,
) -> AsyncGenerator[bytes, None]:
    client = get_client()
    tokens_sent = 0
    try:
        stream = await client.chat.completions.create(
            model=req.deployment,
            messages=req.messages,
            temperature=req.temperature,
            max_tokens=req.max_tokens,
            stream=True,
        )
        async for chunk in stream:
            # Honour client disconnect: nobody is listening, so stop here
            if await request.is_disconnected():
                return
            for choice in chunk.choices:
                delta = choice.delta
                if delta.content:
                    tokens_sent += 1
                    payload = json.dumps({
                        "token": delta.content,
                        "index": tokens_sent,
                    })
                    yield f"data: {payload}\n\n".encode()
    except asyncio.CancelledError:
        # Client disconnected: let the cancellation propagate
        raise
    except Exception as exc:
        yield f"data: {json.dumps({'error': str(exc)})}\n\n".encode()
    # Deliberately not in a finally block: yielding while the generator is
    # being closed or cancelled raises RuntimeError.
    yield b"data: [DONE]\n\n"


@router.post("/chat")
async def stream_chat(
    req: StreamChatRequest,
    request: Request,
) -> StreamingResponse:
    return StreamingResponse(
        chat_token_stream(request, req),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache, no-transform",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
            # No Transfer-Encoding header: the ASGI server applies chunked
            # framing itself when there is no Content-Length.
        },
    )

Key Takeaways
- SSE streams data over a plain HTTP connection using the text/event-stream content type and data: ...\n\n framing
- StreamingResponse + async generator is the FastAPI pattern: yield encoded bytes for each event
- The OpenAI SDK's chat.completions.create(..., stream=True) call returns an async iterator of ChatCompletionChunk objects with delta content
- fetch with ReadableStream is the recommended client approach: it supports POST and full JSON bodies, unlike EventSource
- Once streaming starts, the HTTP 200 status is committed, so communicate errors via JSON error events in the stream body
- Set X-Accel-Buffering: no to prevent nginx from buffering the stream; Cache-Control: no-cache prevents proxy caching
- Check await request.is_disconnected() inside the generator to stop sending when the client drops the connection
Next lesson: Background Tasks — offloading work like audit logging after the response is sent.