AI Systems · Intermediate

Pipeline Multi-Agent Pattern

A linear chain where each agent processes the output of the previous one. Build typed, fault-tolerant pipelines with Pydantic interfaces between stages.

Asma Hafeez Khan · May 16, 2026 · 5 min read
Tags: AI Agents, Multi-Agent, Pipeline Pattern, Agentic AI

What Is the Pipeline Pattern?

In the pipeline pattern, agents are arranged in a linear sequence. Each agent receives the output of the previous agent, transforms it, and passes it to the next. The data flows in one direction: no cycles, no branching.

Input → [Agent 1] → [Agent 2] → [Agent 3] → Output
          Research    Analyze     Write

This is the simplest multi-agent topology. It maps naturally to workflows that have clear, ordered stages.
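
As a minimal sketch of this topology, the chain can be modeled as a list of callables applied in order. The stage functions below are hypothetical placeholders with hard-coded return values, standing in for real LLM-backed agents, and `run_pipeline` is an illustrative name rather than part of any library.

```python
from typing import Any, Callable, Sequence

Stage = Callable[[Any], Any]


def run_pipeline(stages: Sequence[Stage], payload: Any) -> Any:
    """Feed the payload through each stage in order: no cycles, no branching."""
    for stage in stages:
        payload = stage(payload)
    return payload


# Placeholder stages (assumptions for illustration only, not real agents).
def research(topic: str) -> dict:
    return {"topic": topic, "facts": [f"{topic} is an NSAID"]}


def analyze(research_out: dict) -> dict:
    return {**research_out, "risk_level": "low"}


def write(analysis_out: dict) -> str:
    return (
        f"{analysis_out['topic']}: {len(analysis_out['facts'])} fact(s), "
        f"risk level {analysis_out['risk_level']}"
    )


result = run_pipeline([research, analyze, write], "Ibuprofen")
print(result)  # Ibuprofen: 1 fact(s), risk level low
```

Each stage only sees what the previous one returned, which is what makes the stages easy to test and swap individually.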


When to Use Pipelines

Use when:

  • The task has clear sequential stages (research → outline → write → review)
  • Each stage requires different specialization or a different context
  • You want to validate output at each step before continuing
  • Stages can be independently tested and replaced

Avoid when:

  • Stages are independent and can run in parallel (use a supervisor with parallel workers)
  • Steps need loops, conditional branching, or jumps back to an earlier stage (use graph-based control flow such as LangGraph)
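
To make the first "avoid" case concrete, here is a small sketch using only `asyncio`. `fake_stage` is a hypothetical stand-in for an LLM call. When two calls do not consume each other's output, awaiting them one after another adds latency that `asyncio.gather` avoids.

```python
import asyncio

events: list[str] = []


async def fake_stage(name: str) -> str:
    """Stand-in for an LLM call; records when it starts and finishes."""
    events.append(f"start:{name}")
    await asyncio.sleep(0.01)
    events.append(f"end:{name}")
    return name.upper()


async def run_serial() -> list[str]:
    # Pipeline style: the second call waits for the first to finish.
    return [await fake_stage("a"), await fake_stage("b")]


async def run_parallel() -> list[str]:
    # Independent work: both calls are in flight at the same time.
    return list(await asyncio.gather(fake_stage("a"), fake_stage("b")))


serial_result = asyncio.run(run_serial())
serial_events = events.copy()

events.clear()
parallel_result = asyncio.run(run_parallel())
parallel_events = events.copy()

print(serial_events)    # "a" finishes before "b" starts
print(parallel_events)  # both start before either finishes
```

A pipeline is the right fit only when the second call genuinely needs the first call's result.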

Typed Interfaces Between Stages

The key to a robust pipeline is typed boundaries. Each stage has a defined input and output schema. If a stage fails to produce the expected output, the pipeline stops with a clear error instead of silently propagating garbage downstream.

Python
# pharmabot/pipeline/schemas.py
from pydantic import BaseModel, Field

class ResearchOutput(BaseModel):
    topic: str
    key_facts: list[str]
    sources: list[str]
    gaps: list[str] = Field(default_factory=list)

class AnalysisOutput(BaseModel):
    topic: str
    risk_level: str  # "low" | "medium" | "high"
    key_findings: list[str]
    contraindications: list[str]
    confidence: float

class ContentOutput(BaseModel):
    title: str
    summary: str
    full_content: str
    citations: list[str]
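
The schemas above check field names and basic types. As a possible tightening (an assumption, not the article's code), the sketch below constrains `risk_level` with `Literal` and bounds `confidence` with `Field(ge=..., le=...)`, so out-of-range values are rejected at the boundary. `StrictAnalysisOutput` and `validate_analysis` are hypothetical names, and the code assumes Pydantic v2.

```python
from typing import Literal, Optional

from pydantic import BaseModel, Field, ValidationError


class StrictAnalysisOutput(BaseModel):
    topic: str
    risk_level: Literal["low", "medium", "high"]  # enforced, not just documented
    key_findings: list[str]
    contraindications: list[str]
    confidence: float = Field(ge=0.0, le=1.0)


def validate_analysis(raw_json: str) -> tuple[Optional[StrictAnalysisOutput], list[str]]:
    """Return (model, []) on success, or (None, names of the offending fields)."""
    try:
        return StrictAnalysisOutput.model_validate_json(raw_json), []
    except ValidationError as exc:
        problems = [".".join(str(part) for part in err["loc"]) for err in exc.errors()]
        return None, problems


good = validate_analysis(
    '{"topic": "Ibuprofen", "risk_level": "low", "key_findings": [], '
    '"contraindications": [], "confidence": 0.9}'
)
bad = validate_analysis(
    '{"topic": "Ibuprofen", "risk_level": "severe", "key_findings": [], '
    '"contraindications": [], "confidence": 1.7}'
)
print(good[1])  # no problems reported
print(bad[1])   # names the two fields that failed validation
```

With constraints like these, a downstream stage never has to defend against a risk level or confidence value outside the documented range.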

Building the Pipeline

Python
# pharmabot/pipeline/drug_content_pipeline.py
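from typing import TypeVar

import structlog
from openai import AsyncAzureOpenAI
from pydantic import BaseModel, ValidationError

from pharmabot.pipeline.schemas import ResearchOutput, AnalysisOutput, ContentOutput

log = structlog.get_logger()

# Lets _call_stage return the same schema type it was asked to validate against.
SchemaT = TypeVar("SchemaT", bound=BaseModel)

class DrugContentPipeline:
    """
    3-stage pipeline for generating drug information content:
    1. Research: gather facts about the drug
    2. Analyze: assess clinical significance
    3. Write: produce patient-facing content
    """

    def __init__(self, client: AsyncAzureOpenAI):
        self.client = client

    async def _call_stage(
        self,
        stage_name: str,
        system_prompt: str,
        user_content: str,
        output_schema: type[SchemaT],
    ) -> SchemaT:
        """Generic stage runner: one LLM call, validated against output_schema."""
        log.info("pipeline_stage_start", stage=stage_name)

        response = await self.client.chat.completions.create(
            model="gpt-4o",  # on Azure OpenAI this is the deployment name
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            # JSON mode asks for valid JSON; it does not enforce the schema,
            # so the Pydantic validation below still matters.
            response_format={"type": "json_object"},
            temperature=0.2,
        )

        raw = response.choices[0].message.content
        if raw is None:
            raise ValueError(f"Stage '{stage_name}' returned no content")

        try:
            result = output_schema.model_validate_json(raw)
        except ValidationError:
            log.error("pipeline_stage_invalid_output", stage=stage_name)
            raise

        log.info("pipeline_stage_complete", stage=stage_name)
        return result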

    async def research(self, drug_name: str) -> ResearchOutput:
        return await self._call_stage(
            stage_name="research",
            system_prompt="""You are a pharmaceutical researcher.
            Extract factual information about the given drug.
            Return JSON matching ResearchOutput schema:
            {topic, key_facts (list), sources (list), gaps (list)}""",
            user_content=f"Research the drug: {drug_name}",
            output_schema=ResearchOutput,
        )

    async def analyze(self, research: ResearchOutput) -> AnalysisOutput:
        return await self._call_stage(
            stage_name="analyze",
            system_prompt="""You are a clinical safety analyst.
            Analyze the research findings for clinical significance.
            Return JSON matching AnalysisOutput schema:
            {topic, risk_level, key_findings (list), contraindications (list), confidence (0-1)}""",
            user_content=(
                f"Analyze this research:\n"
                f"Topic: {research.topic}\n"
                f"Facts: {research.key_facts}\n"
                f"Gaps: {research.gaps}"
            ),
            output_schema=AnalysisOutput,
        )

    async def write(
        self,
        research: ResearchOutput,
        analysis: AnalysisOutput,
    ) -> ContentOutput:
        return await self._call_stage(
            stage_name="write",
            system_prompt="""You are a medical content writer for a patient-facing app.
            Write clear, accurate, safe content about the drug.
            Always include safety warnings. Never give dosage without mentioning to consult a pharmacist.
            Return JSON matching ContentOutput schema:
            {title, summary, full_content, citations (list)}""",
            user_content=(
                f"Write patient-facing content about: {research.topic}\n\n"
                f"Key facts: {research.key_facts}\n"
                f"Risk level: {analysis.risk_level}\n"
                f"Key findings: {analysis.key_findings}\n"
                f"Contraindications: {analysis.contraindications}"
            ),
            output_schema=ContentOutput,
        )

    async def run(self, drug_name: str) -> ContentOutput:
        """Execute the full pipeline."""
        log.info("pipeline_start", drug=drug_name)

        # Stage 1
        research_result = await self.research(drug_name)

        # Stage 2 — receives Stage 1 output
        analysis_result = await self.analyze(research_result)

        # Stage 3 — receives Stage 1 and 2 outputs
        content_result = await self.write(research_result, analysis_result)

        log.info("pipeline_complete", drug=drug_name, title=content_result.title)
        return content_result
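
`_call_stage` makes a single attempt per stage. Because a model can occasionally return output that fails parsing or validation, one option is a bounded retry around the call. The sketch below uses only the standard library. `call_with_retry`, `call_model`, and `parse` are hypothetical names, and `parse` is assumed to raise `ValueError` on bad output, as `json.loads` does.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable


async def call_with_retry(
    call_model: Callable[[], Awaitable[str]],
    parse: Callable[[str], Any],
    max_attempts: int = 3,
    base_delay: float = 0.0,
) -> Any:
    """Call the model, parse the reply, and retry on parse failure."""
    last_error: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        raw = await call_model()
        try:
            return parse(raw)
        except ValueError as exc:
            last_error = exc
            if attempt < max_attempts:
                # Exponential backoff: base_delay, 2x, 4x, ...
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError(f"stage failed after {max_attempts} attempts") from last_error


# Fake model for illustration: malformed on the first call, valid on the second.
replies = iter(["not json", '{"topic": "Ibuprofen"}'])


async def fake_model() -> str:
    return next(replies)


parsed = asyncio.run(call_with_retry(fake_model, json.loads))
print(parsed)  # {'topic': 'Ibuprofen'}
```

Keeping `max_attempts` small matters here: every retry is another serial model call added to the pipeline's latency.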

Error Propagation

When a stage fails, catch and handle the error explicitly rather than letting garbage pass silently to the next stage:

Python
async def run_safe(self, drug_name: str) -> ContentOutput | None:
    """Pipeline with explicit error handling at each stage."""
    try:
        research_result = await self.research(drug_name)
    except Exception as e:
        log.error("pipeline_stage_failed", stage="research", error=str(e))
        return None  # or raise a PipelineError

    try:
        analysis_result = await self.analyze(research_result)
    except Exception as e:
        log.error("pipeline_stage_failed", stage="analyze", error=str(e))
        # Fallback: skip analysis, write from research only.
        # NOTE: write_from_research_only is not defined in the class above; it
        # would be a variant of write() that takes only the ResearchOutput.
        try:
            return await self.write_from_research_only(research_result)
        except Exception as fallback_error:
            log.error("pipeline_stage_failed", stage="write_fallback", error=str(fallback_error))
            return None

    try:
        return await self.write(research_result, analysis_result)
    except Exception as e:
        log.error("pipeline_stage_failed", stage="write", error=str(e))
        return None
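
The comment in `run_safe` mentions raising a `PipelineError` instead of returning `None`. The article does not define that class, so the following is only one way it might look: a small exception that records which stage failed, plus a hypothetical `run_stage` helper that wraps any stage call. The two stage functions are placeholders.

```python
import asyncio
from typing import Any, Awaitable, Callable


class PipelineError(Exception):
    """Raised when a pipeline stage fails; records which stage it was."""

    def __init__(self, stage: str, cause: Exception):
        super().__init__(f"stage '{stage}' failed: {cause}")
        self.stage = stage
        self.cause = cause


async def run_stage(stage: str, func: Callable[..., Awaitable[Any]], *args: Any) -> Any:
    """Run one stage and re-raise any failure tagged with the stage name."""
    try:
        return await func(*args)
    except Exception as exc:
        raise PipelineError(stage, exc) from exc


# Placeholder stages (assumptions for illustration only).
async def research(drug_name: str) -> dict:
    return {"topic": drug_name}


async def analyze(research_out: dict) -> dict:
    raise ValueError("model returned malformed JSON")


async def run(drug_name: str) -> dict:
    research_out = await run_stage("research", research, drug_name)
    return await run_stage("analyze", analyze, research_out)


failed_stage = None
try:
    asyncio.run(run("Ibuprofen"))
except PipelineError as err:
    failed_stage = err.stage
    print(err)  # stage 'analyze' failed: model returned malformed JSON
```

Raising keeps the failure visible to the caller, whereas returning `None` forces every caller to remember to check for it.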

Testing Pipeline Stages Independently

Because stages have typed interfaces, each can be unit-tested in isolation:

Python
# tests/test_pipeline.py
import pytest
from pharmabot.pipeline.drug_content_pipeline import DrugContentPipeline
from pharmabot.pipeline.schemas import ResearchOutput

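# Requires the pytest-asyncio plugin. `mock_client` is a pytest fixture (not
# shown here) whose chat.completions.create returns canned AnalysisOutput JSON.
@pytest.mark.asyncio
async def test_analyze_stage(mock_client):
    """Test analyze stage in isolation with mock research data."""
    pipeline = DrugContentPipeline(client=mock_client)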

    mock_research = ResearchOutput(
        topic="Ibuprofen",
        key_facts=["NSAID", "inhibits COX-1 and COX-2", "reduces inflammation"],
        sources=["FDA label"],
        gaps=["long-term safety in elderly"],
    )

    result = await pipeline.analyze(mock_research)

    assert result.risk_level in ("low", "medium", "high")
    assert isinstance(result.confidence, float)
    assert 0.0 <= result.confidence <= 1.0
    assert isinstance(result.key_findings, list)
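
The test above takes a `mock_client` fixture that the article does not show. One way to build such a stand-in, sketched here with only the standard library, is to mimic the attributes the pipeline actually reads (`chat.completions.create` and `choices[0].message.content`). `make_mock_client` is a hypothetical helper name, and the payload is made-up test data.

```python
import asyncio
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock


def make_mock_client(payload: dict) -> SimpleNamespace:
    """Build an object shaped like the client, returning payload as JSON text."""
    message = SimpleNamespace(content=json.dumps(payload))
    response = SimpleNamespace(choices=[SimpleNamespace(message=message)])
    completions = SimpleNamespace(create=AsyncMock(return_value=response))
    return SimpleNamespace(chat=SimpleNamespace(completions=completions))


canned_analysis = {
    "topic": "Ibuprofen",
    "risk_level": "medium",
    "key_findings": ["NSAID"],
    "contraindications": [],
    "confidence": 0.8,
}
client = make_mock_client(canned_analysis)

# The mock is awaited exactly like the real call and never touches the network.
response = asyncio.run(client.chat.completions.create(model="gpt-4o", messages=[]))
decoded = json.loads(response.choices[0].message.content)
print(decoded["risk_level"])  # medium
```

In a test suite this helper would typically be wrapped in a `@pytest.fixture` named `mock_client`, so the assertions exercise the schema validation and prompt plumbing without depending on a live model.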

Pipeline vs Single Long Prompt

Why not just put all three stages in one giant prompt?

| | Single Prompt | Pipeline |
|---|---|---|
| Context usage | One call, but huge context | Three focused calls |
| Intermediate validation | No checkpoints | Validate at each stage |
| Debugging | Hard to isolate failures | Clear which stage failed |
| Stage replacement | Rewrite everything | Swap one stage |
| Token efficiency | Lower (all context in one call) | Higher (each stage focused) |
| Latency | Lower (one call) | Higher (three serial calls) |

For simple tasks: use a single prompt. For complex multi-stage workflows where quality and debuggability matter: use the pipeline pattern.
