Learnixo
AI Systems · Intermediate

Parallel Chains with RunnableParallel

Run multiple LangChain chains simultaneously with RunnableParallel. Reduce latency by parallelizing independent steps, merge outputs, and handle fan-out patterns.

Asma Hafeez Khan · May 16, 2026 · 5 min read
LangChain · Parallel · RunnableParallel · LCEL · Performance · Concurrency

Why Run Chains in Parallel?

When two chains are independent, meaning step B doesn't need the output of step A, running them one after another wastes time. Running them in parallel brings latency down to roughly the slowest individual step (plus a little scheduling overhead) instead of the sum of all steps.

Sequential: A (2s) → B (2s) → C (1s) = 5s total
Parallel:   A (2s) ─┐
                    ├→ merge → C (1s) = 3s total
            B (2s) ─┘
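The diagram's arithmetic, as a dependency-free sketch: treat a pipeline as stages that run one after another, where the steps inside one stage run concurrently. wall_time is a hypothetical helper, and it deliberately ignores scheduling overhead.

```python
def wall_time(stages: list[list[float]]) -> float:
    """Stages run one after another; steps inside a stage run concurrently.
    A stage lasts as long as its slowest step, so total = sum of stage maxima."""
    return sum(max(stage) for stage in stages)


sequential = wall_time([[2.0], [2.0], [1.0]])  # A, then B, then C
parallel = wall_time([[2.0, 2.0], [1.0]])      # A and B together, then C
print(sequential, parallel)  # 5.0 3.0
```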

RunnableParallel Basics

Python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

model = ChatOpenAI(model="gpt-4o", temperature=0)
parser = StrOutputParser()

# Three independent questions about the same drug
mechanism_chain = (
    ChatPromptTemplate.from_template("Explain the mechanism of {drug} in one sentence.")
    | model | parser
)

side_effects_chain = (
    ChatPromptTemplate.from_template("List the top 3 side effects of {drug}.")
    | model | parser
)

interactions_chain = (
    ChatPromptTemplate.from_template("What are the major drug interactions of {drug}?")
    | model | parser
)

# Run all three in parallel
parallel_chain = RunnableParallel(
    mechanism=mechanism_chain,
    side_effects=side_effects_chain,
    major_interactions=interactions_chain,
)

result = parallel_chain.invoke({"drug": "warfarin"})
# {
#   "mechanism": "Warfarin inhibits vitamin K...",
#   "side_effects": "1. Bleeding...",
#   "major_interactions": "NSAIDs, antibiotics..."
# }
# Latency ≈ max(mechanism, side_effects, major_interactions), not their sum

print(result["mechanism"])
print(result["side_effects"])
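RunnableParallel's synchronous invoke() dispatches each branch to a thread pool and collects the results under their keys. The standard-library sketch below imitates that behavior so the timing is visible without an API key; fake_chain and run_parallel are hypothetical stand-ins, and time.sleep plays the role of network latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_chain(name: str, seconds: float):
    """Hypothetical stand-in for an LLM chain: sleeps to simulate latency."""
    def run(inputs: dict) -> str:
        time.sleep(seconds)
        return f"{name} for {inputs['drug']}"
    return run


branches = {
    "mechanism": fake_chain("mechanism", 0.3),
    "side_effects": fake_chain("side effects", 0.3),
    "major_interactions": fake_chain("interactions", 0.2),
}


def run_parallel(branches: dict, inputs: dict) -> dict:
    # Submit every branch at once; each receives the same input dict.
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {key: pool.submit(fn, inputs) for key, fn in branches.items()}
        # Collect results under the same keys, mirroring RunnableParallel's output dict.
        return {key: fut.result() for key, fut in futures.items()}


start = time.perf_counter()
result = run_parallel(branches, {"drug": "warfarin"})
elapsed = time.perf_counter() - start
print(result["mechanism"])  # mechanism for warfarin
print(f"{elapsed:.2f}s")    # roughly the slowest branch (0.3s), not the 0.8s sum
```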

Dict Shorthand Syntax

LangChain provides a shorthand: when a plain dict of runnables is composed into a chain with the | operator, it is automatically coerced into a RunnableParallel:

Python
from langchain_core.runnables import RunnablePassthrough

# This is equivalent to RunnableParallel(...)
chain = (
    {"mechanism": mechanism_chain, "side_effects": side_effects_chain}
    | ChatPromptTemplate.from_template(
        "Drug summary based on:\nMechanism: {mechanism}\nSide effects: {side_effects}"
    )
    | model | parser
)

result = chain.invoke({"drug": "metformin"})
# Dict is automatically expanded into a RunnableParallel
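To make the coercion concrete, here is a rough stdlib-only model of it: a dict step becomes a function that feeds the same input to every value and returns a dict of results, which the next step then consumes. as_step and pipe are hypothetical helpers for illustration, not LangChain APIs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Mapping


def as_step(step: Any) -> Callable[[Any], Any]:
    """Hypothetical helper: turn a dict of callables into one parallel step."""
    if not isinstance(step, Mapping):
        return step  # already a plain callable step
    branches = dict(step)

    def run_parallel(value: Any) -> dict:
        # Every branch receives the same input; outputs are keyed by branch name.
        with ThreadPoolExecutor(max_workers=max(1, len(branches))) as pool:
            futures = {key: pool.submit(fn, value) for key, fn in branches.items()}
            return {key: fut.result() for key, fut in futures.items()}

    return run_parallel


def pipe(*steps: Any) -> Callable[[Any], Any]:
    """Hypothetical helper: compose steps left to right, coercing dicts on the way."""
    coerced = [as_step(s) for s in steps]

    def run(value: Any) -> Any:
        for step in coerced:
            value = step(value)
        return value

    return run


chain = pipe(
    {"upper": lambda d: d["drug"].upper(), "length": lambda d: len(d["drug"])},
    lambda d: f"{d['upper']} has {d['length']} letters",
)
print(chain({"drug": "metformin"}))  # METFORMIN has 9 letters
```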

Fan-Out Pattern: One Input, Multiple Outputs

Python
# Pattern: one query → multiple specialized analyses → merge → one response

# Fan out to specialists
drug_analyst_chain = (
    ChatPromptTemplate.from_template("Analyze {drug} pharmacodynamics.")
    | model | parser
)

safety_analyst_chain = (
    ChatPromptTemplate.from_template("Analyze {drug} safety profile and contraindications.")
    | model | parser
)

interaction_analyst_chain = (
    ChatPromptTemplate.from_template("Analyze {drug} drug-drug interactions.")
    | model | parser
)

# Fan-out (all three run simultaneously)
fan_out = RunnableParallel(
    pharmacodynamics=drug_analyst_chain,
    safety=safety_analyst_chain,
    interactions=interaction_analyst_chain,
)

# Fan-in: merge the parallel outputs into one final answer
merge_prompt = ChatPromptTemplate.from_template(
    """Compile this drug analysis into a clinical summary:

Pharmacodynamics: {pharmacodynamics}

Safety: {safety}

Interactions: {interactions}

Drug name: {drug}

Provide a 3-bullet clinical overview."""
)

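# Full fan-out + fan-in pipeline.
# RunnablePassthrough.assign keeps the input key ("drug", which merge_prompt needs)
# and adds the three analyses, which run in parallel just like fan_out.
full_pipeline = (
    RunnablePassthrough.assign(
        pharmacodynamics=drug_analyst_chain,
        safety=safety_analyst_chain,
        interactions=interaction_analyst_chain,
    )
    | merge_prompt
    | model | parser
)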

result = full_pipeline.invoke({"drug": "warfarin"})
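The reason for RunnablePassthrough.assign here is that merge_prompt needs {drug} as well as the three analyses, while a bare RunnableParallel returns only its own keys. A minimal asyncio sketch of that keep-the-input-and-add-keys behavior, assuming simulated analysts (assign, make_analyst and pipeline are hypothetical names, not LangChain APIs):

```python
import asyncio


async def assign(inputs: dict, **branches) -> dict:
    """Hypothetical stand-in for RunnablePassthrough.assign: run the branches
    concurrently on the same input and merge their outputs into a copy of it."""
    keys = list(branches)
    outputs = await asyncio.gather(*(branches[k](inputs) for k in keys))
    return {**inputs, **dict(zip(keys, outputs))}


def make_analyst(topic: str, delay: float):
    async def run(inputs: dict) -> str:
        await asyncio.sleep(delay)  # simulated LLM latency
        return f"{topic} of {inputs['drug']}"
    return run


async def pipeline(inputs: dict) -> str:
    # Fan-out: three analyses run concurrently; "drug" is kept alongside them.
    state = await assign(
        inputs,
        pharmacodynamics=make_analyst("pharmacodynamics", 0.1),
        safety=make_analyst("safety profile", 0.1),
        interactions=make_analyst("interactions", 0.1),
    )
    # Fan-in: one step that needs every key, including the original "drug".
    return (f"{state['drug']}: {state['pharmacodynamics']}; "
            f"{state['safety']}; {state['interactions']}")


print(asyncio.run(pipeline({"drug": "warfarin"})))
```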

Parallel with RunnablePassthrough

Combine RunnablePassthrough with RunnableParallel to pass the original input alongside parallel outputs:

Python
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# Pass original query through while also running analyses
chain = (
    RunnableParallel(
        # Keep the original input
        original=RunnablePassthrough(),
        # Run analyses in parallel
        mechanism=mechanism_chain,
        side_effects=side_effects_chain,
    )
    # Now have: original, mechanism, side_effects
    | RunnableLambda(lambda d: {
        "drug": d["original"]["drug"],
        "summary": f"Mechanism: {d['mechanism']}\nSide effects: {d['side_effects']}",
    })
)

result = chain.invoke({"drug": "warfarin"})
print(result["drug"])     # "warfarin" — carried through
print(result["summary"])  # Combined output

Async Parallel Execution

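In async code such as a web server handler, call ainvoke instead of invoke. RunnableParallel then awaits all branches concurrently on the event loop instead of blocking a worker thread while the requests are in flight: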

Python
import asyncio
from langchain_core.runnables import RunnableParallel

async def analyze_drug_async(drug: str) -> dict:
    """Analyze drug with multiple concurrent LLM calls."""
    parallel_chain = RunnableParallel(
        mechanism=mechanism_chain,
        safety=safety_analyst_chain,
        interactions=interaction_analyst_chain,
    )
    return await parallel_chain.ainvoke({"drug": drug})

# Or use asyncio.gather for multiple drugs
async def analyze_multiple_drugs(drugs: list[str]) -> list[dict]:
    tasks = [analyze_drug_async(drug) for drug in drugs]
    return await asyncio.gather(*tasks)

results = asyncio.run(analyze_multiple_drugs(["warfarin", "metformin", "aspirin"]))
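The nesting matters for load: three drugs times three branches means nine requests in flight at once. A stdlib sketch with asyncio.sleep standing in for the LLM calls (fake_branch, analyze_drug and analyze_many are hypothetical) shows the wall time staying near a single call's latency:

```python
import asyncio
import time


async def fake_branch(name: str, drug: str, delay: float = 0.2) -> str:
    await asyncio.sleep(delay)  # simulated LLM call
    return f"{name}:{drug}"


async def analyze_drug(drug: str) -> dict:
    names = ["mechanism", "safety", "interactions"]
    # Inner fan-out: three branches per drug.
    outputs = await asyncio.gather(*(fake_branch(n, drug) for n in names))
    return dict(zip(names, outputs))


async def analyze_many(drugs: list[str]) -> list[dict]:
    # Outer fan-out: one analyze_drug per drug; gather keeps input order.
    return await asyncio.gather(*(analyze_drug(d) for d in drugs))


start = time.perf_counter()
results = asyncio.run(analyze_many(["warfarin", "metformin", "aspirin"]))
elapsed = time.perf_counter() - start
# Nine simulated 0.2s calls wait concurrently, so elapsed stays far below 9 x 0.2s.
print(results[0]["safety"])  # safety:warfarin
```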

Batch + Parallel: Processing Drug Lists

Python
from langchain_core.runnables import RunnableConfig

drugs = ["warfarin", "metformin", "aspirin", "lisinopril", "atorvastatin"]

# Option 1: .batch() — processes list with configurable concurrency
results = parallel_chain.batch(
    [{"drug": d} for d in drugs],
    config=RunnableConfig(max_concurrency=5),
)

# Option 2: asyncio.gather starts every request at once (no concurrency cap)
async def process_all():
    tasks = [parallel_chain.ainvoke({"drug": d}) for d in drugs]
    return await asyncio.gather(*tasks)

results = asyncio.run(process_all())
for drug, result in zip(drugs, results):
    print(f"{drug}: {result['mechanism'][:80]}")
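Option 2 places no cap on concurrency, which can collide with provider rate limits as the drug list grows. One common way to bound it is an asyncio.Semaphore; the sketch below assumes a hypothetical fake_call as the stand-in for parallel_chain.ainvoke. Within LangChain itself, .abatch() also accepts max_concurrency in its config.

```python
import asyncio


async def bounded_gather(coros, limit: int) -> list:
    """Await coroutines with at most `limit` running at once; results keep input order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with semaphore:  # waits here while `limit` calls are already in flight
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))


stats = {"active": 0, "peak": 0}


async def fake_call(drug: str) -> str:
    """Hypothetical stand-in for parallel_chain.ainvoke; records how many calls overlap."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.05)  # simulated LLM latency
    stats["active"] -= 1
    return drug.upper()


drugs = ["warfarin", "metformin", "aspirin", "lisinopril", "atorvastatin"]
results = asyncio.run(bounded_gather([fake_call(d) for d in drugs], limit=2))
print(results)        # ['WARFARIN', 'METFORMIN', 'ASPIRIN', 'LISINOPRIL', 'ATORVASTATIN']
print(stats["peak"])  # 2
```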

Error Handling in Parallel Chains

Python
from langchain_core.runnables import RunnableLambda

def safe_chain_runner(chain):
    """Wrap a chain so a failure returns an error string instead of raising."""
    def run_safely(inputs):
        try:
            return chain.invoke(inputs)
        except Exception as e:
            return f"Error: {str(e)}"
    return RunnableLambda(run_safely)


# Wrap each chain with error handling
robust_parallel = RunnableParallel(
    mechanism=safe_chain_runner(mechanism_chain),
    side_effects=safe_chain_runner(side_effects_chain),
    interactions=safe_chain_runner(interaction_analyst_chain),
)

result = robust_parallel.invoke({"drug": "unknown_drug_xyz"})
# Even if some chains fail, others succeed
# {"mechanism": "Error: ...", "side_effects": "...", "interactions": "..."}
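For async code, asyncio.gather can give the same per-branch isolation without wrapping each chain: with return_exceptions=True a failing branch comes back as an exception object instead of propagating out of gather, so the other branches' results are kept. A stdlib sketch with hypothetical ok_branch and failing_branch coroutines:

```python
import asyncio


async def ok_branch(inputs: dict) -> str:
    await asyncio.sleep(0.01)  # simulated LLM call
    return f"ok for {inputs['drug']}"


async def failing_branch(inputs: dict) -> str:
    await asyncio.sleep(0.01)
    raise ValueError(f"no data for {inputs['drug']}")


async def run_isolated(branches: dict, inputs: dict) -> dict:
    keys = list(branches)
    # return_exceptions=True: exceptions are returned in place of results.
    outputs = await asyncio.gather(
        *(branches[k](inputs) for k in keys), return_exceptions=True
    )
    return {
        k: f"Error: {out}" if isinstance(out, BaseException) else out
        for k, out in zip(keys, outputs)
    }


result = asyncio.run(run_isolated(
    {"mechanism": ok_branch, "interactions": failing_branch},
    {"drug": "unknown_drug_xyz"},
))
print(result)
```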

Measuring Parallel Speedup

Python
import time

drug_query = {"drug": "warfarin"}

# Measure sequential
t0 = time.perf_counter()  # monotonic, high-resolution clock for timing intervals
seq_result = {
    "mechanism": mechanism_chain.invoke(drug_query),
    "side_effects": side_effects_chain.invoke(drug_query),
    "major_interactions": interactions_chain.invoke(drug_query),  # same three chains as parallel_chain
}
sequential_time = time.perf_counter() - t0

# Measure parallel
t1 = time.perf_counter()
par_result = parallel_chain.invoke(drug_query)
parallel_time = time.perf_counter() - t1

print(f"Sequential: {sequential_time:.2f}s")
print(f"Parallel:   {parallel_time:.2f}s")
print(f"Speedup:    {sequential_time/parallel_time:.1f}x")
# Ideal: ~3x for 3 chains of equal duration
# Upper bound = total_sequential / slowest_individual_chain; overhead and run-to-run LLM latency variance lower it
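The upper-bound formula from the comments, as a small function (ideal_speedup is a hypothetical helper that ignores overhead). It shows why 3x only holds when the three chains take equally long:

```python
def ideal_speedup(step_times: list[float]) -> float:
    """Best-case speedup for independent steps: sequential sum over the slowest step."""
    return sum(step_times) / max(step_times)


print(ideal_speedup([2.0, 2.0, 2.0]))  # 3.0
print(ideal_speedup([2.0, 1.0, 0.5]))  # 1.75
```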

When Not to Parallelize

Parallelism is NOT appropriate when:

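  1. Step B needs step A's output: run the steps in sequence instead, e.g. with successive RunnablePassthrough.assign calls
  2. Rate limits are tight: parallel requests hit the provider's rate limit at the same moment
  3. Peak load matters: parallelism doesn't reduce the number of calls or tokens you pay for, it concentrates them, so spend and quota usage spike together
  4. One chain is much faster: pairing a 10 ms chain with a 2 s chain saves at most 10 ms, which the parallelization overhead can cancel out

Python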
# Rule of thumb: parallelize only if
# n_steps × average_step_time > max_step_time × overhead_factor (≈ 1.2)
# i.e. running the steps in turn must take longer than the slowest step plus ~20% overhead
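The rule of thumb above, written as a hypothetical should_parallelize helper. Note that n_steps × average_step_time is simply the sum of the step times. The 1.2 overhead factor is the article's heuristic, not a measured constant, so calibrate it against your own timings:

```python
def should_parallelize(step_times: list[float], overhead_factor: float = 1.2) -> bool:
    """Parallelize only if the sequential time (sum of steps) beats the
    parallel time estimate (slowest step x overhead_factor)."""
    return sum(step_times) > max(step_times) * overhead_factor


print(should_parallelize([2.0, 2.0, 2.0]))  # True: 6.0 > 2.4
print(should_parallelize([2.0, 0.01]))      # False: 2.01 < 2.4
```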
