Learnixo
Back to blog
AI Systems · Intermediate

The Runnable Interface: pipe(), invoke(), stream()

A deep dive into LangChain's Runnable protocol: invoke, stream, batch, the async methods, config injection, and how to build custom Runnables.

Asma Hafeez Khan · May 16, 2026 · 5 min read
LangChain · Runnable · LCEL · Streaming · Async · Interface
Share: 𝕏

The Runnable Protocol

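Every core LangChain component (prompts, chat models, output parsers, retrievers) implements the Runnable interface. This shared contract is what makes LCEL composition possible. Only invoke() is abstract: the base class derives stream(), batch() and the async variants from it. Understanding this interface therefore lets you build custom components that plug seamlessly into any chain.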

Python
from langchain_core.runnables import Runnable
from abc import abstractmethod
from typing import Any, Iterator, AsyncIterator

class Runnable:
    """Simplified sketch of langchain_core's Runnable protocol (illustration only).

    In the real library ``invoke()`` is the only abstract method. ``stream``,
    ``batch`` and the async variants all have default implementations built on
    top of ``invoke()``, so a custom subclass only *has* to implement
    ``invoke()`` and may override the others for better behaviour.
    """

    # NOTE: @abstractmethod is only enforced when the class derives from
    # abc.ABC (as the real Runnable does); here it documents the contract.
    @abstractmethod
    def invoke(self, input: Any, config=None, **kwargs: Any) -> Any:
        """Single synchronous call — the one method every subclass must implement."""
        ...

    def stream(self, input: Any, config=None, **kwargs: Any) -> Iterator[Any]:
        """Streaming synchronous call — yields chunks.

        Default: yields the single, complete result of ``invoke()``.
        """
        ...

    async def ainvoke(self, input: Any, config=None, **kwargs: Any) -> Any:
        """Single async call.

        Default: runs ``invoke()`` in a thread-pool executor.
        """
        ...

    async def astream(self, input: Any, config=None, **kwargs: Any) -> AsyncIterator[Any]:
        """Streaming async call.

        Default: yields the single result of ``ainvoke()``.
        """
        ...

    def batch(
        self,
        inputs: list[Any],
        config=None,
        *,
        return_exceptions: bool = False,
        **kwargs: Any,
    ) -> list[Any]:
        """Process multiple inputs; outputs are returned in input order.

        Default: calls ``invoke()`` per item concurrently on a thread pool,
        bounded by ``config["max_concurrency"]``. With
        ``return_exceptions=True`` failures are returned in place instead of raised.
        """
        ...

    async def abatch(
        self,
        inputs: list[Any],
        config=None,
        *,
        return_exceptions: bool = False,
        **kwargs: Any,
    ) -> list[Any]:
        """Async ``batch()``: runs ``ainvoke()`` per item concurrently."""
        ...

    def __or__(self, other: "Runnable") -> "RunnableSequence":
        """The | operator creates a sequential chain: ``self`` then ``other``."""
        # Imported lazily: this sketch shadows the real Runnable, so pull the
        # real sequence type from the library only when composing.
        from langchain_core.runnables import RunnableSequence

        return RunnableSequence(first=self, last=other)

    def pipe(self, *others: "Runnable") -> "RunnableSequence":
        """Method form of ``|``: ``a.pipe(b, c)`` builds the same chain as ``a | b | c``."""
        from langchain_core.runnables import RunnableSequence

        return RunnableSequence(self, *others)

invoke(): Synchronous Single Call

Python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Shared components: `model`, `prompt`, `parser` and `chain` are reused by the later snippets.
model = ChatOpenAI(model="gpt-4o", temperature=0)  # temperature=0 minimises sampling randomness
prompt = ChatPromptTemplate.from_template("Summarize {drug} in 2 sentences.")
parser = StrOutputParser()
chain = prompt | model | parser

# Basic invoke
result = chain.invoke({"drug": "warfarin"})
# Returns: str

# With config
from langchain_core.runnables import RunnableConfig

# RunnableConfig is a typed dict; tags/metadata travel with this run to callbacks and tracing.
result = chain.invoke(
    {"drug": "warfarin"},
    config=RunnableConfig(
        tags=["clinical"],
        metadata={"user_id": "user_123", "session": "sess_abc"},
        callbacks=None,       # Inject custom callbacks here
        max_concurrency=10,   # only matters for steps that fan out; this chain is purely sequential
    ),
)

# Each Runnable's invoke input/output type:
# ChatPromptTemplate.invoke: dict → PromptValue (renderable as messages)
# ChatOpenAI.invoke:         PromptValue/list[BaseMessage] → AIMessage
# StrOutputParser.invoke:    AIMessage → str

stream(): Token-Level Streaming

Python
import sys

# stream() yields chunks as the model generates them
for chunk in chain.stream({"drug": "warfarin"}):
    # Each chunk is a string fragment (after StrOutputParser)
    print(chunk, end="", flush=True)
print()

# Stream raw AIMessageChunk objects (before parsing)
raw_chain = prompt | model
for chunk in raw_chain.stream({"drug": "metformin"}):
    # chunk is AIMessageChunk with .content
    print(chunk.content, end="", flush=True)
print()

# Runnable.stream() has no stream_mode parameter (stream_mode="values" is a
# LangGraph option). To see intermediate chain outputs, use astream_events():
# each step emits an "on_<kind>_end" event whose data carries that step's output.
import asyncio


async def show_intermediate_outputs(drug: str) -> None:
    """Print every chain step's output as it completes (useful for debugging)."""
    async for event in chain.astream_events({"drug": drug}, version="v2"):
        if event["event"].endswith("_end"):
            print("Step output:", event["name"], event["data"].get("output"))


asyncio.run(show_intermediate_outputs("aspirin"))

# Streaming with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/stream/{drug}")
async def stream_drug_info(drug: str):
    """Stream the chain's summary of *drug* to the HTTP client as plain text."""

    async def token_stream():
        # Forward each parsed text fragment as soon as the chain produces it.
        async for piece in chain.astream({"drug": drug}):
            yield piece

    body = token_stream()
    return StreamingResponse(body, media_type="text/plain")

batch(): Process Multiple Inputs

Python
# Process a list of inputs efficiently
# Default: runs invoke() for each input concurrently on a thread pool;
# the returned list is in the same order as the inputs.
results = chain.batch([
    {"drug": "warfarin"},
    {"drug": "metformin"},
    {"drug": "aspirin"},
    {"drug": "lisinopril"},
])
# Returns: [str, str, str, str]

# Control concurrency
results = chain.batch(
    [{"drug": d} for d in ["warfarin", "metformin", "aspirin"]],
    config=RunnableConfig(max_concurrency=3),   # Run at most 3 in parallel
)

# batch_as_completed() — yields (input_index, output) pairs as each call
# finishes, so iteration order may differ from input order
from langchain_core.runnables import RunnableLambda  # not used in this snippet

for i, result in chain.batch_as_completed([
    {"drug": "warfarin"},
    {"drug": "metformin"},
    {"drug": "aspirin"},
]):
    # `i` indexes the input list above; show the first 50 chars of each summary
    print(f"Input {i} completed: {result[:50]}")

Async Methods

Python
import asyncio

# ainvoke: async version of invoke
async def get_drug_info(drug: str) -> str:
    """Return the chain's summary for *drug* without blocking the event loop."""
    return await chain.ainvoke({"drug": drug})


# astream: async version of stream
async def stream_drug_info(drug: str) -> None:
    """Print the summary for *drug* fragment by fragment as it streams in."""
    async for chunk in chain.astream({"drug": drug}):
        print(chunk, end="", flush=True)
    print()  # terminate the streamed line, like the synchronous streaming examples


# abatch: async version of batch — inputs run concurrently
async def get_all_drugs(drugs: list[str]) -> list[str]:
    """Summarize every drug in *drugs*; results keep the input order."""
    return await chain.abatch([{"drug": d} for d in drugs])


# Run multiple chains concurrently with asyncio.gather
async def parallel_queries() -> list[str]:
    """Run three independent ainvoke() calls concurrently.

    asyncio.gather returns the results in argument order, not completion order.
    """
    results = await asyncio.gather(
        chain.ainvoke({"drug": "warfarin"}),
        chain.ainvoke({"drug": "metformin"}),
        chain.ainvoke({"drug": "aspirin"}),
    )
    return results


# Run in main — guarded so importing this module does not fire the API calls.
if __name__ == "__main__":
    results = asyncio.run(parallel_queries())

Building a Custom Runnable

Two ways to create custom Runnables:

Method 1: RunnableLambda (quickest)

Python
from langchain_core.runnables import RunnableLambda

def validate_drug_query(inputs: dict) -> dict:
    """Validate and normalize drug query inputs.

    Args:
        inputs: Mapping expected to contain a ``"drug"`` name.

    Returns:
        ``{"drug": <stripped, lower-cased name>, "original_drug": <value as given>}``.

    Raises:
        ValueError: if ``"drug"`` is missing, ``None``, or blank.
        TypeError: if ``"drug"`` is present but not a string.
    """
    raw = inputs.get("drug")
    # Treat a missing key and an explicit None alike, so callers get a clear
    # ValueError rather than an AttributeError from calling .strip() on None.
    if raw is None:
        raw = ""
    if not isinstance(raw, str):
        raise TypeError(f"Drug name must be a string, got {type(raw).__name__}")
    drug = raw.strip()
    if not drug:
        raise ValueError("Drug name cannot be empty")
    return {"drug": drug.lower(), "original_drug": raw}

# Wrap the plain function so it composes with | like any other Runnable.
validator = RunnableLambda(validate_drug_query)

# Use in a chain: validation/normalisation runs before the prompt. The extra
# "original_drug" key is passed along but not referenced by the prompt template.
validated_chain = validator | chain

# Async lambda
async def async_lookup(drug: str) -> dict:
    """Simulate an async database lookup for *drug*.

    Returns ``{"drug_id": int, "drug_name": drug}``. The id is taken from a
    SHA-256 digest of the name, so the same drug always gets the same id across
    processes and runs. The built-in ``hash()`` on ``str`` is salted per process
    (PYTHONHASHSEED) and would produce a different id on every start.
    """
    import hashlib

    # Simulate async database lookup
    await asyncio.sleep(0.01)
    digest = hashlib.sha256(drug.encode("utf-8")).digest()
    drug_id = int.from_bytes(digest[:8], "big")  # stable, non-negative 64-bit id
    return {"drug_id": drug_id, "drug_name": drug}

async_runnable = RunnableLambda(async_lookup)  # wraps a coroutine function: call it via ainvoke()/astream()

Method 2: Subclass Runnable (full control)

Python
from langchain_core.runnables import Runnable
from typing import Optional, Iterator

class DrugDatabaseLookup(Runnable):
    """Custom Runnable that queries a drug database.

    Input is a drug name (``str``); output is a dict with ``drug``,
    ``description`` and ``available`` keys. Only ``invoke()`` is required by
    the Runnable base class; ``stream()`` and ``ainvoke()`` are overridden to
    show how each hook can be customised.
    """

    def __init__(self, database_url: str):
        # Kept for the real database client; the placeholder lookup below does not use it yet.
        self.db_url = database_url

    def invoke(
        self,
        input: str,
        config: Optional["RunnableConfig"] = None,
        **kwargs,
    ) -> dict:
        """Lookup drug info from database.

        ``**kwargs`` is accepted (and ignored) to match the base-class
        signature, since LCEL may forward extra keyword arguments.
        """
        # In production: query real database
        drug_name = input.lower().strip()
        return {
            "drug": drug_name,
            "description": f"Clinical information for {drug_name}",
            "available": True,
        }

    def stream(self, input: str, config=None, **kwargs) -> Iterator[dict]:
        """A lookup has no partial results, so emit the whole record as one chunk."""
        yield self.invoke(input, config, **kwargs)

    async def ainvoke(self, input: str, config=None, **kwargs) -> dict:
        """Async database lookup.

        Runs the synchronous lookup in a worker thread so a blocking database
        call cannot stall the event loop. Replace with a native async client
        call once one is wired in.
        """
        import asyncio

        return await asyncio.to_thread(self.invoke, input, config, **kwargs)


# Use in chain
db_lookup = DrugDatabaseLookup(database_url="postgresql://...")  # placeholder URL; stored but not used by the lookup above
enrichment_prompt = ChatPromptTemplate.from_template(
    "Based on this drug data: {drug}\n{description}\n\nWhat should a clinician know?"
)
# db_lookup's dict output feeds the template's {drug} and {description}
# variables directly; its "available" key is not referenced by the template.
enriched_chain = db_lookup | enrichment_prompt | model | parser
result = enriched_chain.invoke("warfarin")  # bare string input, as DrugDatabaseLookup.invoke expects

Runnable Modifiers

LangChain provides modifier methods that wrap any Runnable:

Python
# .with_retry() — retry on failure
chain_with_retry = chain.with_retry(
    stop_after_attempt=3,
    retry_if_exception_type=(Exception,),  # retries on ANY error; narrow to transient errors (rate limits, timeouts) in production
    wait_exponential_jitter=True,      # Add randomness to backoff
)

# .with_fallbacks() — fallback chain on failure
# cheap_chain is only tried if `chain` raises
cheap_chain = prompt | ChatOpenAI(model="gpt-4o-mini") | parser
chain_with_fallback = chain.with_fallbacks([cheap_chain])

# .with_config() — set default config
# These defaults are merged into the config of every call on chain_for_prod.
chain_for_prod = chain.with_config({
    "tags": ["production"],
    "metadata": {"environment": "prod"},
})

# .with_types() — type hint the input/output
# Declares the schemas reported by input_schema/output_schema; inputs are not converted.
typed_chain = chain.with_types(
    input_type=dict,
    output_type=str,
)

# .with_listeners() — attach event handlers
# Each callback receives the tracer Run object for that invocation.
chain_with_logging = chain.with_listeners(
    on_start=lambda run_obj: print(f"Started: {run_obj.id}"),
    on_end=lambda run_obj: print(f"Ended: {run_obj.id}"),
    on_error=lambda run_obj: print(f"Error: {run_obj.error}"),
)

# .configurable_fields() — make parameters configurable at runtime
from langchain_core.runnables import ConfigurableField

configurable_model = ChatOpenAI(model="gpt-4o", temperature=0).configurable_fields(
    model_name=ConfigurableField(
        id="model_name",
        name="Model Name",
        description="The OpenAI model to use",
    )
)

# The configurable model must actually be part of the chain you invoke.
# `chain` was built with the fixed `model`, so passing "configurable" to it
# would silently have no effect.
configurable_chain = prompt | configurable_model | parser

# Change model at runtime without rebuilding chain
result = configurable_chain.invoke(
    {"drug": "warfarin"},
    config={"configurable": {"model_name": "gpt-4o-mini"}},
)

The pipe() Method vs | Operator

The | operator and .pipe() are equivalent. Prefer .pipe() when building chains dynamically or piping several steps in one call:

Python
# All three build the same prompt → model → parser sequence:
chain_a = prompt | model | parser             # operator form
chain_b = prompt.pipe(model).pipe(parser)     # one .pipe() call per step
chain_c = prompt.pipe(model, parser)          # .pipe() also accepts several steps at once

# Building a chain from a runtime list of steps: take the first step, then
# pipe each remaining one onto the growing sequence.
steps = [prompt, model, parser]
chain, *remaining_steps = steps
for next_step in remaining_steps:
    chain = chain.pipe(next_step)

Enjoyed this article?

Explore the AI Systems learning path for more.

Found this helpful?

Share: 𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.