Learnixo
Back to blog
AI Systemsintermediate

Supervisor-Worker Multi-Agent Pattern

Build multi-agent systems where a supervisor delegates tasks to specialist worker agents — enabling parallelism, specialization, and cleaner separation of concerns.

Asma Hafeez KhanMay 15, 20268 min read
multi-agentsupervisorworkerdelegationagentic-aipython
Share:𝕏

Supervisor-Worker Multi-Agent Pattern

A single agent running a ReAct loop eventually hits a complexity ceiling. The loop gets long, the context fills with irrelevant tool calls, and the model loses track of sub-goals. The supervisor-worker pattern solves this by decomposing work: one supervisor agent orchestrates the task, and multiple specialized worker agents each handle a specific domain.

This is analogous to how software teams work: a tech lead defines the approach and delegates implementation to specialists. The supervisor knows the goal; the workers know their domain.


Architecture

User Request
     │
     ▼
┌──────────────────┐
│    Supervisor    │  — Breaks goal into tasks, assigns to workers,
│    Agent (LLM)   │    reads results, synthesizes final answer
└────────┬─────────┘
         │ delegates
    ┌────┴─────────────────────────┐
    │                              │
    ▼                              ▼
┌──────────┐                ┌──────────┐
│ Research │                │ Writer   │
│ Worker   │                │ Worker   │
│ (LLM)   │                │ (LLM)   │
└──────────┘                └──────────┘
    │ result                     │ result
    └────────────┬───────────────┘
                 ▼
           Supervisor synthesizes
                 │
                 ▼
           Final Answer

The supervisor sees the big picture. Workers are specialists who do not need to know about each other or the overall goal — they just receive a task and return a result.


Worker Agent Base Class

Python
import json
import openai
from dataclasses import dataclass
from typing import Optional, List, Dict, Any, Callable

client = openai.OpenAI()


@dataclass
class WorkerResult:
    """Structured result from a worker agent."""
    worker_name: str
    task: str
    result: str
    success: bool
    error: Optional[str] = None
    metadata: Dict[str, Any] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}


class WorkerAgent:
    """
    Base class for worker agents.
    Each worker has a name, a specialty, and a set of tools.
    """

    def __init__(
        self,
        name: str,
        specialty: str,
        system_prompt: str,
        tools: Optional[List[dict]] = None,
        tool_implementations: Optional[Dict[str, Callable]] = None,
        model: str = "gpt-4o-mini",
    ):
        self.name = name
        self.specialty = specialty
        self.system_prompt = system_prompt
        self.tools = tools or []
        self.tool_implementations = tool_implementations or {}
        self.model = model

    def run(self, task: str, max_iterations: int = 5) -> WorkerResult:
        """
        Execute a task and return a structured result.
        Runs a mini ReAct loop internally.
        """
        messages = [
            {"role": "system", "content": self.system_prompt},
            {
                "role": "user",
                "content": (
                    f"Task: {task}\n\n"
                    "Complete this task thoroughly. "
                    "Return a clear, complete result."
                ),
            },
        ]

        try:
            for _ in range(max_iterations):
                kwargs = {"model": self.model, "messages": messages}
                if self.tools:
                    kwargs["tools"] = self.tools
                    kwargs["tool_choice"] = "auto"

                response = client.chat.completions.create(**kwargs)
                msg = response.choices[0].message

                if not msg.tool_calls:
                    return WorkerResult(
                        worker_name=self.name,
                        task=task,
                        result=msg.content,
                        success=True,
                    )

                messages.append(msg)
                for tc in msg.tool_calls:
                    tool_name = tc.function.name
                    args = json.loads(tc.function.arguments)
                    fn = self.tool_implementations.get(tool_name)
                    result = fn(**args) if fn else f"Error: unknown tool {tool_name}"
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": str(result),
                    })

            return WorkerResult(
                worker_name=self.name,
                task=task,
                result="Max iterations reached without completing task.",
                success=False,
                error="max_iterations",
            )

        except Exception as e:
            return WorkerResult(
                worker_name=self.name,
                task=task,
                result="",
                success=False,
                error=str(e),
            )

Specialist Worker Implementations

Python
# Simulated tools for the research worker
def search_web(query: str) -> str:
    sources = {
        "vector databases 2024": (
            "Top vector databases in 2024: Pinecone (managed, easy to start), "
            "Weaviate (open-source, GraphQL API), pgvector (PostgreSQL extension, "
            "no extra infra), Qdrant (Rust-based, high performance). "
            "All support HNSW indexing for approximate nearest neighbor search."
        ),
        "vector database cost": (
            "Pinecone: $0.096/hour for p1.x1 pod. pgvector: cost of your Postgres "
            "instance only. Weaviate Cloud: $25/month starter. Qdrant Cloud: "
            "$25/month for 1GB. Self-hosted options have no licensing cost."
        ),
        "pgvector production": (
            "pgvector is used in production by many companies. Supabase, Neon, and "
            "RDS all support it. Supports up to 2000 dimensions. IVFFLAT and HNSW "
            "indexes available. Best for teams already on PostgreSQL."
        ),
    }
    query_lower = query.lower()
    for key, result in sources.items():
        if any(word in query_lower for word in key.split()):
            return result
    return f"Research result for '{query}': Limited information available."


RESEARCH_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "search_web",
            "description": "Search for technical information, documentation, and research",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search query"}
                },
                "required": ["query"],
            },
        },
    }
]

research_worker = WorkerAgent(
    name="Researcher",
    specialty="Information gathering and technical research",
    system_prompt=(
        "You are a technical research specialist. Your job is to find accurate, "
        "up-to-date information using your search tools. Cite sources. "
        "Be objective — present both advantages and limitations. "
        "Return comprehensive findings, not opinions."
    ),
    tools=RESEARCH_TOOLS,
    tool_implementations={"search_web": search_web},
)

writer_worker = WorkerAgent(
    name="Writer",
    specialty="Technical writing and documentation",
    system_prompt=(
        "You are a technical writer. Given research findings or structured data, "
        "you write clear, well-organized technical content. "
        "Use plain English. Structure with headers where appropriate. "
        "Target audience: intermediate software engineers."
    ),
)

critic_worker = WorkerAgent(
    name="Critic",
    specialty="Review and quality assurance",
    system_prompt=(
        "You are a technical editor and fact-checker. Review content for: "
        "accuracy, completeness, clarity, and logical flow. "
        "Identify specific issues, do not just say 'good job'. "
        "Return a scored review (1-10) with specific improvement suggestions."
    ),
)

Supervisor Agent

Python
@dataclass
class SupervisorPlan:
    """A task decomposed by the supervisor."""
    goal: str
    tasks: List[Dict[str, str]]  # [{"worker": "Researcher", "task": "..."}]


class SupervisorAgent:
    """
    Orchestrates multiple worker agents to complete a complex goal.
    """

    def __init__(self, workers: Dict[str, WorkerAgent], model: str = "gpt-4o"):
        self.workers = workers
        self.model = model

    def plan(self, goal: str) -> SupervisorPlan:
        """
        Break the goal into tasks and assign each to the appropriate worker.
        """
        worker_descriptions = "\n".join(
            f"- {name}: {worker.specialty}"
            for name, worker in self.workers.items()
        )

        plan_prompt = f"""You are a task coordinator. Break the following goal into
specific tasks and assign each to the most appropriate worker.

Available workers:
{worker_descriptions}

Goal: {goal}

Return a JSON object with this structure:
{{
  "tasks": [
    {{"worker": "WorkerName", "task": "specific task description"}},
    ...
  ]
}}

Rules:
- Keep tasks focused and actionable
- Assign each task to the most appropriate worker
- Tasks should be sequential (each builds on previous results)
- Maximum 4 tasks
"""
        response = client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": plan_prompt}],
            response_format={"type": "json_object"},
            temperature=0,
        )
        data = json.loads(response.choices[0].message.content)
        return SupervisorPlan(goal=goal, tasks=data.get("tasks", []))

    def execute(self, plan: SupervisorPlan) -> List[WorkerResult]:
        """Execute all tasks in sequence, passing results to subsequent workers."""
        results: List[WorkerResult] = []
        accumulated_context = ""

        for i, task_spec in enumerate(plan.tasks):
            worker_name = task_spec["worker"]
            task_description = task_spec["task"]

            print(f"\n[Supervisor] Delegating to {worker_name}:")
            print(f"  Task: {task_description[:100]}...")

            worker = self.workers.get(worker_name)
            if not worker:
                print(f"  [Error] Unknown worker: {worker_name}")
                continue

            # Inject results from previous workers as context
            full_task = task_description
            if accumulated_context:
                full_task = (
                    f"Context from previous steps:\n{accumulated_context}\n\n"
                    f"Your task: {task_description}"
                )

            result = worker.run(full_task)
            results.append(result)

            if result.success:
                print(f"  [Done] {result.result[:100]}...")
                accumulated_context += f"\n\n[{worker_name} result]:\n{result.result}"
            else:
                print(f"  [Failed] {result.error}")

        return results

    def synthesize(self, goal: str, results: List[WorkerResult]) -> str:
        """Combine all worker results into a final answer."""
        results_text = "\n\n".join(
            f"=== {r.worker_name} ===\n{r.result}"
            for r in results
            if r.success
        )

        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": "You synthesize outputs from multiple specialist agents into a coherent final answer.",
                },
                {
                    "role": "user",
                    "content": (
                        f"Original goal: {goal}\n\n"
                        f"Worker outputs:\n{results_text}\n\n"
                        "Produce a final, comprehensive answer that addresses the original goal."
                    ),
                },
            ],
        )
        return response.choices[0].message.content

    def run(self, goal: str) -> str:
        """Full supervisor pipeline: plan → execute → synthesize."""
        print(f"\n{'='*60}")
        print(f"SUPERVISOR: Starting goal")
        print(f"Goal: {goal}")
        print('='*60)

        plan = self.plan(goal)
        print(f"\n[Supervisor] Plan: {len(plan.tasks)} tasks")
        for i, t in enumerate(plan.tasks):
            print(f"  {i+1}. [{t['worker']}] {t['task'][:80]}...")

        results = self.execute(plan)

        print(f"\n[Supervisor] Synthesizing results...")
        final = self.synthesize(goal, results)
        return final

Running the Full System

Python
if __name__ == "__main__":
    # Assemble the multi-agent system
    supervisor = SupervisorAgent(
        workers={
            "Researcher": research_worker,
            "Writer": writer_worker,
            "Critic": critic_worker,
        },
        model="gpt-4o",
    )

    # Run a complex task
    goal = (
        "Create a technical comparison of vector database options for a startup "
        "that is already using PostgreSQL. Include cost analysis, operational "
        "complexity, and a recommendation."
    )

    final_answer = supervisor.run(goal)
    print(f"\n{'='*60}")
    print("FINAL ANSWER:")
    print('='*60)
    print(final_answer)

Example execution trace:

============================================================
SUPERVISOR: Starting goal
Goal: Create a technical comparison of vector databases...
============================================================

[Supervisor] Plan: 3 tasks
  1. [Researcher] Research vector database options suitable for PostgreSQL users
  2. [Writer] Write a structured comparison based on research findings
  3. [Critic] Review the comparison for accuracy and completeness

[Supervisor] Delegating to Researcher:
  Task: Research vector database options...
  [Done] Top vector databases: Pinecone, Weaviate, pgvector, Qdrant...

[Supervisor] Delegating to Writer:
  Task: Write a structured comparison...
  [Done] # Vector Database Comparison for PostgreSQL Teams...

[Supervisor] Delegating to Critic:
  Task: Review the comparison...
  [Done] Score: 8/10. Strengths: clear structure, cost data...

When to Use Supervisor-Worker

Use supervisor-worker when:

  • Different parts of the task require genuinely different expertise
  • You want clear accountability — each worker produces an auditable output
  • You need quality gates (critic worker reviews before returning to user)
  • Tasks are long enough that a single agent would lose context

Do not use it when:

  • The task is simple enough for a single ReAct loop
  • You need sub-second latency (each delegation adds at least one LLM call)
  • Workers are not genuinely specialized — if they all do the same thing, the pattern adds complexity without value

Summary

  • The supervisor breaks the goal into tasks and assigns them to specialist workers
  • Workers are independent — they only see their task and results from previous workers
  • The supervisor synthesizes all worker outputs into a final coherent answer
  • Use different models for supervisor (expensive, smart) and workers (cheaper, focused)
  • Workers can have their own tool registries, enabling parallel specialization
  • The pattern scales: add workers without changing the supervisor

Next: Peer-to-Peer Multi-Agent Pattern — agents that communicate without a central coordinator.

Enjoyed this article?

Explore the AI Systems learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.