Live Coding Interview Prep · Lesson 15 of 16
Parse SSE Stream Tokens in Real-Time
Why Streaming?
Non-streaming LLM calls make users wait for the entire response before seeing anything. Streaming sends tokens as they're generated — users see output in real time, reducing perceived latency dramatically.
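For a 1,000-token response at 50 tokens/second, a new token is produced every 20 ms (1 s / 50). Streaming shows the first token about 20 ms after generation starts, plus the initial network and prompt-processing delay. Non-streaming shows nothing until all 1,000 tokens are done: 1,000 / 50 = 20 seconds.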
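The figures above follow from a constant-rate model. Here is a minimal sketch of that arithmetic. overhead_s is a hypothetical parameter for network and prompt-processing time, which the per-token figure alone leaves out.

```python
def seconds_until_first_output(
    num_tokens: int,
    tokens_per_sec: float,
    streaming: bool,
    overhead_s: float = 0.0,  # hypothetical fixed cost: network + prompt processing
) -> float:
    """Seconds before the user sees any text, assuming a constant generation rate."""
    if streaming:
        # The first token is displayed as soon as it has been generated
        return overhead_s + 1 / tokens_per_sec
    # Nothing is displayed until the last token has been generated
    return overhead_s + num_tokens / tokens_per_sec

print(seconds_until_first_output(1000, 50, streaming=True))   # 0.02
print(seconds_until_first_output(1000, 50, streaming=False))  # 20.0
```

The overhead term is the same for both modes. The gap between them comes entirely from waiting for one token versus all of them.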
OpenAI Streaming
```python
from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from the environment

def stream_response(prompt: str) -> str:
    """Stream tokens, print them in real time, and return the full text."""
    full_response = []
    with client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ) as stream:
        for chunk in stream:
            if not chunk.choices:  # e.g. a usage-only final chunk has no choices
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                token = delta.content
                print(token, end="", flush=True)  # Real-time display
                full_response.append(token)
    print()  # New line after the stream completes
    return "".join(full_response)

result = stream_response("Explain warfarin's mechanism of action in 3 sentences.")
```

SSE Parser from Scratch
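OpenAI's streaming API uses Server-Sent Events (SSE). Each event is a data: line carrying one JSON chunk, followed by a blank line. The stream ends with a [DONE] sentinel:

```
data: {"id": "chatcmpl-...", "choices": [{"delta": {"content": "token"}}]}

data: [DONE]
```

Parse this manually when you read the raw HTTP stream yourself instead of using the SDK: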
```python
import json
from typing import Iterator

def parse_sse_stream(response_body: bytes) -> Iterator[str]:
    """
    Parse a complete Server-Sent Events body and yield content tokens in order.
    This version decodes the whole body at once, so it is not incremental;
    a real-time parser must buffer partial lines across network reads.
    """
    for line in response_body.decode("utf-8").splitlines():
        line = line.strip()
        if not line.startswith("data:"):
            continue  # Skip blank separators, comments (":") and other fields
        data_str = line[len("data:"):].lstrip()  # SSE allows "data:x" or "data: x"
        if data_str == "[DONE]":
            break
        try:
            data = json.loads(data_str)
        except json.JSONDecodeError:
            continue  # Skip malformed chunks
        choices = data.get("choices", [])
        if choices:
            content = choices[0].get("delta", {}).get("content")
            if content:
                yield content

# Simulate an SSE response body (events are separated by blank lines)
fake_sse = b"""data: {"choices": [{"delta": {"content": "Warfarin"}}]}

data: {"choices": [{"delta": {"content": " inhibits"}}]}

data: {"choices": [{"delta": {"content": " VKOR"}}]}

data: [DONE]

"""
tokens = list(parse_sse_stream(fake_sse))
print("".join(tokens))  # Warfarin inhibits VKOR
```

FastAPI Streaming Endpoint
Stream LLM responses from a FastAPI endpoint to a browser:
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
aclient = AsyncOpenAI()

class ChatRequest(BaseModel):
    prompt: str = ""

async def stream_llm(prompt: str):
    """Async generator yielding SSE-formatted chunks."""
    stream = await aclient.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async with stream:  # Closes the upstream connection on exit, including cancellation
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                text = chunk.choices[0].delta.content
                # Format as SSE: one data line, then a blank line
                yield f"data: {json.dumps({'content': text})}\n\n"
    yield "data: [DONE]\n\n"

# The browser's EventSource only issues GET requests; read this POST
# endpoint with fetch() and a ReadableStream reader instead.
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        stream_llm(request.prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable Nginx proxy buffering
        },
    )
```

Collecting Streaming Response for Processing
Sometimes you need to stream to the user AND capture the complete response for processing:
```python
import json
from typing import Awaitable, Callable, Optional

from openai import AsyncOpenAI

aclient = AsyncOpenAI()

async def stream_and_collect(
    prompt: str,
    on_token: Optional[Callable[[str], Awaitable[None]]] = None,  # Async callback per token
) -> str:
    """Stream a response, await the callback for each token, return the full text."""
    full_tokens = []
    stream = await aclient.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async with stream:
        async for chunk in stream:
            if not chunk.choices or not chunk.choices[0].delta.content:
                continue
            text = chunk.choices[0].delta.content
            full_tokens.append(text)
            if on_token:
                await on_token(text)
    return "".join(full_tokens)

# Usage: stream to a WebSocket (e.g. a FastAPI/Starlette WebSocket) and collect
async def handle_websocket_chat(websocket, prompt: str):
    async def send_token(token: str):
        await websocket.send_text(json.dumps({"token": token}))

    full_response = await stream_and_collect(prompt, on_token=send_token)
    # Process the complete response
    await websocket.send_text(json.dumps({
        "done": True,
        "full_response": full_response,
        "word_count": len(full_response.split()),
    }))
```

Streaming with Tool Calls
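Tool calls arrive in fragments too. The call's id and function name usually come in its first fragment. The arguments JSON is split across many chunks, each tagged with the tool call's index: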
```python
import json

from openai import AsyncOpenAI

aclient = AsyncOpenAI()

class ToolCallAccumulator:
    """Accumulates streaming tool call fragments into complete calls."""

    def __init__(self):
        self.tool_calls: dict[int, dict] = {}  # index → tool call data

    def process_chunk(self, delta) -> None:
        """Merge one streaming delta's tool call fragments into the buffer."""
        if not getattr(delta, "tool_calls", None):
            return
        for tc_delta in delta.tool_calls:
            # Fragments of the same call share an index
            tc = self.tool_calls.setdefault(
                tc_delta.index, {"id": "", "name": "", "args_json": ""}
            )
            if tc_delta.id:
                tc["id"] += tc_delta.id
            if tc_delta.function:
                if tc_delta.function.name:
                    tc["name"] += tc_delta.function.name
                if tc_delta.function.arguments:
                    tc["args_json"] += tc_delta.function.arguments

    def finalize(self) -> list[dict]:
        """Call when the stream is complete. Returns parsed tool calls in index order."""
        result = []
        for idx in sorted(self.tool_calls):
            tc = self.tool_calls[idx]
            try:
                args = json.loads(tc["args_json"]) if tc["args_json"] else {}
            except json.JSONDecodeError:
                args = {}  # Truncated or invalid JSON
            result.append({"id": tc["id"], "name": tc["name"], "args": args})
        return result

# Usage in a streaming loop
async def stream_with_tools(messages, tools):
    accumulator = ToolCallAccumulator()  # Fresh state per request
    stream = await aclient.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True,
    )
    async with stream:
        async for chunk in stream:
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            # Stream text content
            if delta.content:
                print(delta.content, end="", flush=True)
            # Accumulate tool call fragments
            accumulator.process_chunk(delta)
    # Complete tool calls are only available after the stream ends
    return accumulator.finalize()
```

Interview Questions
Q: What is Server-Sent Events (SSE) and how does it differ from WebSockets?
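SSE: unidirectional. The server pushes events to the client over an ordinary, long-lived HTTP response. The client cannot send messages on that stream and uses separate HTTP requests instead. Format: data: ...\n\n. Browsers support it natively via EventSource, which also reconnects automatically. WebSockets: bidirectional and full-duplex, set up with an HTTP Upgrade handshake and then using their own framing over a persistent connection. SSE is simpler when only the server needs to push, as with streamed LLM responses. WebSockets suit interactive real-time features where the client sends messages mid-stream (for example, a chat where the user can interrupt).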
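The data: ...\n\n framing matters once you read the raw HTTP body yourself. Network reads do not line up with event boundaries, so a real-time parser must buffer partial lines until a blank line completes the event. Below is a minimal sketch under a few assumptions: the body arrives as an iterable of byte chunks (for example, an HTTP client's streaming byte iterator), and lines end in LF or CRLF. iter_sse_data and iter_openai_tokens are hypothetical helper names. Only data: fields and comment lines are handled, which is enough for OpenAI-style streams.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield the data payload of each SSE event from arbitrarily split byte chunks."""
    buffer = b""  # Kept as bytes so a UTF-8 character split across chunks stays intact
    data_lines: list[str] = []
    for chunk in chunks:
        buffer += chunk
        # Parse only complete lines; a partial line stays in the buffer
        while b"\n" in buffer:
            raw, buffer = buffer.split(b"\n", 1)
            line = raw.decode("utf-8").rstrip("\r")
            if line == "":
                # A blank line ends the event: dispatch the accumulated data lines
                if data_lines:
                    yield "\n".join(data_lines)
                    data_lines = []
            elif line.startswith(":"):
                continue  # Comment / keep-alive line
            elif line.startswith("data:"):
                value = line[5:]
                # The spec strips a single leading space from the value
                data_lines.append(value[1:] if value.startswith(" ") else value)
            # Other fields (event:, id:, retry:) are ignored in this sketch


def iter_openai_tokens(chunks: Iterable[bytes]) -> Iterator[str]:
    """Extract delta.content tokens, stopping at the [DONE] sentinel."""
    for payload in iter_sse_data(chunks):
        if payload == "[DONE]":
            return
        choices = json.loads(payload).get("choices", [])
        if choices and choices[0].get("delta", {}).get("content"):
            yield choices[0]["delta"]["content"]


# A network read can split an event anywhere, even inside the JSON
chunks = [
    b'data: {"choices": [{"delta": {"content": "Warf',
    b'arin"}}]}\n\ndata: {"choices": [{"delta": {"content": " inhibits"}}]}\n',
    b"\ndata: [DONE]\n\n",
]
print("".join(iter_openai_tokens(chunks)))  # Warfarin inhibits
```

Splitting on the newline byte before decoding is safe for UTF-8, because the byte 0x0A never occurs inside a multi-byte character.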
Q: How do you handle backpressure when the client is slower than the server?
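Plain HTTP streaming gets backpressure implicitly from TCP flow control. When the client stops reading, its receive window fills, then the server's send buffer fills, and the server's next write has to wait. In an ASGI server, each chunk is passed to an awaited send() call, so the async generator is not resumed past its yield until that send completes. For explicit backpressure, put a bounded asyncio.Queue (with maxsize set) between producer and consumer: when the queue is full, await queue.put() suspends the producer until the consumer catches up.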
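Here is a minimal sketch of explicit backpressure with a bounded asyncio.Queue. The fixed token list stands in for an LLM stream, and the sleep-based consumer is a hypothetical stand-in for a slow client.

```python
import asyncio


async def producer(queue: asyncio.Queue, tokens: list[str]) -> None:
    """Push tokens; await queue.put() suspends whenever the queue is full."""
    for token in tokens:
        await queue.put(token)
    await queue.put(None)  # Sentinel: no more tokens


async def slow_consumer(queue: asyncio.Queue, received: list[str]) -> None:
    """Drain tokens slowly, simulating a client that reads slower than we produce."""
    while (token := await queue.get()) is not None:
        await asyncio.sleep(0.01)  # Simulated slow network write
        received.append(token)


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)  # Bounds buffered, unsent tokens
    received: list[str] = []
    await asyncio.gather(
        producer(queue, [f"tok{i} " for i in range(10)]),
        slow_consumer(queue, received),
    )
    return received


print(len(asyncio.run(main())))  # 10
```

Because maxsize is 2, the producer never runs more than two tokens ahead of the consumer. Memory use stays bounded no matter how slow the client is.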
Q: What happens if the user closes the connection mid-stream?
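In FastAPI/Starlette, a client disconnect stops iteration of the StreamingResponse generator. Depending on the Starlette and ASGI server versions, the generator either sees asyncio.CancelledError at the await it is suspended on, or GeneratorExit when it is closed while paused at a yield. Always wrap your streaming generator in try/finally to clean up resources (cancel the LLM request, close connections). Let CancelledError propagate rather than swallowing it.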
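Here is a minimal sketch of the try/finally pattern with no web framework involved. token_stream is a hypothetical generator. Cancelling the task that drives it stands in for what the server does on disconnect, which is an assumption about your server's behavior rather than a guarantee.

```python
import asyncio

cleanup_log: list[str] = []


async def token_stream():
    """Hypothetical SSE generator; the finally block runs on normal end or disconnect."""
    try:
        for i in range(100):
            await asyncio.sleep(0.01)  # Stands in for waiting on the next LLM chunk
            yield f"data: tok{i}\n\n"
        yield "data: [DONE]\n\n"
    finally:
        # Release resources here, e.g. close the upstream LLM stream.
        # The cancellation is not caught, so it still propagates.
        cleanup_log.append("closed upstream")


async def simulate_disconnect() -> int:
    """Consume a few chunks, then cancel the consumer like a disconnect would."""
    sent = 0

    async def send_all():
        nonlocal sent
        async for _ in token_stream():
            sent += 1

    task = asyncio.create_task(send_all())
    await asyncio.sleep(0.05)  # Client leaves after a few chunks
    task.cancel()  # CancelledError is raised inside the generator's await
    try:
        await task
    except asyncio.CancelledError:
        pass
    return sent


asyncio.run(simulate_disconnect())
print(cleanup_log)  # ['closed upstream']
```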