LangChain Mastery · Lesson 8 of 33
Parallel Chains with RunnableParallel
Why Run Chains in Parallel?
When two chains are independent — meaning step B doesn't need the output of step A — running them sequentially wastes time. Parallel execution cuts latency to the maximum of individual times rather than their sum.
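The max-versus-sum claim can be checked without any LLM calls. The following is a minimal sketch, assuming a hypothetical fake_step function that sleeps in place of a real chain and a plain thread pool in place of RunnableParallel; the durations are scaled down so it runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_step(name: str, seconds: float) -> str:
    """Hypothetical stand-in for a chain call: sleep, then return a label."""
    time.sleep(seconds)
    return f"{name} done"


def run_sequential(steps: dict[str, float]) -> tuple[dict[str, str], float]:
    """Run the steps one after another; elapsed time is roughly their sum."""
    start = time.perf_counter()
    results = {name: fake_step(name, secs) for name, secs in steps.items()}
    return results, time.perf_counter() - start


def run_parallel(steps: dict[str, float]) -> tuple[dict[str, str], float]:
    """Run the steps in a thread pool; elapsed time is roughly the slowest step."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(steps)) as pool:
        futures = {
            name: pool.submit(fake_step, name, secs) for name, secs in steps.items()
        }
        results = {name: future.result() for name, future in futures.items()}
    return results, time.perf_counter() - start


if __name__ == "__main__":
    steps = {"A": 0.2, "B": 0.2}  # scaled-down stand-ins for two independent steps
    _, sequential_time = run_sequential(steps)
    _, parallel_time = run_parallel(steps)
    print(f"sequential: {sequential_time:.2f}s, parallel: {parallel_time:.2f}s")
```

Both functions return the same results dict; only the wall time differs.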
Sequential: A (2s) → B (2s) → C (1s) = 5s total
Parallel:   A (2s) ─┐
            B (2s) ─┼→ merge → C (1s) = 3s total
          (skipped) ┘
RunnableParallel Basics
Python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
model = ChatOpenAI(model="gpt-4o", temperature=0)
parser = StrOutputParser()
# Three independent questions about the same drug
mechanism_chain = (
    ChatPromptTemplate.from_template("Explain the mechanism of {drug} in one sentence.")
    | model | parser
)
side_effects_chain = (
    ChatPromptTemplate.from_template("List the top 3 side effects of {drug}.")
    | model | parser
)
interactions_chain = (
    ChatPromptTemplate.from_template("What are the major drug interactions of {drug}?")
    | model | parser
)
# Run all three in parallel
parallel_chain = RunnableParallel(
    mechanism=mechanism_chain,
    side_effects=side_effects_chain,
    major_interactions=interactions_chain,
)
result = parallel_chain.invoke({"drug": "warfarin"})
# {
# "mechanism": "Warfarin inhibits vitamin K...",
# "side_effects": "1. Bleeding...",
# "major_interactions": "NSAIDs, antibiotics..."
# }
# Latency ≈ max(mechanism, side_effects, major_interactions), not their sum
print(result["mechanism"])
print(result["side_effects"])
Dict Shorthand Syntax
LangChain provides a shorthand: when a plain dict of runnables is composed with another runnable using the | operator, it is automatically coerced to a RunnableParallel:
Python
from langchain_core.runnables import RunnablePassthrough
# This is equivalent to RunnableParallel(...)
chain = (
    {"mechanism": mechanism_chain, "side_effects": side_effects_chain}
    | ChatPromptTemplate.from_template(
        "Drug summary based on:\nMechanism: {mechanism}\nSide effects: {side_effects}"
    )
    | model | parser
)
result = chain.invoke({"drug": "metformin"})
# The dict is automatically coerced into a RunnableParallel
Fan-Out Pattern: One Input, Multiple Outputs
Python
# Pattern: one query → multiple specialized analyses → merge → one response
query = "warfarin"
# Fan out to specialists
drug_analyst_chain = (
    ChatPromptTemplate.from_template("Analyze {drug} pharmacodynamics.")
    | model | parser
)
safety_analyst_chain = (
    ChatPromptTemplate.from_template("Analyze {drug} safety profile and contraindications.")
    | model | parser
)
interaction_analyst_chain = (
    ChatPromptTemplate.from_template("Analyze {drug} drug-drug interactions.")
    | model | parser
)
# Fan-out (all three run simultaneously)
fan_out = RunnableParallel(
    pharmacodynamics=drug_analyst_chain,
    safety=safety_analyst_chain,
    interactions=interaction_analyst_chain,
)
# Fan-in: merge the parallel outputs into one final answer
merge_prompt = ChatPromptTemplate.from_template(
"""Compile this drug analysis into a clinical summary:
Pharmacodynamics: {pharmacodynamics}
Safety: {safety}
Interactions: {interactions}
Drug name: {drug}
Provide a 3-bullet clinical overview."""
)
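# Full fan-out + fan-in pipeline
full_pipeline = (
    # assign() keeps the input keys (drug) and adds pharmacodynamics, safety,
    # and interactions; the three branches run concurrently, as in fan_out
    RunnablePassthrough.assign(
        pharmacodynamics=drug_analyst_chain,
        safety=safety_analyst_chain,
        interactions=interaction_analyst_chain,
    )
    | merge_prompt
    | model | parser
)
result = full_pipeline.invoke({"drug": "warfarin"})
Parallel with RunnablePassthrough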
Combine RunnablePassthrough with RunnableParallel to pass the original input alongside parallel outputs:
Python
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough
# Pass original query through while also running analyses
chain = (
    RunnableParallel(
        # Keep the original input
        original=RunnablePassthrough(),
        # Run analyses in parallel
        mechanism=mechanism_chain,
        side_effects=side_effects_chain,
    )
    # Now have: original, mechanism, side_effects
    | RunnableLambda(lambda d: {
        "drug": d["original"]["drug"],
        "summary": f"Mechanism: {d['mechanism']}\nSide effects: {d['side_effects']}",
    })
)
result = chain.invoke({"drug": "warfarin"})
print(result["drug"])     # "warfarin", carried through
print(result["summary"])  # Combined output
Async Parallel Execution
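For web server use cases, async parallel execution is usually the better fit: ainvoke runs the branches as concurrent coroutines on the event loop, whereas invoke runs them in a thread pool: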
Python
import asyncio
from langchain_core.runnables import RunnableParallel
async def analyze_drug_async(drug: str) -> dict:
    """Analyze drug with multiple concurrent LLM calls."""
    parallel_chain = RunnableParallel(
        mechanism=mechanism_chain,
        safety=safety_analyst_chain,
        interactions=interaction_analyst_chain,
    )
    return await parallel_chain.ainvoke({"drug": drug})
# Or use asyncio.gather for multiple drugs
async def analyze_multiple_drugs(drugs: list[str]) -> list[dict]:
    tasks = [analyze_drug_async(drug) for drug in drugs]
    return await asyncio.gather(*tasks)
results = asyncio.run(analyze_multiple_drugs(["warfarin", "metformin", "aspirin"]))
Batch + Parallel: Processing Drug Lists
Python
import asyncio
from langchain_core.runnables import RunnableConfig
drugs = ["warfarin", "metformin", "aspirin", "lisinopril", "atorvastatin"]
# Option 1: .batch() processes the list with configurable concurrency
results = parallel_chain.batch(
    [{"drug": d} for d in drugs],
    config=RunnableConfig(max_concurrency=5),
)
# Option 2: asyncio.gather for full async concurrency
async def process_all():
    tasks = [parallel_chain.ainvoke({"drug": d}) for d in drugs]
    return await asyncio.gather(*tasks)
results = asyncio.run(process_all())
for drug, result in zip(drugs, results):
    print(f"{drug}: {result['mechanism'][:80]}")
Error Handling in Parallel Chains
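With asyncio.gather, as used in the previous section, one failing call raises and the caller never receives the other results. A minimal sketch of keeping per-call outcomes with return_exceptions=True follows; fake_chain is a hypothetical coroutine standing in for chain.ainvoke so the example runs without an API key, and gather_safely is an illustrative helper name.

```python
import asyncio


async def fake_chain(name: str, fail: bool = False) -> str:
    """Hypothetical stand-in for chain.ainvoke(...)."""
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} ok"


async def gather_safely(calls: dict) -> dict:
    """Await all calls concurrently and turn exceptions into error strings."""
    outcomes = await asyncio.gather(*calls.values(), return_exceptions=True)
    return {
        key: f"Error: {outcome}" if isinstance(outcome, Exception) else outcome
        for key, outcome in zip(calls, outcomes)
    }


async def main() -> dict:
    return await gather_safely({
        "mechanism": fake_chain("mechanism", fail=True),  # simulated failure
        "side_effects": fake_chain("side_effects"),
    })


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The failing branch is reported under its own key while the successful branch keeps its result, which is the same error-to-string convention used by the synchronous wrapper in this section.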
Python
from langchain_core.runnables import RunnableLambda
def safe_chain_runner(chain):
    """Wrap a chain so a failure returns an error string instead of raising."""
    def run_safely(inputs):
        try:
            return chain.invoke(inputs)
        except Exception as e:
            return f"Error: {e}"
    return RunnableLambda(run_safely)
# Wrap each chain with error handling
robust_parallel = RunnableParallel(
    mechanism=safe_chain_runner(mechanism_chain),
    side_effects=safe_chain_runner(side_effects_chain),
    interactions=safe_chain_runner(interaction_analyst_chain),
)
result = robust_parallel.invoke({"drug": "unknown_drug_xyz"})
# If one branch raises (for example a timeout or API error), the others still return:
# {"mechanism": "Error: ...", "side_effects": "...", "interactions": "..."}
Measuring Parallel Speedup
Python
import time
drug_query = {"drug": "warfarin"}
# Measure sequential: the same three chains that parallel_chain runs
t0 = time.perf_counter()
seq_result = {
    "mechanism": mechanism_chain.invoke(drug_query),
    "side_effects": side_effects_chain.invoke(drug_query),
    "major_interactions": interactions_chain.invoke(drug_query),
}
sequential_time = time.perf_counter() - t0
# Measure parallel
t1 = time.perf_counter()
par_result = parallel_chain.invoke(drug_query)
parallel_time = time.perf_counter() - t1
print(f"Sequential: {sequential_time:.2f}s")
print(f"Parallel: {parallel_time:.2f}s")
print(f"Speedup: {sequential_time/parallel_time:.1f}x")
# Best case: about 3x speedup for 3 equal-time chains
# Expected speedup ≈ total_sequential / max_individual_chain (LLM latency varies between runs)
When Not to Parallelize
Parallelism is NOT appropriate when:
- Step B needs step A's output: chain the steps sequentially, for example with RunnablePassthrough.assign
- Rate limits are tight: parallel requests all count against the limit at the same moment
- Cost is critical: parallelism does not reduce the number of API calls, it only concentrates them into bursts
- One chain is much faster: running a 10ms chain in parallel with a 2s chain saves almost nothing
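For the rate-limit case, a common middle ground is to cap how many requests are in flight rather than give up concurrency entirely. The following is a minimal sketch using asyncio.Semaphore; run_bounded, fake_request, and PeakTracker are hypothetical names, with fake_request standing in for a real LLM call and PeakTracker only there to make the cap observable.

```python
import asyncio
from functools import partial


class PeakTracker:
    """Records how many fake requests were in flight at the same time."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0


async def fake_request(tracker: PeakTracker, index: int) -> int:
    """Hypothetical stand-in for one LLM call."""
    tracker.current += 1
    tracker.peak = max(tracker.peak, tracker.current)
    await asyncio.sleep(0.01)
    tracker.current -= 1
    return index


async def run_bounded(factories, limit: int) -> list:
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(factory):
        async with semaphore:
            return await factory()

    # gather preserves input order in its result list
    return await asyncio.gather(*(guarded(factory) for factory in factories))


if __name__ == "__main__":
    tracker = PeakTracker()
    factories = [partial(fake_request, tracker, i) for i in range(6)]
    results = asyncio.run(run_bounded(factories, limit=2))
    print(results, "peak in flight:", tracker.peak)
```

With LangChain runnables, the max_concurrency setting shown earlier with .batch() serves the same purpose.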
Python
# Rule of thumb: parallelize only if
#   n_steps × average_step_time > max_step_time × overhead_factor   (overhead_factor ≈ 1.2)
# i.e. only when the sequential total exceeds the slowest single step by more than 20%
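The rule of thumb can be written as a small helper. This is a sketch of the heuristic as stated, not a LangChain API: should_parallelize and estimated_speedup are illustrative names, and the step times are assumed to have been measured beforehand.

```python
def should_parallelize(step_times: list[float], overhead_factor: float = 1.2) -> bool:
    """Return True when the sequential total beats the padded parallel estimate.

    n_steps × average_step_time equals sum(step_times), so the rule reduces to
    sum(step_times) > max(step_times) × overhead_factor.
    """
    if not step_times:
        raise ValueError("step_times must not be empty")
    sequential_total = sum(step_times)
    parallel_estimate = max(step_times) * overhead_factor
    return sequential_total > parallel_estimate


def estimated_speedup(step_times: list[float]) -> float:
    """Best-case speedup: sequential total divided by the slowest step."""
    if not step_times:
        raise ValueError("step_times must not be empty")
    return sum(step_times) / max(step_times)


if __name__ == "__main__":
    print(should_parallelize([2.0, 2.0, 1.0]))  # three comparable steps
    print(should_parallelize([0.01, 2.0]))      # one step dominates
    print(estimated_speedup([2.0, 2.0, 2.0]))
```

The second call covers the 10ms-versus-2s case from the list above: the fast step contributes too little to clear the 1.2 overhead factor.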