Learnixo

Build PharmaBot AI · Lesson 7 of 13

Skill 6 — AI Agents: Build the Triage, Drug Info & Interaction Agents

The Multi-Agent Architecture

Instead of one large prompt that handles everything, PharmaBot uses three focused agents:

User: "Can I take ibuprofen with warfarin?"
        │
        ▼
  ┌─────────────────┐
  │  Triage Agent   │  ← classifies intent
  │  "interaction"  │
  └────────┬────────┘
           │
           ▼
  ┌─────────────────────┐
  │ Interaction Checker │  ← retrieves interaction data, scores severity
  │      Agent          │
  └─────────────────────┘
           │
           ▼
  Structured JSON response with severity badge

Why multiple agents?

  • Each agent has a smaller, focused system prompt → more reliable outputs
  • Specialist prompts outperform general prompts on specific tasks
  • You can update the interaction checker without touching the drug info agent
  • Easier to test and debug — each agent is independently unit-testable

Base Agent

Python
# pharmabot/agents/base.py
from abc import ABC, abstractmethod
from pharmabot.prompts.system import SYSTEM_PROMPT
from pharmabot.prompts.disclaimer import inject_disclaimer
from pharmabot.rag.pipeline import RAGPipeline
from openai import AsyncAzureOpenAI
from pharmabot.config import settings

class BaseAgent(ABC):
    """Shared retrieve -> prompt -> stream pipeline for the specialist agents.

    Subclasses implement :meth:`build_prompt`; retrieval, the Azure OpenAI
    call and the disclaimer fallback are handled here.
    """

    def __init__(self, session_id: str):
        """Store the session id and create the RAG pipeline and Azure client."""
        self.session_id = session_id
        self.rag = RAGPipeline()
        self.client = AsyncAzureOpenAI(
            api_key=settings.azure_openai_api_key,
            azure_endpoint=settings.azure_openai_endpoint,
            api_version="2024-02-01",
        )

    @abstractmethod
    def build_prompt(self, question: str, context: str) -> str:
        """Each agent defines its own task-specific prompt."""
        ...

    @staticmethod
    def _needs_disclaimer(response_text: str) -> bool:
        """Return True when the answer never mentions "consult" (any case)."""
        return "consult" not in response_text.lower()

    async def stream(self, question: str):
        """Answer ``question`` as an async stream of text tokens.

        Yields each content token as it arrives from the model. If the
        complete answer does not contain the word "consult", one extra item
        (``inject_disclaimer("")``) is yielded at the end.
        """
        # 1. Retrieve relevant drug data
        chunks = await self.rag.retrieve(question)
        context = self.rag.format_context(chunks)

        # 2. Build task-specific prompt
        prompt = self.build_prompt(question, context)

        # 3. Stream from Azure OpenAI.
        # create(stream=True) yields raw chunks exposing `.choices[0].delta`;
        # the `.stream()` helper yields higher-level event objects instead,
        # which do not have a `.choices` attribute.
        completion_stream = await self.client.chat.completions.create(
            model=settings.azure_openai_deployment,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user",   "content": prompt},
            ],
            temperature=0.1,    # low temperature for factual medical answers
            max_tokens=800,
            stream=True,
        )
        parts = []
        async with completion_stream:  # close the HTTP response even on early exit
            async for chunk in completion_stream:
                # Some chunks carry no choices (or no delta); skip them.
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta
                token = delta.content if delta else None
                if token:
                    parts.append(token)
                    yield token

        # 4. Inject disclaimer if model forgot
        if self._needs_disclaimer("".join(parts)):
            yield inject_disclaimer("")

Triage Agent

Python
# pharmabot/agents/triage.py
from pharmabot.agents.drug_info import DrugInfoAgent
from pharmabot.agents.interaction import InteractionAgent
from openai import AsyncAzureOpenAI
from pharmabot.config import settings

# Single-label classification prompt. `{question}` is filled in with
# str.format by TriageAgent._classify; the model is asked to answer with one
# bare category name, which TriageAgent.stream then uses for routing.
TRIAGE_PROMPT = """
Classify this pharmaceutical question into exactly one category.

Question: {question}

Categories:
- "interaction": asks about combining two or more drugs
- "drug_info": asks about a single drug (side effects, dosage, what it treats)
- "off_topic": not about medications

Reply with ONLY the category name, nothing else.
""".strip()

class TriageAgent:
    """Classify a question's intent and delegate to the matching agent."""

    def __init__(self, session_id: str):
        """Store the session id and create the Azure OpenAI client."""
        self.session_id = session_id
        self.client = AsyncAzureOpenAI(
            api_key=settings.azure_openai_api_key,
            azure_endpoint=settings.azure_openai_endpoint,
            api_version="2024-02-01",
        )

    @staticmethod
    def _normalize_intent(raw) -> str:
        """Reduce a raw model reply to a bare lowercase category name.

        The prompt lists the categories in double quotes, so the model may
        echo them quoted or add a trailing period; those are stripped so the
        exact-match routing in :meth:`stream` still works. ``None`` or an
        empty reply becomes ``""``.
        """
        if not raw:
            return ""
        return raw.strip().lower().strip("\"'`. \t\r\n")

    async def _classify(self, question: str) -> str:
        """Ask the model for the category of ``question``.

        Returns the normalized category text, or ``""`` when the response
        carries no usable content.
        """
        response = await self.client.chat.completions.create(
            model=settings.azure_openai_deployment,
            messages=[{"role": "user", "content": TRIAGE_PROMPT.format(question=question)}],
            temperature=0,
            max_tokens=10,
        )
        if not response.choices:
            return ""
        # message.content can be None; _normalize_intent maps that to "".
        return self._normalize_intent(response.choices[0].message.content)

    async def stream(self, question: str):
        """Route ``question`` to a specialist agent and relay its tokens.

        Anything other than "interaction" or "drug_info" gets a fixed
        refusal message instead.
        """
        intent = await self._classify(question)

        if intent == "interaction":
            agent = InteractionAgent(self.session_id)
        elif intent == "drug_info":
            agent = DrugInfoAgent(self.session_id)
        else:
            yield "I can only help with medication questions. Please ask about drug information or interactions."
            return

        async for token in agent.stream(question):
            yield token

Drug Info Agent

Python
# pharmabot/agents/drug_info.py
from pharmabot.agents.base import BaseAgent
from pharmabot.prompts.drug_info import build_drug_info_prompt

class DrugInfoAgent(BaseAgent):
    """Specialist agent whose prompt comes from ``build_drug_info_prompt``."""

    def build_prompt(self, question: str, context: str) -> str:
        """Render the drug-info prompt for ``question`` and the retrieved ``context``."""
        prompt = build_drug_info_prompt(question=question, context=context)
        return prompt

That's it — one small class with a single method. The base class handles RAG retrieval and streaming. The specialist agent only defines what question to answer (via its prompt template).


Interaction Checker Agent

Python
# pharmabot/agents/interaction.py
from pharmabot.agents.base import BaseAgent
from pharmabot.prompts.interaction import INTERACTION_TEMPLATE

class InteractionAgent(BaseAgent):
    """Specialist agent that returns a validated JSON interaction result."""

    def build_prompt(self, question: str, context: str) -> str:
        """Fill ``INTERACTION_TEMPLATE`` with the question and retrieved context."""
        return INTERACTION_TEMPLATE.substitute(question=question, context=context)

    async def stream(self, question: str):
        """Yield exactly one item: validated JSON, or the raw text as fallback.

        The base-class stream is buffered in full because the answer can only
        be validated once it is complete.
        """
        import json
        from pharmabot.schemas.interaction import InteractionResult

        # Interaction responses must be valid JSON — validate before streaming
        parts = []
        async for token in super().stream(question):
            parts.append(token)
        full_response = "".join(parts)

        # Validate and re-serialize as parsed JSON
        try:
            data = json.loads(full_response)
            payload = InteractionResult(**data).model_dump_json()
        except (ValueError, TypeError):
            # json.JSONDecodeError and pydantic's ValidationError are both
            # ValueError subclasses; TypeError covers JSON that is not an
            # object (e.g. a list passed to **).
            # Fallback: emit the raw text if parsing or validation fails.
            # NOTE(review): BaseAgent.stream may append a plain-text
            # disclaimer after the JSON, which also lands here — confirm the
            # template always makes the model include "consult".
            payload = full_response

        # Yield outside the try so exceptions raised at the yield point are
        # not mistaken for a parse failure.
        yield payload

Testing Each Agent Independently

Python
# tests/test_agents.py
import pytest

from pharmabot.agents.interaction import InteractionAgent
from pharmabot.agents.triage import TriageAgent

@pytest.mark.asyncio  # async tests need pytest-asyncio to be collected and awaited
async def test_triage_routes_drug_info():
    """A single-drug question is classified as "drug_info"."""
    agent = TriageAgent(session_id="test")
    intent = await agent._classify("What are the side effects of metformin?")
    assert intent == "drug_info"

@pytest.mark.asyncio  # async tests need pytest-asyncio to be collected and awaited
async def test_triage_routes_interaction():
    """A two-drug question is classified as "interaction"."""
    agent = TriageAgent(session_id="test")
    intent = await agent._classify("Can I take ibuprofen with warfarin?")
    assert intent == "interaction"

@pytest.mark.asyncio  # async tests need pytest-asyncio to be collected and awaited
async def test_triage_blocks_off_topic():
    """A non-medication question gets the fixed refusal message."""
    agent = TriageAgent(session_id="test")
    tokens = []
    async for token in agent.stream("Who won the World Cup?"):
        tokens.append(token)
    response = "".join(tokens)
    assert "only help with medication" in response.lower()

@pytest.mark.asyncio  # async tests need pytest-asyncio to be collected and awaited
async def test_interaction_returns_valid_json():
    """The interaction agent's output parses as JSON with a known severity."""
    import json

    agent = InteractionAgent(session_id="test")
    tokens = []
    async for token in agent.stream("Can I take ibuprofen with warfarin?"):
        tokens.append(token)
    data = json.loads("".join(tokens))
    assert "severity" in data
    assert data["severity"] in ["mild", "moderate", "severe", "unknown"]

Checkpoint

Run the full agent pipeline with a drug interaction query:

Bash
# This should trigger: Triage → InteractionAgent → structured JSON
curl -N http://localhost:8000/api/chat \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{"message": "Is it safe to take aspirin and warfarin together?", "session_id": "agent-test"}'

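The response should be a JSON object with severity, mechanism, clinical_effect, and disclaimer — not free text. To watch the Triage Agent's classification appear in the structured logs (structlog output in the terminal), log the classified intent in `TriageAgent.stream` right after the `_classify` call — the `TriageAgent` shown in this lesson does not log it by itself.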