Learnixo
Back to blog
AI Systemsintermediate

Streaming: Real-Time Output in LangChain

Stream LLM tokens in real-time with LCEL stream(), astream(), and astream_events(). Build streaming RAG, streaming agents, and Server-Sent Events for web UIs.

Asma Hafeez KhanMay 16, 20266 min read
LangChainStreamingLCELAsyncSSEReal-Timeastream_events
Share:š•

Why Streaming Matters

Without streaming, the user sees nothing until the full response is ready — potentially 5-10 seconds for a long answer. With streaming, the first token appears in under a second and the response builds progressively.

Without streaming:  [5 seconds of silence]  → Full answer appears
With streaming:     [100ms]  "Warfarin..."  → tokens appear one by one

All LangChain LCEL runnables support streaming via the same interface.


Basic Streaming with stream()

Python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o", temperature=0)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a clinical pharmacist."),
    ("human", "{question}"),
])

chain = prompt | model | StrOutputParser()

# stream() yields string chunks as they arrive
for chunk in chain.stream({"question": "Explain the warfarin mechanism in detail."}):
    print(chunk, end="", flush=True)
print()   # Newline after streaming completes


# Collect streamed output while displaying it
full_response = ""
for chunk in chain.stream({"question": "What are warfarin drug interactions?"}):
    print(chunk, end="", flush=True)
    full_response += chunk
print(f"\n\nFull response ({len(full_response)} chars)")


# Without StrOutputParser — yields AIMessageChunk objects
model_only_chain = prompt | model

for chunk in model_only_chain.stream({"question": "What is warfarin?"}):
    if chunk.content:
        print(chunk.content, end="", flush=True)

Async Streaming with astream()

Use astream() in async contexts (FastAPI, async frameworks):

Python
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o", temperature=0)
chain = (
    ChatPromptTemplate.from_messages([("system", "You are a clinical pharmacist."), ("human", "{question}")])
    | model
    | StrOutputParser()
)

async def stream_answer(question: str) -> str:
    full = ""
    async for chunk in chain.astream({"question": question}):
        print(chunk, end="", flush=True)
        full += chunk
    print()
    return full


asyncio.run(stream_answer("Explain warfarin interactions with aspirin."))

astream_events(): Fine-Grained Event Streaming

astream_events gives you structured events at every step — LLM tokens, tool calls, retrieval results — not just the final output. Essential for streaming agents and complex chains.

Python
import asyncio
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(collection_name="drug_formulary", embedding_function=embeddings, persist_directory="./chroma_db")
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

rag_chain = (
    {"context": retriever | (lambda docs: "\n\n".join(d.page_content for d in docs)), "question": RunnablePassthrough()}
    | ChatPromptTemplate.from_messages([
        ("system", "Answer using the context only.\n\nContext: {context}"),
        ("human", "{question}"),
    ])
    | ChatOpenAI(model="gpt-4o", temperature=0)
    | StrOutputParser()
)


async def stream_rag_with_events(question: str):
    """Stream a RAG chain showing retrieval + generation events."""
    
    print(f"\nQuestion: {question}")
    print("-" * 50)
    
    async for event in rag_chain.astream_events(question, version="v2"):
        event_type = event["event"]
        event_name = event.get("name", "")

        if event_type == "on_retriever_start":
            print(f"\n[Retrieving documents for: {event['data']['input']['query'][:60]}]")
        
        elif event_type == "on_retriever_end":
            docs = event["data"]["output"]
            print(f"[Retrieved {len(docs)} documents]")
            for doc in docs:
                print(f"  • {doc.page_content[:60]}...")
        
        elif event_type == "on_chat_model_stream":
            # Individual token from the LLM
            chunk = event["data"]["chunk"]
            if hasattr(chunk, "content") and chunk.content:
                print(chunk.content, end="", flush=True)
        
        elif event_type == "on_chat_model_end":
            print()  # Newline after streaming completes
    
    print("-" * 50)


asyncio.run(stream_rag_with_events("What does warfarin inhibit?"))

Streaming Agent Output

Agents have multiple steps (think → tool → think → answer). astream_events lets you show each step as it happens:

Python
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool

@tool
def search_drug_database(query: str) -> str:
    """Search the clinical drug database."""
    return f"Drug information for: {query}"

tools = [search_drug_database]
model = ChatOpenAI(model="gpt-4o", temperature=0)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a clinical pharmacist. Use tools when needed."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(model, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=False)


async def stream_agent(question: str):
    """Stream agent execution step by step."""
    
    async for event in executor.astream_events(
        {"input": question, "chat_history": []},
        version="v2",
    ):
        event_type = event["event"]

        if event_type == "on_tool_start":
            tool_name = event["name"]
            tool_input = event["data"].get("input", {})
            print(f"\n[Calling tool: {tool_name}]")
            print(f"  Input: {tool_input}")
        
        elif event_type == "on_tool_end":
            output = event["data"].get("output", "")
            print(f"  Result: {str(output)[:100]}")
        
        elif event_type == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if hasattr(chunk, "content") and chunk.content:
                print(chunk.content, end="", flush=True)
        
        elif event_type == "on_chain_end" and event.get("name") == "AgentExecutor":
            print("\n[Agent finished]")


asyncio.run(stream_agent("What is warfarin and what are its main uses?"))

Streaming with FastAPI (Server-Sent Events)

Python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import asyncio

app = FastAPI()

model = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)
chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a clinical pharmacist."),
        ("human", "{question}"),
    ])
    | model
    | StrOutputParser()
)


async def token_generator(question: str):
    """Yield SSE-formatted tokens."""
    async for chunk in chain.astream({"question": question}):
        if chunk:
            # Server-Sent Events format: "data: <content>\n\n"
            yield f"data: {chunk}\n\n"
    yield "data: [DONE]\n\n"


@app.get("/stream")
async def stream_endpoint(question: str):
    return StreamingResponse(
        token_generator(question),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",   # Disable nginx buffering
        },
    )

# Frontend JavaScript consumer:
# const es = new EventSource('/stream?question=What+is+warfarin');
# es.onmessage = (e) => {
#   if (e.data === '[DONE]') { es.close(); return; }
#   document.getElementById('answer').textContent += e.data;
# };

Streaming Pitfalls

Python
# Pitfall 1: Output parsers that buffer the full output before parsing
# PydanticOutputParser does NOT stream — it needs the full JSON first
from langchain_core.output_parsers import JsonOutputParser

# JsonOutputParser DOES stream partial JSON as it arrives
json_chain = chain | JsonOutputParser()
for partial in json_chain.stream({"question": "..."}):
    print(partial)   # Dict builds up incrementally

# PydanticOutputParser does NOT stream — buffer full response first


# Pitfall 2: Using invoke() inside an async generator (blocks the event loop)
# WRONG:
async def bad_generator(question: str):
    result = chain.invoke({"question": question})   # Blocks! Use ainvoke instead
    yield result

# CORRECT:
async def good_generator(question: str):
    async for chunk in chain.astream({"question": question}):
        yield chunk


# Pitfall 3: Streaming a chain with non-streaming steps
# If any step buffers (e.g., a transform that needs the full input),
# tokens will appear in one burst, not progressively.
# Solution: ensure each step passes chunks through or use astream_events
# to stream at the model level while other steps complete.

Streaming Method Comparison

| Method | Returns | Use When | |---|---|---| | chain.stream(input) | Iterator of string chunks | Sync context, simple output | | chain.astream(input) | Async iterator of string chunks | Async context (FastAPI, etc.) | | chain.astream_events(input, version="v2") | Async iterator of event dicts | Need tool calls, retrieval events, or agent steps | | StreamingStdOutCallbackHandler | Prints to stdout | Quick debugging | | SSE via StreamingResponse | HTTP event stream | Browser/frontend consumption |

Enjoyed this article?

Explore the AI Systems learning path for more.

Found this helpful?

Share:š•

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.