Learnixo
AI Systems · Intermediate

Parse Streaming LLM Output

Implement a streaming parser for Server-Sent Events from OpenAI and Anthropic APIs. Handle partial JSON, tool call streaming, and real-time display.

Asma Hafeez Khan · May 16, 2026 · 5 min read
Live Coding · Streaming · SSE · Python · FastAPI

Why Streaming?

Non-streaming LLM calls make users wait for the entire response before seeing anything. Streaming sends tokens as they're generated — users see output in real time, reducing perceived latency dramatically.

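For a 1,000-token response generated at 50 tokens/second, each token takes 1/50 s = 20 ms, so the whole response takes 1,000 / 50 = 20 seconds. Streaming can display the first token about 20 ms after generation starts (plus the network and prompt-processing time before it); non-streaming displays nothing for the full 20 seconds.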
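The same arithmetic as a small helper, assuming a constant generation rate. latency_profile is a hypothetical name, and its ttft_overhead_s parameter is an illustrative stand-in for network and prompt-processing time before the first token, not a measured value.

```python
def latency_profile(
    num_tokens: int,
    tokens_per_second: float,
    ttft_overhead_s: float = 0.0,  # hypothetical: network + prompt processing before token 1
) -> tuple[float, float]:
    """Return (seconds until first visible text with streaming,
    seconds until first visible text without streaming)."""
    # Streaming: text appears as soon as the first token is generated
    first_token_s = ttft_overhead_s + 1 / tokens_per_second
    # Non-streaming: nothing appears until every token has been generated
    full_response_s = ttft_overhead_s + num_tokens / tokens_per_second
    return first_token_s, full_response_s


streaming, blocking = latency_profile(1_000, 50)
print(f"streaming: {streaming * 1000:.0f} ms, non-streaming: {blocking:.0f} s")
# streaming: 20 ms, non-streaming: 20 s
```

Adding a nonzero overhead shifts both numbers equally, which is why streaming helps most for long responses.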


OpenAI Streaming

Python
from openai import OpenAI

client = OpenAI()

def stream_response(prompt: str) -> str:
    """Stream tokens and print in real time."""
    full_response = []

    with client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ) as stream:
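        for chunk in stream:
            # Some chunks carry no choices (e.g. the final usage chunk when
            # stream_options={"include_usage": True}); skip them
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                token = delta.content
                print(token, end="", flush=True)  # Real-time display
                full_response.append(token)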

    print()  # New line after complete
    return "".join(full_response)

result = stream_response("Explain warfarin's mechanism of action in 3 sentences.")

SSE Parser from Scratch

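OpenAI's streaming API uses Server-Sent Events (SSE). Each event is a data: line carrying one JSON chunk, followed by a blank line, and the stream ends with a data: [DONE] event: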

data: {"id": "chatcmpl-...", "choices": [{"delta": {"content": "token"}}]}

data: [DONE]

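When you call the HTTP API directly instead of through the SDK, you parse this format yourself. This version works on a response body that has already been read in full: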

Python
import json
from typing import Iterator

def parse_sse_stream(response_body: bytes) -> Iterator[str]:
    """
    Parse a complete (already buffered) Server-Sent Events body.
    Yields content tokens in order.
    """
    for line in response_body.decode("utf-8").splitlines():
        line = line.strip()

        # Skip blank separator lines, comments, and non-data fields
        if not line.startswith("data:"):
            continue

        # The SSE spec allows one optional space after "data:"
        data_str = line[5:].removeprefix(" ")

        if data_str == "[DONE]":
            break

        try:
            data = json.loads(data_str)
        except json.JSONDecodeError:
            continue  # Skip malformed chunks

        choices = data.get("choices", [])
        if choices:
            content = choices[0].get("delta", {}).get("content")
            if content:
                yield content

# Simulate SSE response
fake_sse = b"""data: {"choices": [{"delta": {"content": "Warfarin"}}]}

data: {"choices": [{"delta": {"content": " inhibits"}}]}

data: {"choices": [{"delta": {"content": " VKOR"}}]}

data: [DONE]
"""

tokens = list(parse_sse_stream(fake_sse))
print("".join(tokens))  # "Warfarin inhibits VKOR"

FastAPI Streaming Endpoint

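Stream LLM responses from a FastAPI endpoint to a browser. Because this endpoint uses POST, the browser reads it with fetch() and a stream reader; the built-in EventSource API only issues GET requests.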

Python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
aclient = AsyncOpenAI()


class ChatRequest(BaseModel):
    prompt: str


async def stream_llm(prompt: str):
    """Async generator yielding SSE-formatted chunks."""
    stream = await aclient.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue  # e.g. a usage-only chunk
        text = chunk.choices[0].delta.content
        if text:
            # One SSE event per token; json.dumps escapes newlines,
            # so each payload stays on a single data: line
            yield f"data: {json.dumps({'content': text})}\n\n"

    yield "data: [DONE]\n\n"


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        stream_llm(request.prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable Nginx proxy buffering
        },
    )

Collecting Streaming Response for Processing

Sometimes you need to stream to the user AND capture the complete response for processing:

Python
import json
from collections.abc import Awaitable, Callable

from fastapi import WebSocket
from openai import AsyncOpenAI

aclient = AsyncOpenAI()

async def stream_and_collect(
    prompt: str,
    on_token: Callable[[str], Awaitable[None]] | None = None,  # async callback per token
) -> str:
    """Stream response, call callback for each token, return full response."""
    full_tokens: list[str] = []

    stream = await aclient.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content
        if not text:
            continue
        full_tokens.append(text)
        if on_token:
            await on_token(text)

    return "".join(full_tokens)

# Usage: stream to a WebSocket and collect
async def handle_websocket_chat(websocket: WebSocket, prompt: str):
    async def send_token(token: str):
        await websocket.send_text(json.dumps({"token": token}))

    full_response = await stream_and_collect(prompt, on_token=send_token)

    # Process complete response
    await websocket.send_text(json.dumps({
        "done": True,
        "full_response": full_response,
        "word_count": len(full_response.split()),
    }))
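A callback fits the WebSocket case. Inside a StreamingResponse generator you can instead wrap the token iterator so it records what passed through and whether the stream ran to completion, which lets you skip post-processing for responses cut short by a disconnect. A sketch under that assumption; RecordingTokenStream is a hypothetical helper, and fake_tokens stands in for a real LLM token stream.

```python
import asyncio
from collections.abc import AsyncIterator


class RecordingTokenStream:
    """Yield tokens from an async iterator while keeping a copy of each one."""

    def __init__(self, tokens: AsyncIterator[str]) -> None:
        self._tokens = tokens
        self._parts: list[str] = []
        self.finished = False  # True only if the source was fully consumed

    async def __aiter__(self):
        # An async generator method, so instances work directly in `async for`
        async for token in self._tokens:
            self._parts.append(token)
            yield token
        self.finished = True

    @property
    def text(self) -> str:
        return "".join(self._parts)


async def fake_tokens():
    """Stand-in for a real LLM token stream."""
    for token in ["Warfarin", " inhibits", " VKOR"]:
        yield token


async def demo_recording() -> None:
    recorded = RecordingTokenStream(fake_tokens())
    async for token in recorded:  # in an endpoint: yield an SSE frame here
        print(token, end="", flush=True)
    print()
    print(recorded.finished, len(recorded.text.split()))  # True 3


asyncio.run(demo_recording())
```

After the loop, check recorded.finished before storing or analyzing recorded.text.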

Streaming with Tool Calls

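Tool calls arrive in fragments too: each delta carries a tool call index, and the function name and arguments JSON are split across many chunks, so you accumulate them per index and parse the arguments only once the stream ends: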

Python
import json

from openai import AsyncOpenAI

aclient = AsyncOpenAI()

class ToolCallAccumulator:
    """Accumulates streaming tool call fragments into complete calls."""

    def __init__(self):
        self.tool_calls: dict[int, dict] = {}  # index -> tool call data

    def process_chunk(self, delta) -> None:
        """Merge one streaming delta's tool call fragments into the accumulator."""
        if not getattr(delta, "tool_calls", None):
            return

        for tc_delta in delta.tool_calls:
            idx = tc_delta.index
            if idx not in self.tool_calls:
                self.tool_calls[idx] = {"id": "", "name": "", "args_json": ""}

            tc = self.tool_calls[idx]
            if tc_delta.id:
                tc["id"] += tc_delta.id
            if tc_delta.function:
                if tc_delta.function.name:
                    tc["name"] += tc_delta.function.name
                if tc_delta.function.arguments:
                    tc["args_json"] += tc_delta.function.arguments

    def finalize(self) -> list[dict]:
        """Call when the stream is complete. Returns parsed tool calls in index order."""
        result = []
        for idx in sorted(self.tool_calls):
            tc = self.tool_calls[idx]
            try:
                args = json.loads(tc["args_json"]) if tc["args_json"] else {}
            except json.JSONDecodeError:
                args = {}  # malformed or truncated arguments; consider logging tc["args_json"]
            result.append({"id": tc["id"], "name": tc["name"], "args": args})
        return result


async def stream_with_tools(messages, tools):
    # A fresh accumulator per call, so concurrent requests don't share state
    accumulator = ToolCallAccumulator()

    stream = await aclient.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta

        # Stream text content
        if delta.content:
            print(delta.content, end="", flush=True)

        # Accumulate tool call fragments
        accumulator.process_chunk(delta)

    # Get complete tool calls after the stream ends
    return accumulator.finalize()
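The ToolCallAccumulator above parses arguments only in finalize(), after the stream ends. To preview arguments while they are still arriving (for example, showing a search query as the model writes it), one common approach is best-effort completion of the JSON prefix: close an open string, drop a trailing comma, close open brackets, then try json.loads. A sketch of that technique; try_parse_partial_json is a hypothetical helper, and it returns None when a prefix cannot be completed this way (such as right after a key), in which case you keep showing the last successful parse.

```python
import json
from typing import Any


def try_parse_partial_json(fragment: str) -> Any:
    """Best-effort parse of a JSON prefix, e.g. streamed tool-call arguments.

    Returns None when the prefix cannot be completed by closing open
    strings and brackets (for example, right after a key or a colon).
    """
    closers: list[str] = []  # brackets still open, innermost last
    in_string = False
    escaped = False
    for ch in fragment:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            closers.append("}")
        elif ch == "[":
            closers.append("]")
        elif ch in "}]" and closers:
            closers.pop()

    candidate = fragment
    if in_string:
        if escaped:
            candidate = candidate[:-1]  # drop a dangling backslash
        candidate += '"'  # close the open string
    else:
        candidate = candidate.rstrip()
        if candidate.endswith(","):
            candidate = candidate[:-1]  # drop a trailing comma
    candidate += "".join(reversed(closers))
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None


# Successive prefixes of one streamed arguments string
for prefix in ['{"query": "warf', '{"query": "warfarin", ', '{"query": "warfarin", "limit": ']:
    print(try_parse_partial_json(prefix))
# {'query': 'warf'}
# {'query': 'warfarin'}
# None
```

Call it on tc["args_json"] after each process_chunk() for display only; finalize() remains the source of truth for executing the tool.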

Interview Questions

Q: What is Server-Sent Events (SSE) and how does it differ from WebSockets?

SSE: unidirectional; the server pushes events to the client over a standard HTTP connection, and the client cannot send messages back on that connection. Format: data: ...\n\n. Browsers support it natively via EventSource (GET requests only, with automatic reconnection). WebSockets: bidirectional and full-duplex, established through an HTTP upgrade handshake over a persistent connection. SSE is simpler when only the server needs to push, as with streaming LLM responses. WebSockets fit features where the client must also send messages mid-stream, such as a chat where the user can interrupt generation.

Q: How do you handle backpressure when the client is slower than the server?

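Plain HTTP streaming gets backpressure from TCP flow control: if the client reads slowly, its receive window fills, the server's send buffer fills, and writes stop completing. ASGI servers such as Uvicorn typically make await send(...) wait until the transport can accept more data, so the async generator behind a StreamingResponse stays paused at its yield until the previous chunk has been handed off. For explicit backpressure, put an asyncio.Queue with a maxsize between producer and consumer: when the queue is full, await queue.put() suspends the producer until the consumer catches up.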
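A minimal sketch of the bounded-queue approach, with a list of strings standing in for the LLM stream and asyncio.sleep standing in for a slow client write; relay_with_backpressure is a hypothetical name.

```python
import asyncio

_DONE = object()  # sentinel: producer has no more tokens


async def relay_with_backpressure(
    tokens: list[str], maxsize: int = 4, consumer_delay: float = 0.001
) -> tuple[list[str], int]:
    """Relay tokens through a bounded queue to a deliberately slow consumer.

    Returns the tokens received and the largest queue size observed.
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for token in tokens:  # in practice: async for over the LLM stream
            await queue.put(token)  # suspends here while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(_DONE)

    producer_task = asyncio.create_task(producer())
    received: list[str] = []
    try:
        while (item := await queue.get()) is not _DONE:
            await asyncio.sleep(consumer_delay)  # simulate a slow client write
            received.append(item)
    finally:
        # If the consumer exits early (e.g. client disconnect), stop the producer
        producer_task.cancel()
        await asyncio.gather(producer_task, return_exceptions=True)
    return received, peak


received, peak = asyncio.run(relay_with_backpressure([f"t{i}" for i in range(20)], maxsize=4))
print(len(received), peak <= 4)  # 20 True
```

The queue never holds more than maxsize tokens, so memory stays bounded no matter how far ahead the producer could run.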

Q: What happens if the user closes the connection mid-stream?

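In FastAPI/Starlette, when the client disconnects mid-stream, the server stops iterating your generator, typically by cancelling the task (the generator sees asyncio.CancelledError at its current await) or by closing it (GeneratorExit at its current yield). Either way, code after the streaming loop never runs, so always wrap your streaming generator in try/finally to clean up resources (cancel the LLM request, close connections).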
