
Server-Sent Events for LLM Streaming

Stream LLM tokens to the browser using FastAPI's StreamingResponse and Python AsyncGenerator. Covers SSE format, Azure OpenAI streaming, JavaScript consumption, and mid-stream error handling.

Asma Hafeez Khan | May 15, 2026 | 9 min read
Tags: fastapi, streaming, sse, openai, javascript

What Is Server-Sent Events?

Server-Sent Events (SSE) is a standard HTTP mechanism for pushing data from server to client over a single long-lived connection. The server sends a series of events; the client receives them incrementally.

Unlike WebSockets, SSE is:

  • Unidirectional: the server sends, the client only listens
  • HTTP-native: an ordinary HTTP request whose response carries Content-Type: text/event-stream
  • Reconnect-aware: the browser's EventSource client retries automatically on disconnect (a fetch-based client has to reconnect itself)
  • Firewall-friendly: it's just HTTP, not a protocol upgrade

For LLM applications, SSE is ideal: the model generates tokens sequentially and you want to ship each token to the browser as soon as it arrives, rather than waiting for the entire completion.

The SSE Wire Format

An SSE stream is plain text. Each event consists of one or more field lines, and a blank line (\n\n) marks the end of the event:

data: Hello\n\n
data: world\n\n
data: [DONE]\n\n

Each event can carry multiple fields:

id: 42
event: token
data: {"text": "Hello", "index": 0}

id: 43
event: token
data: {"text": " world", "index": 1}

event: done
data: [DONE]

For LLM streaming, the simplest approach is to send only data: lines with JSON payloads, and use data: [DONE] as the terminator — the same convention OpenAI uses.
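
To make the framing rules concrete, here is a minimal sketch of a formatting helper. The name format_sse and its parameters are illustrative assumptions, not part of FastAPI or the OpenAI SDK; it only shows how the id, event, and data fields and the terminating blank line fit together.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None, event_id: Optional[str] = None) -> bytes:
    """Serialize one SSE event (hypothetical helper); non-str data is JSON-encoded."""
    text = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines must be sent as one data: line per line of text.
    for part in text.split("\n"):
        lines.append(f"data: {part}")
    # The trailing blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")


print(format_sse({"text": "Hello", "index": 0}, event="token", event_id="42").decode())
print(format_sse("[DONE]").decode())
```

Because json.dumps never emits raw newlines, JSON payloads always fit on a single data: line, which is one reason the JSON-per-event convention is convenient.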

StreamingResponse + AsyncGenerator

FastAPI's StreamingResponse takes an async generator and writes its yielded bytes directly to the HTTP connection, flushing after each chunk.

Python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator

app = FastAPI()

async def count_up() -> AsyncGenerator[bytes, None]:
    for i in range(5):
        yield f"data: {i}\n\n".encode()

@app.get("/stream/count")
async def stream_count() -> StreamingResponse:
    return StreamingResponse(
        count_up(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",   # Disable nginx buffering
            "Connection": "keep-alive",
        },
    )

Test it in the terminal:

Bash
curl -N http://localhost:8000/stream/count
# data: 0
# data: 1
# data: 2
# data: 3
# data: 4

The -N flag disables curl's output buffering so you see events as they arrive.

Streaming Azure OpenAI Completions

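Passing stream=True to chat.completions.create in the OpenAI Python SDK returns an async iterator of ChatCompletionChunk objects. Each chunk carries a list of choices, and each choice's delta holds the newly generated text. A chunk can also arrive with an empty choices list, so iterate over the choices rather than indexing the first one.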

Python
# routers/stream.py
import json
import os
from typing import AsyncGenerator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from openai import AsyncAzureOpenAI
from pydantic import BaseModel, Field

router = APIRouter(prefix="/chat", tags=["streaming"])

client = AsyncAzureOpenAI(
    api_key=os.environ.get("AZURE_OPENAI_API_KEY", ""),
    azure_endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
    api_version="2024-12-01-preview",
)


class StreamRequest(BaseModel):
    messages: list[dict] = Field(..., min_length=1)
    deployment: str = "gpt-4o"
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(default=1024, ge=1, le=4096)


async def token_generator(req: StreamRequest) -> AsyncGenerator[bytes, None]:
    """
    Yield SSE-formatted bytes for each token delta from Azure OpenAI.
    Send data: [DONE] when the stream ends.
    """
    try:
        # create(..., stream=True) returns an async iterator of ChatCompletionChunk
        stream = await client.chat.completions.create(
            model=req.deployment,
            messages=req.messages,
            temperature=req.temperature,
            max_tokens=req.max_tokens,
            stream=True,
        )
        async for chunk in stream:
            # Each chunk may have zero or more choices
            for choice in chunk.choices:
                delta = choice.delta
                if delta.content:
                    payload = json.dumps({"token": delta.content})
                    yield f"data: {payload}\n\n".encode()

                if choice.finish_reason:
                    finish_payload = json.dumps({"finish_reason": choice.finish_reason})
                    yield f"data: {finish_payload}\n\n".encode()

        yield b"data: [DONE]\n\n"

    except Exception as exc:
        error_payload = json.dumps({"error": str(exc)})
        yield f"data: {error_payload}\n\n".encode()
        yield b"data: [DONE]\n\n"


@router.post("/stream")
async def stream_chat(req: StreamRequest) -> StreamingResponse:
    """
    Stream chat completion tokens as Server-Sent Events.
    Connect with fetch + ReadableStream on the client; EventSource cannot send POST requests.
    """
    return StreamingResponse(
        token_generator(req),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )

Frontend Consumption

Option 1: EventSource API

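EventSource is the browser's built-in SSE client. It only supports GET requests, so the prompt has to travel as a query parameter, which limits its length. The example below assumes a GET variant of the endpoint that reads a prompt query parameter; the POST route defined above will not accept it. For production, use Option 2 (fetch).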

HTML
<!DOCTYPE html>
<html>
<body>
  <div id="output"></div>
  <script>
    const output = document.getElementById("output");
    const source = new EventSource("/chat/stream?prompt=Hello");

    source.onmessage = (event) => {
      if (event.data === "[DONE]") {
        source.close();
        return;
      }
      try {
        const parsed = JSON.parse(event.data);
        if (parsed.token) {
          output.textContent += parsed.token;
        }
      } catch {
        console.warn("Non-JSON SSE data:", event.data);
      }
    };

    source.onerror = () => {
      console.error("SSE connection error");
      source.close();
    };
  </script>
</body>
</html>

Option 2: fetch with ReadableStream (Recommended)

fetch with streaming supports POST and full request bodies — the right choice for production AI chat interfaces.

JAVASCRIPT
async function streamChat(messages) {
  const output = document.getElementById("output");
  output.textContent = "";

  const response = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages, deployment: "gpt-4o" }),
  });

  if (!response.ok) {
    throw new Error(`HTTP error: ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // A network chunk may hold several SSE events, or only part of one,
    // so accumulate text and process complete events only.
    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split("\n\n");
    buffer = events.pop(); // keep the trailing partial event for the next read

    for (const line of events) {
      if (!line.startsWith("data: ")) continue;
      const data = line.slice(6).trim();

      if (data === "[DONE]") return;

      try {
        const parsed = JSON.parse(data);
        if (parsed.token) {
          output.textContent += parsed.token;
        }
        if (parsed.error) {
          console.error("Server error during stream:", parsed.error);
          return;
        }
      } catch {
        // Non-JSON line — ignore or log
      }
    }
  }
}

// Usage
streamChat([
  { role: "user", content: "Explain ibuprofen in simple terms." }
]);
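
To exercise the endpoint without a browser, the same event framing can be parsed in Python. The sketch below is an assumption-laden illustration: SSEDecoder is a hypothetical class, it handles only data: fields and LF line endings (the SSE format also allows CRLF), and it buffers input so that an event split across two network chunks is emitted only once it is complete.

```python
import codecs
from typing import List


class SSEDecoder:
    """Incrementally turn raw bytes into the data payloads of complete SSE events."""

    def __init__(self) -> None:
        # The incremental decoder copes with a multi-byte character split across chunks.
        self._utf8 = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""

    def feed(self, chunk: bytes) -> List[str]:
        """Add bytes and return the payload of every event completed by this chunk."""
        self._buffer += self._utf8.decode(chunk)
        # Everything before the last blank line is complete; the remainder is kept.
        *complete, self._buffer = self._buffer.split("\n\n")
        payloads = []
        for event in complete:
            data_lines = []
            for line in event.split("\n"):
                if line.startswith("data:"):
                    value = line[5:]
                    # A single leading space after the colon is not part of the value.
                    data_lines.append(value[1:] if value.startswith(" ") else value)
            if data_lines:  # comment-only events such as ": keepalive" are skipped
                payloads.append("\n".join(data_lines))
        return payloads


decoder = SSEDecoder()
for piece in (b'data: {"tok', b'en": "Hi"}\n\ndata: [DO', b'NE]\n\n'):
    print(decoder.feed(piece))
```

Feeding the three pieces prints an empty list first, because the first piece ends in the middle of an event.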

React hook for streaming

TYPESCRIPT
// useStreamingChat.ts
import { useState, useCallback } from "react";

interface Message {
  role: "user" | "assistant" | "system";
  content: string;
}

export function useStreamingChat() {
  const [response, setResponse] = useState("");
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);

  const sendMessage = useCallback(async (messages: Message[]) => {
    setLoading(true);
    setError(null);
    setResponse("");

    try {
      const res = await fetch("/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ messages }),
      });

      if (!res.ok) throw new Error(`HTTP ${res.status}`);
      if (!res.body) throw new Error("No response body");

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Buffer across reads: an event can be split between two network chunks
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split("\n\n");
        buffer = events.pop() ?? "";
        for (const line of events) {
          if (!line.startsWith("data: ")) continue;
          const data = line.slice(6).trim();
          if (data === "[DONE]") { reader.cancel(); return; }

          const parsed = JSON.parse(data);
          if (parsed.token) setResponse((prev) => prev + parsed.token);
          if (parsed.error) throw new Error(parsed.error);
        }
      }
    } catch (err) {
      setError(err instanceof Error ? err.message : "Unknown error");
    } finally {
      setLoading(false);
    }
  }, []);

  return { response, loading, error, sendMessage };
}

Error Handling Mid-Stream

Once you start sending a StreamingResponse, the HTTP status code (200) is already committed. You cannot change it to 500 mid-stream. The correct pattern is to:

  1. Catch exceptions inside the generator
  2. Yield an error event
  3. Yield [DONE] to signal the end of stream
  4. Let the client parse the error event and display a message

The generator below applies this pattern and also retries once, but only when the failure happens before any token has reached the client. Retrying after that point would replay the beginning of the answer.

Python
import asyncio

async def token_generator_with_retry(req: StreamRequest) -> AsyncGenerator[bytes, None]:
    max_attempts = 2
    attempt = 0
    sent_any = False  # once output has been sent, a retry would duplicate it

    while attempt < max_attempts:
        try:
            stream = await client.chat.completions.create(
                model=req.deployment,
                messages=req.messages,
                temperature=req.temperature,
                max_tokens=req.max_tokens,
                stream=True,
            )
            async for chunk in stream:
                for choice in chunk.choices:
                    if choice.delta.content:
                        sent_any = True
                        payload = json.dumps({"token": choice.delta.content})
                        yield f"data: {payload}\n\n".encode()
            break   # Success: exit retry loop

        except Exception as exc:
            attempt += 1
            if sent_any or attempt >= max_attempts:
                error_payload = json.dumps({
                    "error": f"Stream failed on attempt {attempt}: {exc}"
                })
                yield f"data: {error_payload}\n\n".encode()
                break
            # Brief pause before retry
            await asyncio.sleep(0.5)

    yield b"data: [DONE]\n\n"
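
The four steps above can also be factored out of the LLM call into a small wrapper. This is a sketch under stated assumptions: with_error_events and flaky_tokens are hypothetical names, and flaky_tokens merely simulates an upstream that fails after one token.

```python
import asyncio
import json
from typing import AsyncGenerator, AsyncIterator


async def with_error_events(source: AsyncIterator[bytes]) -> AsyncGenerator[bytes, None]:
    """Forward events from source; on failure emit an error event, then [DONE]."""
    try:
        async for event in source:
            yield event
    except Exception as exc:
        # asyncio.CancelledError is a BaseException (Python 3.8+), so it is not caught here.
        yield f"data: {json.dumps({'error': str(exc)})}\n\n".encode()
    yield b"data: [DONE]\n\n"


async def flaky_tokens() -> AsyncGenerator[bytes, None]:
    """Hypothetical upstream standing in for the LLM call: one token, then a failure."""
    yield b'data: {"token": "Hel"}\n\n'
    raise RuntimeError("upstream closed the connection")


async def collect() -> list:
    return [event async for event in with_error_events(flaky_tokens())]


events = asyncio.run(collect())
for event in events:
    print(event)
```

The client sees the partial token, then an error event, then the terminator, all under the 200 status that was already sent.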

Keepalive Comments

SSE connections may time out behind proxies or load balancers if no data is sent for a while. Send periodic SSE comment lines (lines starting with :) to keep the connection alive — they are ignored by the client:

Python
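import asyncio
import contextlib

async def token_generator_with_keepalive(req: StreamRequest) -> AsyncGenerator[bytes, None]:
    keepalive_interval = 15  # seconds of silence before a comment line is sent

    async def generate() -> AsyncGenerator[bytes, None]:
        stream = await client.chat.completions.create(
            model=req.deployment,
            messages=req.messages,
            stream=True,
        )
        async for chunk in stream:
            for choice in chunk.choices:
                if choice.delta.content:
                    payload = json.dumps({"token": choice.delta.content})
                    yield f"data: {payload}\n\n".encode()
        yield b"data: [DONE]\n\n"

    gen = generate()
    # One pending read at a time. asyncio.wait() leaves it running on timeout,
    # whereas asyncio.wait_for() would cancel the read in progress.
    pending = asyncio.ensure_future(gen.__anext__())
    try:
        while True:
            done, _ = await asyncio.wait({pending}, timeout=keepalive_interval)
            if not done:
                yield b": keepalive\n\n"  # nothing arrived within the interval
                continue
            try:
                event = pending.result()
            except StopAsyncIteration:
                break
            yield event
            pending = asyncio.ensure_future(gen.__anext__())
    finally:
        if not pending.done():
            pending.cancel()
            with contextlib.suppress(asyncio.CancelledError, StopAsyncIteration):
                await pending
        await gen.aclose()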

Complete Streaming Endpoint

Putting it all together: a fuller streaming endpoint with a lazily created client, client-disconnect handling, and in-stream error reporting:

Python
# routers/stream_complete.py
import json
import os
import asyncio
from typing import AsyncGenerator
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from openai import AsyncAzureOpenAI
from pydantic import BaseModel, Field

router = APIRouter(prefix="/stream", tags=["stream"])

_client: AsyncAzureOpenAI | None = None

def get_client() -> AsyncAzureOpenAI:
    global _client
    if _client is None:
        _client = AsyncAzureOpenAI(
            api_key=os.environ["AZURE_OPENAI_API_KEY"],
            azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            api_version="2024-12-01-preview",
        )
    return _client


class StreamChatRequest(BaseModel):
    messages: list[dict] = Field(..., min_length=1)
    deployment: str = Field(default="gpt-4o")
    max_tokens: int = Field(default=1024, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)


async def chat_token_stream(
    request: Request,
    req: StreamChatRequest,
) -> AsyncGenerator[bytes, None]:
    client = get_client()
    tokens_sent = 0

    try:
        stream = await client.chat.completions.create(
            model=req.deployment,
            messages=req.messages,
            temperature=req.temperature,
            max_tokens=req.max_tokens,
            stream=True,
        )
        async for chunk in stream:
            # Honour client disconnect
            if await request.is_disconnected():
                await stream.close()  # release the upstream connection
                return

            for choice in chunk.choices:
                delta = choice.delta
                if delta.content:
                    tokens_sent += 1
                    payload = json.dumps({
                        "token": delta.content,
                        "index": tokens_sent,
                    })
                    yield f"data: {payload}\n\n".encode()

    except asyncio.CancelledError:
        # Client disconnected: re-raise so the server can finish cancelling the task
        raise
    except Exception as exc:
        yield f"data: {json.dumps({'error': str(exc)})}\n\n".encode()

    # Deliberately not in a finally block: yielding while the generator
    # is being closed raises RuntimeError.
    yield b"data: [DONE]\n\n"


@router.post("/chat")
async def stream_chat(
    req: StreamChatRequest,
    request: Request,
) -> StreamingResponse:
    return StreamingResponse(
        chat_token_stream(request, req),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache, no-transform",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
            # Transfer-Encoding is managed by the ASGI server; do not set it by hand
        },
    )
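
The disconnect check can also be separated from the OpenAI call. The sketch below assumes a hypothetical stop_when helper; client_gone stands in for request.is_disconnected, and numbers stands in for the token stream. It illustrates that closing the upstream generator on early exit lets its own cleanup run.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, Callable


async def stop_when(
    source: AsyncGenerator[bytes, None],
    should_stop: Callable[[], Awaitable[bool]],
) -> AsyncGenerator[bytes, None]:
    """Forward events from source until should_stop() returns True."""
    try:
        async for event in source:
            if await should_stop():
                break
            yield event
    finally:
        # Close the upstream generator so its finally blocks run promptly.
        await source.aclose()


async def demo() -> tuple:
    received = []
    cleaned_up = []

    async def numbers() -> AsyncGenerator[bytes, None]:
        try:
            for i in range(100):
                yield f"data: {i}\n\n".encode()
        finally:
            cleaned_up.append(True)  # stands in for releasing the upstream connection

    async def client_gone() -> bool:
        # Pretend the client disconnects once it has received three events.
        return len(received) >= 3

    async for event in stop_when(numbers(), client_gone):
        received.append(event)
    return received, cleaned_up


received, cleaned_up = asyncio.run(demo())
print(len(received), cleaned_up)
```

In the FastAPI endpoint the predicate would be request.is_disconnected, passed without calling it.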

Key Takeaways

  • SSE streams data over a plain HTTP connection using text/event-stream content type and data: ...\n\n framing
  • StreamingResponse + async generator is the FastAPI pattern — yield encoded bytes for each event
  • Calling chat.completions.create with stream=True in the OpenAI SDK returns an async iterator of ChatCompletionChunk objects; the new text is in each choice's delta.content
  • fetch with ReadableStream is the recommended client approach — it supports POST and full JSON bodies unlike EventSource
  • Once streaming starts, the HTTP 200 status is committed — communicate errors via JSON error events in the stream body
  • Set X-Accel-Buffering: no to prevent nginx from buffering the stream; Cache-Control: no-cache prevents proxy caching
  • Check await request.is_disconnected() inside the generator to stop sending when the client drops the connection

Next lesson: Background Tasks — offloading work like audit logging after the response is sent.
