Agentic AI Patterns · Lesson 11 of 15
Pipeline Pattern: Output of One Feeds the Next
What Is the Pipeline Pattern?
In the pipeline pattern, agents are arranged in a linear sequence. Each agent receives the output of the previous agent, transforms it, and passes it to the next. The data flows in one direction: no cycles, no branching.
Input → [Agent 1] → [Agent 2] → [Agent 3] → Output
        Research     Analyze      Write

This is the simplest multi-agent topology. It maps naturally to workflows that have clear, ordered stages.
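The linear flow described above can be sketched in a few lines. This is a minimal illustration rather than the lesson's implementation: `run_pipeline` and the three stand-in stage functions are hypothetical names, and the stages return canned values instead of calling a model.

```python
import asyncio
from typing import Any, Awaitable, Callable

# A stage is any async callable that takes the previous output and returns the next.
Stage = Callable[[Any], Awaitable[Any]]


async def run_pipeline(stages: list[Stage], initial_input: Any) -> Any:
    """Feed each stage's output into the next, strictly in order."""
    data = initial_input
    for stage in stages:
        data = await stage(data)
    return data


# Hypothetical stand-ins for LLM-backed agents (canned values, no model calls).
async def research(topic: str) -> dict:
    return {"topic": topic, "facts": [f"{topic} is an NSAID"]}


async def analyze(research_out: dict) -> dict:
    return {**research_out, "risk_level": "low"}


async def write(analysis_out: dict) -> str:
    return f"{analysis_out['topic']} (risk: {analysis_out['risk_level']})"


result = asyncio.run(run_pipeline([research, analyze, write], "Ibuprofen"))
print(result)  # Ibuprofen (risk: low)
```

Because the runner only sees a list of callables, swapping a stage means replacing one entry in that list.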
When to Use Pipelines
Use when:
- The task has clear sequential stages (research → outline → write → review)
- Each stage requires different specialization or a different context
- You want to validate output at each step before continuing
- Stages can be independently tested and replaced
Avoid when:
- Stages can be parallelized (use a supervisor with parallel workers)
- Steps depend on each other in complex ways (use graph-based control flow like LangGraph)
Typed Interfaces Between Stages
The key to a robust pipeline is typed boundaries. Each stage has a defined input and output schema. If a stage fails to produce output that matches its schema, the pipeline stops with a clear error instead of silently passing garbage downstream.
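To make the failure mode concrete, here is a small sketch of a boundary rejecting a malformed payload. It assumes Pydantic v2; `StageResult` and `parse_stage_output` are hypothetical names used only for illustration, and the `Literal` and `Field(ge=..., le=...)` constraints are one possible way to tighten a schema, not something the lesson's own schemas do.

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError


class StageResult(BaseModel):
    """Hypothetical schema, for illustration only."""

    topic: str
    risk_level: Literal["low", "medium", "high"]
    confidence: float = Field(ge=0.0, le=1.0)


def parse_stage_output(raw_json: str) -> StageResult:
    """Validate raw model output at the stage boundary."""
    return StageResult.model_validate_json(raw_json)


# A well-formed payload passes through as a typed object.
good = parse_stage_output(
    '{"topic": "Ibuprofen", "risk_level": "low", "confidence": 0.9}'
)

# A malformed payload is rejected before any downstream stage can see it.
try:
    parse_stage_output(
        '{"topic": "Ibuprofen", "risk_level": "unknown", "confidence": 1.7}'
    )
    rejected = False
except ValidationError as exc:
    rejected = True
    bad_fields = sorted(err["loc"][0] for err in exc.errors())
    print("rejected fields:", bad_fields)
```

The error names every offending field, which is what makes the failure "clear" rather than a vague downstream symptom.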

# pharmabot/pipeline/schemas.py
from pydantic import BaseModel, Field


class ResearchOutput(BaseModel):
    topic: str
    key_facts: list[str]
    sources: list[str]
    gaps: list[str] = Field(default_factory=list)


class AnalysisOutput(BaseModel):
    topic: str
    risk_level: str  # "low" | "medium" | "high"
    key_findings: list[str]
    contraindications: list[str]
    confidence: float


class ContentOutput(BaseModel):
    title: str
    summary: str
    full_content: str
    citations: list[str]

Building the Pipeline
# pharmabot/pipeline/drug_content_pipeline.py
from typing import TypeVar

import structlog
from openai import AsyncAzureOpenAI
from pydantic import BaseModel

from pharmabot.pipeline.schemas import ResearchOutput, AnalysisOutput, ContentOutput

log = structlog.get_logger()

# Lets _call_stage return the same schema type it was asked to validate against.
T = TypeVar("T", bound=BaseModel)


class DrugContentPipeline:
    """
    3-stage pipeline for generating drug information content:
    1. Research: gather facts about the drug
    2. Analyze: assess clinical significance
    3. Write: produce patient-facing content
    """

    def __init__(self, client: AsyncAzureOpenAI):
        self.client = client

    async def _call_stage(
        self,
        stage_name: str,
        system_prompt: str,
        user_content: str,
        output_schema: type[T],
    ) -> T:
        """Generic stage runner with structured output."""
        log.info("pipeline_stage_start", stage=stage_name)
        response = await self.client.chat.completions.create(
            model="gpt-4o",  # with Azure OpenAI, this is the deployment name
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
        )
        raw = response.choices[0].message.content
        # Raises if the output does not match the schema, which stops the pipeline here.
        result = output_schema.model_validate_json(raw)
        log.info("pipeline_stage_complete", stage=stage_name)
        return result

    async def research(self, drug_name: str) -> ResearchOutput:
        return await self._call_stage(
            stage_name="research",
            system_prompt="""You are a pharmaceutical researcher.
            Extract factual information about the given drug.
            Return JSON matching ResearchOutput schema:
            {topic, key_facts (list), sources (list), gaps (list)}""",
            user_content=f"Research the drug: {drug_name}",
            output_schema=ResearchOutput,
        )

    async def analyze(self, research: ResearchOutput) -> AnalysisOutput:
        return await self._call_stage(
            stage_name="analyze",
            system_prompt="""You are a clinical safety analyst.
            Analyze the research findings for clinical significance.
            Return JSON matching AnalysisOutput schema:
            {topic, risk_level, key_findings (list), contraindications (list), confidence (0-1)}""",
            user_content=(
                f"Analyze this research:\n"
                f"Topic: {research.topic}\n"
                f"Facts: {research.key_facts}\n"
                f"Gaps: {research.gaps}"
            ),
            output_schema=AnalysisOutput,
        )

    async def write(
        self,
        research: ResearchOutput,
        analysis: AnalysisOutput,
    ) -> ContentOutput:
        return await self._call_stage(
            stage_name="write",
            system_prompt="""You are a medical content writer for a patient-facing app.
            Write clear, accurate, safe content about the drug.
            Always include safety warnings. Never give dosage without mentioning to consult a pharmacist.
            Return JSON matching ContentOutput schema:
            {title, summary, full_content, citations (list)}""",
            user_content=(
                f"Write patient-facing content about: {research.topic}\n\n"
                f"Key facts: {research.key_facts}\n"
                f"Risk level: {analysis.risk_level}\n"
                f"Key findings: {analysis.key_findings}\n"
                f"Contraindications: {analysis.contraindications}"
            ),
            output_schema=ContentOutput,
        )

    async def run(self, drug_name: str) -> ContentOutput:
        """Execute the full pipeline."""
        log.info("pipeline_start", drug=drug_name)

        # Stage 1
        research_result = await self.research(drug_name)

        # Stage 2: receives Stage 1 output
        analysis_result = await self.analyze(research_result)

        # Stage 3: receives Stage 1 and 2 outputs
        content_result = await self.write(research_result, analysis_result)

        log.info("pipeline_complete", drug=drug_name, title=content_result.title)
        return content_result

Error Propagation
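In `run`, an exception raised in any stage (for example, a validation error from `model_validate_json`) propagates to the caller, and the later stages never execute. The sketch below shows that fail-fast behavior in isolation using only the standard library. `PipelineError` and `run_stages` are hypothetical names, not part of the lesson's code, and the stages are stand-ins that do not call a model.

```python
import asyncio
from typing import Any, Awaitable, Callable


class PipelineError(Exception):
    """Hypothetical error type that records which stage failed."""

    def __init__(self, stage: str, cause: Exception):
        super().__init__(f"stage '{stage}' failed: {cause}")
        self.stage = stage
        self.cause = cause


async def run_stages(
    stages: list[tuple[str, Callable[[Any], Awaitable[Any]]]],
    initial_input: Any,
) -> Any:
    """Run named stages in order and stop at the first failure."""
    data = initial_input
    for name, stage in stages:
        try:
            data = await stage(data)
        except Exception as exc:
            # Later stages never see partial or invalid data.
            raise PipelineError(name, exc) from exc
    return data


calls: list[str] = []  # records which stand-in stages actually ran


async def research(drug: str) -> dict:
    calls.append("research")
    return {"topic": drug}


async def analyze(research_out: dict) -> dict:
    calls.append("analyze")
    raise ValueError("model returned malformed JSON")  # simulated failure


async def write(analysis_out: dict) -> str:
    calls.append("write")
    return "content"


try:
    asyncio.run(
        run_stages(
            [("research", research), ("analyze", analyze), ("write", write)],
            "Ibuprofen",
        )
    )
    failed_stage = None
except PipelineError as err:
    failed_stage = err.stage

print(failed_stage, calls)  # analyze ['research', 'analyze']
```

Carrying the stage name on the exception lets the caller decide between retrying, falling back, or giving up without parsing log messages.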
When a stage fails, the error should be caught and handled explicitly rather than letting garbage pass silently to the next stage:

    # Method on DrugContentPipeline
    async def run_safe(self, drug_name: str) -> ContentOutput | None:
        """Pipeline with explicit error handling at each stage."""
        try:
            research_result = await self.research(drug_name)
        except Exception as e:
            log.error("pipeline_stage_failed", stage="research", error=str(e))
            return None  # or raise a PipelineError

        try:
            analysis_result = await self.analyze(research_result)
        except Exception as e:
            log.error("pipeline_stage_failed", stage="analyze", error=str(e))
            # Fallback: skip analysis, write from research only.
            # write_from_research_only is not defined in this lesson; it is assumed
            # to be a method that builds a ContentOutput from the research alone.
            return await self.write_from_research_only(research_result)

        try:
            return await self.write(research_result, analysis_result)
        except Exception as e:
            log.error("pipeline_stage_failed", stage="write", error=str(e))
            return None

Testing Pipeline Stages Independently
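The unit test in this section takes a `mock_client` argument that the lesson does not define. One possible shape for it is sketched here using only `unittest.mock`. `make_mock_client` is a hypothetical helper, the canned payload is made-up test data, and the response object mimics only the attributes that `_call_stage` reads (`choices[0].message.content`).

```python
import asyncio
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock


def make_mock_client(payload: dict) -> MagicMock:
    """Build a stand-in client whose chat.completions.create returns `payload` as JSON."""
    response = SimpleNamespace(
        choices=[SimpleNamespace(message=SimpleNamespace(content=json.dumps(payload)))]
    )
    client = MagicMock()
    # AsyncMock makes the call awaitable, matching the async client's interface.
    client.chat.completions.create = AsyncMock(return_value=response)
    return client


# Made-up analysis payload shaped like AnalysisOutput.
canned_analysis = {
    "topic": "Ibuprofen",
    "risk_level": "medium",
    "key_findings": ["GI irritation risk"],
    "contraindications": ["active peptic ulcer"],
    "confidence": 0.8,
}

client = make_mock_client(canned_analysis)
response = asyncio.run(client.chat.completions.create(model="gpt-4o", messages=[]))
content = json.loads(response.choices[0].message.content)
print(content["risk_level"])  # medium
```

In a test suite this helper could be exposed as a fixture, for example a `conftest.py` function decorated with `@pytest.fixture` and named `mock_client` that returns `make_mock_client(canned_analysis)`.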
Because stages have typed interfaces, each can be unit-tested in isolation:

# tests/test_pipeline.py
import pytest

from pharmabot.pipeline.drug_content_pipeline import DrugContentPipeline
from pharmabot.pipeline.schemas import ResearchOutput


@pytest.mark.asyncio
async def test_analyze_stage(mock_client):
    """Test analyze stage in isolation with mock research data."""
    pipeline = DrugContentPipeline(client=mock_client)
    mock_research = ResearchOutput(
        topic="Ibuprofen",
        key_facts=["NSAID", "inhibits COX-1 and COX-2", "reduces inflammation"],
        sources=["FDA label"],
        gaps=["long-term safety in elderly"],
    )
    result = await pipeline.analyze(mock_research)
    assert result.risk_level in ("low", "medium", "high")
    assert isinstance(result.confidence, float)
    assert 0.0 <= result.confidence <= 1.0
    assert isinstance(result.key_findings, list)

Pipeline vs Single Long Prompt
Why not just put all three stages in one giant prompt?
| | Single Prompt | Pipeline |
|---|---|---|
| Context usage | One call, but huge context | Three focused calls |
| Intermediate validation | No checkpoints | Validate at each stage |
| Debugging | Hard to isolate failures | Clear which stage failed |
| Stage replacement | Rewrite everything | Swap one stage |
| Token efficiency | Lower (all context in one call) | Higher (each stage focused) |
| Latency | Lower (one call) | Higher (three serial calls) |
For simple tasks: use a single prompt. For complex multi-stage workflows where quality and debuggability matter: use the pipeline pattern.