LangChain Mastery · Lesson 4 of 33
The Runnable Interface: pipe(), invoke(), stream()
The Runnable Protocol
Every LangChain component implements the Runnable interface. This is the contract that enables LCEL composition. Understanding this interface lets you build custom components that integrate seamlessly.
Python
from langchain_core.runnables import Runnable
from abc import abstractmethod
from typing import Any, Iterator, AsyncIterator
class Runnable:
# These are the core methods every Runnable must implement:
def invoke(self, input: Any, config=None) -> Any:
"""Single synchronous call."""
...
def stream(self, input: Any, config=None) -> Iterator[Any]:
"""Streaming synchronous call — yields chunks."""
...
async def ainvoke(self, input: Any, config=None) -> Any:
"""Single async call."""
...
async def astream(self, input: Any, config=None) -> AsyncIterator[Any]:
"""Streaming async call."""
...
def batch(self, inputs: list[Any], config=None) -> list[Any]:
"""Process multiple inputs. Default: calls invoke() per item."""
...
def __or__(self, other: "Runnable") -> "RunnableSequence":
"""The | operator creates a sequential chain."""
return RunnableSequence(first=self, last=other)invoke(): Synchronous Single Call
Python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Three stages composed left-to-right with LCEL: prompt -> model -> parser.
model = ChatOpenAI(model="gpt-4o", temperature=0)
prompt = ChatPromptTemplate.from_template("Summarize {drug} in 2 sentences.")
parser = StrOutputParser()
chain = prompt | model | parser

# Plain invoke: a dict of prompt variables goes in, a str comes out.
result = chain.invoke({"drug": "warfarin"})

# Invoke with a per-call config (tags, metadata, callbacks, concurrency cap).
from langchain_core.runnables import RunnableConfig

invoke_config = RunnableConfig(
    tags=["clinical"],
    metadata={"user_id": "user_123", "session": "sess_abc"},
    callbacks=None,  # Inject custom callbacks here
    max_concurrency=10,
)
result = chain.invoke({"drug": "warfarin"}, config=invoke_config)

# Input/output type of each stage's invoke():
#   ChatPromptTemplate.invoke: dict → PromptValue (renderable as messages)
#   ChatOpenAI.invoke:         PromptValue/list[BaseMessage] → AIMessage
# StrOutputParser.invoke: AIMessage → str
stream(): Token-Level Streaming
Python
import sys
# stream() yields chunks as the model generates them
for chunk in chain.stream({"drug": "warfarin"}):
    # Each chunk is a string fragment (after StrOutputParser)
    print(chunk, end="", flush=True)
print()  # terminate the streamed line

# Stream raw AIMessageChunk objects (before parsing)
raw_chain = prompt | model
for chunk in raw_chain.stream({"drug": "metformin"}):
    # chunk is AIMessageChunk with .content
    print(chunk.content, end="", flush=True)
print()  # terminate the streamed line
# Accumulate the chunks to see the running value after each streamed piece.
# NOTE: Runnable.stream() does not define a stream_mode option — that keyword
# belongs to LangGraph's graph streaming, not to LCEL chains — so the
# accumulation is done explicitly here.
step_output = ""
for chunk in chain.stream({"drug": "aspirin"}):
    step_output += chunk
    print("Step output:", step_output)
# Useful for debugging. To inspect the outputs of individual chain steps,
# use chain.astream_events(...) from async code instead.
# Streaming with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/stream/{drug}")
async def stream_drug_info(drug: str):
async def generator():
async for chunk in chain.astream({"drug": drug}):
yield chunk
return StreamingResponse(generator(), media_type="text/plain")batch(): Process Multiple Inputs
Python
# Run several inputs through the chain in one call.
# Each input goes through invoke(); the calls can run in parallel.
results = chain.batch(
    [{"drug": d} for d in ["warfarin", "metformin", "aspirin", "lisinopril"]]
)
# Returns: [str, str, str, str]

# Cap how many inputs are processed at the same time.
results = chain.batch(
    [
        {"drug": "warfarin"},
        {"drug": "metformin"},
        {"drug": "aspirin"},
    ],
    config=RunnableConfig(max_concurrency=3),  # Run at most 3 in parallel
)
# batch_as_completed() — get results as they finish (not in order)
from langchain_core.runnables import RunnableLambda
for i, result in chain.batch_as_completed([
{"drug": "warfarin"},
{"drug": "metformin"},
{"drug": "aspirin"},
]):
print(f"Input {i} completed: {result[:50]}")Async Methods
Python
import asyncio
# ainvoke: async version of invoke
async def get_drug_info(drug: str) -> str:
    """Await the chain's result for a single ``drug`` and return it."""
    return await chain.ainvoke({"drug": drug})
# astream: async version of stream
async def stream_drug_info(drug: str):
    """Print the chain's output for ``drug`` chunk by chunk as it arrives."""
    async for chunk in chain.astream({"drug": drug}):
        print(chunk, end="", flush=True)
# abatch: async version of batch — all run concurrently
async def get_all_drugs(drugs: list[str]) -> list[str]:
    """Run the chain once per name in ``drugs`` and return all results."""
    return await chain.abatch([{"drug": d} for d in drugs])
# Run multiple chains concurrently with asyncio.gather
async def parallel_queries():
    """Issue three ainvoke() calls concurrently and return their results.

    asyncio.gather returns the results in the order the awaitables were
    passed, regardless of which call finishes first.
    """
    results = await asyncio.gather(
        chain.ainvoke({"drug": "warfarin"}),
        chain.ainvoke({"drug": "metformin"}),
        chain.ainvoke({"drug": "aspirin"}),
    )
    return results
# Run in main
results = asyncio.run(parallel_queries())
Building a Custom Runnable
Two ways to create custom Runnables:
Method 1: RunnableLambda (quickest)
Python
from langchain_core.runnables import RunnableLambda
def validate_drug_query(inputs: dict) -> dict:
    """Validate and normalize drug query inputs.

    Args:
        inputs: Mapping expected to carry the drug name under ``"drug"``.

    Returns:
        A dict with ``"drug"`` (stripped and lower-cased) and
        ``"original_drug"`` (the value exactly as it was passed in).

    Raises:
        ValueError: If the drug name is missing, ``None``, or blank.
        TypeError: If the drug name is present but not a string.
    """
    raw_drug = inputs.get("drug")
    # Treat an explicit None like a missing key instead of crashing on .strip().
    if raw_drug is None:
        raise ValueError("Drug name cannot be empty")
    if not isinstance(raw_drug, str):
        raise TypeError(
            f"Drug name must be a string, got {type(raw_drug).__name__}"
        )
    drug = raw_drug.strip()
    if not drug:
        raise ValueError("Drug name cannot be empty")
    return {"drug": drug.lower(), "original_drug": raw_drug}
# Wrap the plain function in RunnableLambda so it can be composed with |
validator = RunnableLambda(validate_drug_query)
# Use in a chain: the validator's dict output becomes the input of `chain`
validated_chain = validator | chain
# Async lambda
async def async_lookup(drug: str) -> dict:
    """Return a simulated database record for ``drug``.

    Returns:
        ``{"drug_id": hash(drug), "drug_name": drug}``.
    """
    # Simulate async database lookup
    await asyncio.sleep(0.01)
    # NOTE: hash() of a str is only stable within one interpreter process,
    # so this placeholder id must not be persisted or compared across runs.
    return {"drug_id": hash(drug), "drug_name": drug}
async_runnable = RunnableLambda(async_lookup)
Method 2: Subclass Runnable (full control)
Python
from langchain_core.runnables import Runnable
from typing import Optional, Iterator
class DrugDatabaseLookup(Runnable):
    """Custom Runnable that queries a drug database.

    Input is a drug name (str); output is a dict with the keys ``"drug"``,
    ``"description"`` and ``"available"``.
    """

    def __init__(self, database_url: str):
        """Store the database URL; nothing is connected or queried here."""
        super().__init__()
        self.db_url = database_url

    def invoke(self, input: str, config: Optional[RunnableConfig] = None) -> dict:
        """Lookup drug info from database.

        Args:
            input: Drug name; it is lower-cased and stripped before use.
            config: Optional run config (unused by this placeholder lookup).

        Returns:
            A dict with ``"drug"``, ``"description"`` and ``"available"``.
        """
        # In production: query real database
        drug_name = input.lower().strip()
        return {
            "drug": drug_name,
            "description": f"Clinical information for {drug_name}",
            "available": True,
        }

    def stream(self, input: str, config=None) -> Iterator[dict]:
        """Streaming for this Runnable just yields the result at once."""
        yield self.invoke(input, config)

    async def ainvoke(self, input: str, config=None) -> dict:
        """Async database lookup."""
        # async database query here
        # Until a real async query exists this delegates to the synchronous
        # lookup, so no async HTTP/database client needs to be imported.
        return self.invoke(input, config)
# Use in chain
# The lookup takes a drug name and returns a dict; its "drug" and
# "description" keys fill the {drug} and {description} slots of the prompt.
db_lookup = DrugDatabaseLookup(database_url="postgresql://...")
enrichment_prompt = ChatPromptTemplate.from_template(
"Based on this drug data: {drug}\n{description}\n\nWhat should a clinician know?"
)
# Custom Runnable -> prompt -> model -> parser, composed like any built-in step
enriched_chain = db_lookup | enrichment_prompt | model | parser
result = enriched_chain.invoke("warfarin")
Runnable Modifiers
LangChain provides modifier methods that wrap any Runnable:
Python
# .with_retry() — re-run the chain when it raises
chain_with_retry = chain.with_retry(
    retry_if_exception_type=(Exception,),
    wait_exponential_jitter=True,  # Add randomness to backoff
    stop_after_attempt=3,
)

# .with_fallbacks() — try another chain when this one fails
cheap_chain = prompt | ChatOpenAI(model="gpt-4o-mini") | parser
chain_with_fallback = chain.with_fallbacks([cheap_chain])

# .with_config() — bake default config values into the chain
chain_for_prod = chain.with_config(
    tags=["production"],
    metadata={"environment": "prod"},
)

# .with_types() — declare the input/output types
typed_chain = chain.with_types(input_type=dict, output_type=str)

# .with_listeners() — hook callbacks onto the run lifecycle
chain_with_logging = chain.with_listeners(
    on_start=lambda run_obj: print(f"Started: {run_obj.id}"),
    on_end=lambda run_obj: print(f"Ended: {run_obj.id}"),
    on_error=lambda run_obj: print(f"Error: {run_obj.error}"),
)
# .configurable_fields() — make parameters configurable at runtime
from langchain_core.runnables import ConfigurableField
configurable_model = ChatOpenAI(model="gpt-4o", temperature=0).configurable_fields(
model_name=ConfigurableField(
id="model_name",
name="Model Name",
description="The OpenAI model to use",
)
)
# Change model at runtime without rebuilding chain
result = chain.invoke(
{"drug": "warfarin"},
config={"configurable": {"model_name": "gpt-4o-mini"}},
)The pipe() Method vs | Operator
The | operator and the .pipe() method are equivalent. Use .pipe() when chaining dynamically:
Python
# These three are identical:
chain_a = prompt | model | parser
chain_b = prompt.pipe(model).pipe(parser)
chain_c = prompt.pipe(model, parser)

# .pipe() is useful when building chains dynamically
steps = [prompt, model, parser]
chain = steps[0]
# Fold the remaining steps onto the chain one at a time, left to right.
for step in steps[1:]:
    chain = chain.pipe(step)