Streaming: Real-Time Output in LangChain
Stream LLM tokens in real-time with LCEL stream(), astream(), and astream_events(). Build streaming RAG, streaming agents, and Server-Sent Events for web UIs.
Why Streaming Matters
Without streaming, the user sees nothing until the full response is ready — potentially 5-10 seconds for a long answer. With streaming, the first token appears in under a second and the response builds progressively.
Without streaming: [5 seconds of silence] → Full answer appears
With streaming: [100ms] "Warfarin..." → tokens appear one by one
All LangChain LCEL runnables support streaming via the same interface.
Basic Streaming with stream()
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Build the demo chain: prompt -> chat model -> plain-string output parser.
model = ChatOpenAI(model="gpt-4o", temperature=0)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a clinical pharmacist."),
    ("human", "{question}"),
])
chain = prompt | model | StrOutputParser()
# stream() yields string chunks as they arrive
for chunk in chain.stream({"question": "Explain the warfarin mechanism in detail."}):
    # end="" keeps consecutive chunks on one line; flush=True writes each
    # chunk out immediately instead of waiting for a newline.
    print(chunk, end="", flush=True)
print() # Newline after streaming completes
# Collect streamed output while displaying it
full_response = ""
for chunk in chain.stream({"question": "What are warfarin drug interactions?"}):
    print(chunk, end="", flush=True)
    # Accumulate the same chunks so the complete text is available afterwards.
    full_response += chunk
print(f"\n\nFull response ({len(full_response)} chars)")
# Without StrOutputParser — yields AIMessageChunk objects
model_only_chain = prompt | model
for chunk in model_only_chain.stream({"question": "What is warfarin?"}):
if chunk.content:
print(chunk.content, end="", flush=True)
Async Streaming with astream()
Use astream() in async contexts (FastAPI, async frameworks):
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Chat model shared by the async example below.
model = ChatOpenAI(model="gpt-4o", temperature=0)
# LCEL pipeline: two-message prompt -> chat model -> string output parser.
# The prompt expects a single input variable, "question".
chain = (
    ChatPromptTemplate.from_messages([("system", "You are a clinical pharmacist."), ("human", "{question}")])
    | model
    | StrOutputParser()
)
async def stream_answer(question: str) -> str:
    """Echo the chain's answer to stdout as it streams and return the full text.

    Args:
        question: Value substituted for ``{question}`` in the chain's prompt.

    Returns:
        The concatenation of every chunk produced by ``chain.astream``.
    """
    pieces = []
    async for piece in chain.astream({"question": question}):
        # Show the chunk immediately, then keep it for the final result.
        print(piece, end="", flush=True)
        pieces.append(piece)
    print()
    return "".join(pieces)
asyncio.run(stream_answer("Explain warfarin interactions with aspirin."))
astream_events(): Fine-Grained Event Streaming
astream_events gives you structured events at every step — LLM tokens, tool calls, retrieval results — not just the final output. Essential for streaming agents and complex chains.
import asyncio
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# Vector store setup: a Chroma collection persisted under ./chroma_db,
# embedded with OpenAI's text-embedding-3-small model.
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(collection_name="drug_formulary", embedding_function=embeddings, persist_directory="./chroma_db")
# Retriever returning the top 3 matches per query.
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# RAG pipeline. The first step fans the incoming question out into two keys:
#   "context"  - retrieved documents joined into one string (blank line between them)
#   "question" - the original input passed through unchanged
# Those keys fill the prompt, which feeds the chat model and the string parser.
rag_chain = (
    {"context": retriever | (lambda docs: "\n\n".join(d.page_content for d in docs)), "question": RunnablePassthrough()}
    | ChatPromptTemplate.from_messages([
        ("system", "Answer using the context only.\n\nContext: {context}"),
        ("human", "{question}"),
    ])
    | ChatOpenAI(model="gpt-4o", temperature=0)
    | StrOutputParser()
)
async def stream_rag_with_events(question: str):
    """Stream a RAG chain showing retrieval + generation events.

    Iterates the ``version="v2"`` event stream of ``rag_chain`` and prints a
    live trace to stdout: the retriever query (first 60 characters), the
    number of retrieved documents with a 60-character preview of each, and
    then the model's tokens as they arrive.

    Args:
        question: Passed to ``rag_chain.astream_events`` as the chain input.
    """
    print(f"\nQuestion: {question}")
    print("-" * 50)
    async for event in rag_chain.astream_events(question, version="v2"):
        event_type = event["event"]
        if event_type == "on_retriever_start":
            print(f"\n[Retrieving documents for: {event['data']['input']['query'][:60]}]")
        elif event_type == "on_retriever_end":
            docs = event["data"]["output"]
            print(f"[Retrieved {len(docs)} documents]")
            for doc in docs:
                # Bullet restored from a mis-encoded character sequence.
                print(f" • {doc.page_content[:60]}...")
        elif event_type == "on_chat_model_stream":
            # Individual token from the LLM
            chunk = event["data"]["chunk"]
            # Skip chunks that carry no text so nothing empty is printed.
            if hasattr(chunk, "content") and chunk.content:
                print(chunk.content, end="", flush=True)
        elif event_type == "on_chat_model_end":
            print() # Newline after streaming completes
    print("-" * 50)
asyncio.run(stream_rag_with_events("What does warfarin inhibit?"))
Streaming Agent Output
Agents have multiple steps (think → tool → think → answer). astream_events lets you show each step as it happens:
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool
@tool
def search_drug_database(query: str) -> str:
    """Search the clinical drug database."""
    # NOTE(review): docstring left verbatim; @tool presumably exposes it to
    # the model as the tool description, so treat it as runtime text.
    # Placeholder implementation: echoes the query rather than querying a
    # real data source.
    result = f"Drug information for: {query}"
    return result
# Tools the agent is allowed to call.
tools = [search_drug_database]
model = ChatOpenAI(model="gpt-4o", temperature=0)
# Agent prompt: system instruction, a slot for prior chat history, the
# user's input, and a slot named agent_scratchpad.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a clinical pharmacist. Use tools when needed."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])
agent = create_tool_calling_agent(model, tools, prompt)
# verbose=False: the streaming function below does its own printing.
executor = AgentExecutor(agent=agent, tools=tools, verbose=False)
async def stream_agent(question: str):
    """Print a live trace of one agent run.

    Consumes the ``version="v2"`` event stream of ``executor`` and reports
    tool invocations, tool results (first 100 characters), model tokens, and
    a closing marker when the ``AgentExecutor`` chain ends.

    Args:
        question: Sent to the agent as ``input`` with an empty chat history.
    """
    agent_input = {"input": question, "chat_history": []}
    async for ev in executor.astream_events(agent_input, version="v2"):
        kind = ev["event"]

        if kind == "on_chat_model_stream":
            # Token-level output; chunks without text are ignored.
            text = getattr(ev["data"]["chunk"], "content", None)
            if text:
                print(text, end="", flush=True)
            continue

        if kind == "on_tool_start":
            tool_name = ev["name"]
            tool_input = ev["data"].get("input", {})
            print(f"\n[Calling tool: {tool_name}]")
            print(f" Input: {tool_input}")
            continue

        if kind == "on_tool_end":
            output = ev["data"].get("output", "")
            print(f" Result: {str(output)[:100]}")
            continue

        # Only the outermost executor's completion marks the end of the run.
        if kind == "on_chain_end" and ev.get("name") == "AgentExecutor":
            print("\n[Agent finished]")
asyncio.run(stream_agent("What is warfarin and what are its main uses?"))
Streaming with FastAPI (Server-Sent Events)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import asyncio
# FastAPI application that will host the streaming endpoint.
app = FastAPI()
# Chat model constructed with streaming=True.
model = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)
# LCEL pipeline: prompt -> chat model -> string parser, so the async stream
# yields plain string chunks.
chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a clinical pharmacist."),
        ("human", "{question}"),
    ])
    | model
    | StrOutputParser()
)
async def token_generator(question: str):
    """Yield SSE-formatted tokens.

    Each non-empty chunk from ``chain.astream`` becomes one Server-Sent
    Events message. When the stream is exhausted a final ``data: [DONE]``
    message is sent so the client knows it can close the connection.

    Args:
        question: Value substituted for ``{question}`` in the chain's prompt.

    Yields:
        Strings in SSE wire format, each terminated by a blank line.
    """
    async for chunk in chain.astream({"question": question}):
        if not chunk:
            continue
        # Server-Sent Events format: "data: <content>\n\n". A blank line ends
        # the message and every payload line needs its own "data:" field, so
        # a chunk containing a line break cannot be written out raw: the text
        # after the break would be dropped or the message cut short. Emitting
        # one "data:" line per payload line lets the client rebuild the chunk
        # (it joins the lines with "\n"). CR and CRLF are normalized first
        # because SSE parsers treat them as line terminators too.
        payload_lines = chunk.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        yield "".join(f"data: {line}\n" for line in payload_lines) + "\n"
    yield "data: [DONE]\n\n"
@app.get("/stream")
async def stream_endpoint(question: str):
    """Serve the answer to ``question`` as a ``text/event-stream`` response.

    Args:
        question: Query-string parameter forwarded to ``token_generator``.

    Returns:
        A ``StreamingResponse`` wrapping the SSE generator.
    """
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # Disable nginx buffering
    }
    event_stream = token_generator(question)
    return StreamingResponse(
        event_stream,
        media_type="text/event-stream",
        headers=sse_headers,
    )
# Frontend JavaScript consumer:
# const es = new EventSource('/stream?question=What+is+warfarin');
# es.onmessage = (e) => {
# if (e.data === '[DONE]') { es.close(); return; }
# document.getElementById('answer').textContent += e.data;
# };
Streaming Pitfalls
# Pitfall 1: Output parsers that buffer the full output before parsing
# PydanticOutputParser does NOT stream — it needs the full JSON first
from langchain_core.output_parsers import JsonOutputParser
# JsonOutputParser DOES stream partial JSON as it arrives
# Append a JSON parser to the existing chain; iterating stream() then yields
# successive partial parses rather than raw text.
json_chain = chain | JsonOutputParser()
for partial in json_chain.stream({"question": "..."}):
    print(partial) # Dict builds up incrementally
# PydanticOutputParser does NOT stream — buffer full response first
# Pitfall 2: Using invoke() inside an async generator (blocks the event loop)
# WRONG:
async def bad_generator(question: str):
    """Anti-pattern, kept for illustration: a synchronous call inside an async generator."""
    # invoke() is the synchronous API and returns only once the whole answer
    # exists, so this generator yields a single item instead of a stream.
    result = chain.invoke({"question": question}) # Blocks the event loop! Use astream (or ainvoke) instead
    yield result
# CORRECT:
async def good_generator(question: str):
    """Relay the chain's output chunk by chunk without blocking the event loop.

    Args:
        question: Value substituted for ``{question}`` in the chain's prompt.

    Yields:
        Each chunk produced by ``chain.astream``, in arrival order.
    """
    token_stream = chain.astream({"question": question})
    async for piece in token_stream:
        yield piece
# Pitfall 3: Streaming a chain with non-streaming steps
# If any step buffers (e.g., a transform that needs the full input),
# tokens will appear in one burst, not progressively.
# Solution: ensure each step passes chunks through or use astream_events
# to stream at the model level while other steps complete.
Streaming Method Comparison
| Method | Returns | Use When |
|---|---|---|
| chain.stream(input) | Iterator of string chunks | Sync context, simple output |
| chain.astream(input) | Async iterator of string chunks | Async context (FastAPI, etc.) |
| chain.astream_events(input, version="v2") | Async iterator of event dicts | Need tool calls, retrieval events, or agent steps |
| StreamingStdOutCallbackHandler | Prints to stdout | Quick debugging |
| SSE via StreamingResponse | HTTP event stream | Browser/frontend consumption |
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.