Learnixo

CrewAI Multi-Agents · Lesson 16 of 16

Interview: CrewAI Production Scenarios

Q1: When would you use Process.hierarchical instead of Process.sequential?

A: Use hierarchical when the workflow needs a manager to make dynamic decisions about task delegation and quality. Use sequential when the pipeline steps are fixed and well-defined.

Sequential is the right default: predictable, faster, cheaper. Each task runs in the order specified. No extra LLM calls for management.

Hierarchical adds value when:

  • Task assignment needs judgment (different agents may be appropriate depending on earlier outputs)
  • Quality review is needed before accepting a task's output
  • The workflow is open-ended and the number of steps isn't known upfront

Hierarchical roughly doubles LLM call count — a manager call per task assignment plus a review call after each completion. Don't pay that cost unless you need the dynamic routing.


Q2: What is async_execution=True and when should you use it?

A: async_execution=True on a Task tells CrewAI to run that task concurrently with other async tasks, rather than waiting for the previous task to finish.

When to use it: Only when tasks are truly independent — neither depends on the other's output.

Python
# These three can run in parallel  no dependencies between them
research_a = Task(description="Research Drug A", ..., async_execution=True)
research_b = Task(description="Research Drug B", ..., async_execution=True)
safety_check = Task(description="Regulatory check", ..., async_execution=True)

# This one must wait for all three  synchronous, uses context
synthesis = Task(
    description="Compare Drug A and Drug B",
    context=[research_a, research_b, safety_check],
    # async_execution=False (default)
)

Three 10-second research tasks run in ~10 seconds total instead of ~30 seconds. Don't use it when tasks share state or have data dependencies — the ordering won't be guaranteed.


Q3: How do you pass output from one task to another in CrewAI?

A: Two mechanisms:

Implicit (sequential mode): Each task's output is appended to the agent's conversation context. The next agent sees previous outputs automatically.

Explicit (recommended): Use context=[task] to specifically inject a task's output into the next task's prompt.

Python
write_task = Task(
    description="Write a leaflet based on the research",
    agent=writer,
    context=[research_task],  # Explicit: receives research_task output
)

review_task = Task(
    description="Review the leaflet against original research",
    agent=reviewer,
    context=[research_task, write_task],  # Both outputs injected
)

Explicit context is clearer and avoids context pollution from unrelated earlier tasks. Always use it for multi-task pipelines where dependencies need to be visible.


Q4: How do you get structured (typed) output from a CrewAI task?

A: Use output_pydantic on the task with a Pydantic model:

Python
from pydantic import BaseModel

class DrugReport(BaseModel):
    drug_name: str
    severity: str
    mechanism: str
    recommendation: str

task = Task(
    description="Analyze the interaction between {drug_a} and {drug_b}",
    expected_output="Complete drug interaction report with severity, mechanism, and recommendation",
    agent=pharmacologist,
    output_pydantic=DrugReport,
)

result = crew.kickoff(inputs={"drug_a": "Warfarin", "drug_b": "Ibuprofen"})
report: DrugReport = result.pydantic
print(report.severity)

If the model fails to produce valid JSON, result.pydantic is None and result.raw contains the unstructured text. Always handle the None case in production.


Q5: How do you control costs when running CrewAI at scale?

A: Several levers:

Model selection: Use GPT-4o-mini or Haiku for tasks that don't need frontier-model reasoning. Reserve GPT-4o / Claude Opus for complex analysis.

Python
agent = Agent(
    role="Drug Researcher",
    llm=ChatOpenAI(model="gpt-4o-mini"),  # Cheaper for structured extraction
    ...
)

Token monitoring: Check result.token_usage after every run. Track token cost per drug, per crew run.

Task scope: Vague tasks produce long outputs. Specific expected_output descriptions produce focused, shorter outputs.

Async batching: kickoff_for_each_async runs crews in parallel — same total cost, shorter wall time.

Caching: Cache crew outputs for repeated inputs. If you've already generated a leaflet for Metformin, don't regenerate it.


Q6: What happens when a CrewAI agent fails to produce valid output?

A: CrewAI has a built-in retry mechanism. If an agent doesn't produce output matching the expected format (especially with output_pydantic), it retries with corrective prompting.

After retries are exhausted:

  • task.output is None if structured output was required and failed
  • task.output.raw contains the raw text the agent produced

Handle this in production:

Python
result = crew.kickoff()

report = result.pydantic
if report is None:
    # Fallback: parse the raw text manually
    raw = result.raw
    logger.warning(f"Structured output failed — falling back to raw: {raw[:200]}")
    report = fallback_extract(raw)

To reduce failures: write very specific expected_output descriptions that name the exact fields required, and use description= on Pydantic fields to guide the model.


Q7: How do you add memory to CrewAI agents?

A: CrewAI supports several memory types:

Python
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    memory=True,          # Enable all memory types
    verbose=True,
)

Memory types CrewAI manages:

  • Short-term memory: Recent conversation context within a crew run
  • Long-term memory: Persists across crew runs (stored in a local vector store by default)
  • Entity memory: Tracks specific entities (drugs, people, companies) mentioned across runs

For pharmaceutical applications: long-term memory lets the crew "remember" that it already researched Metformin's mechanism in a previous run and not repeat work.

Custom memory storage (for production):

Python
from crewai.memory.storage.rag_storage import RAGStorage

crew = Crew(
    ...
    memory=True,
    long_term_memory=RAGStorage(
        embedder_config={"provider": "openai", "config": {"model": "text-embedding-3-small"}},
        storage_path="./crew_memory",
    ),
)

Q8: System design — design a CrewAI pipeline for automated drug monograph generation.

A: Requirements: given a drug name, produce a complete drug monograph with pharmacology, safety, and clinical use sections.

Crew design:

Inputs: drug_name
    │
    ├── [Async] Literature Researcher → pharmacology_task
    ├── [Async] Safety Analyst → safety_task
    └── [Async] Regulatory Specialist → regulatory_task
         │
         └── [Sync] Medical Writer → synthesis_task (context=[all three])
                  │
                  └── [Sync] Clinical Reviewer → review_task (context=[synthesis_task])

Three independent research tasks run in parallel (async_execution=True), reducing time from ~30s to ~10s. The writer synthesizes all three outputs. The reviewer checks the final draft for accuracy.

Output: output_pydantic=DrugMonograph on the final task gives a typed object you can store in a database.

Key decisions:

  • Use Process.sequential (the parallel work is handled by async_execution, not hierarchical)
  • Specific expected_output on research tasks prevents information loss
  • Reviewer has context=[literature_task, synthesis_task] — needs both original data and draft

Q9: How do you handle rate limits when running many crews in parallel?

A: Three approaches:

Semaphore limiting: Cap concurrent crew runs

Python
import asyncio

semaphore = asyncio.Semaphore(5)  # Max 5 concurrent crews

async def run_with_limit(drug_name: str):
    async with semaphore:
        result = await crew.kickoff_async(inputs={"drug_name": drug_name})
        return result

tasks = [run_with_limit(drug) for drug in drug_list]
results = await asyncio.gather(*tasks)

Exponential backoff: Retry on 429 errors with increasing delays.

Model-level rate limits: Each agent's llm has its own rate limit. Configure multiple API keys (LiteLLM load balancing) or use different models for different agents.

For large batch jobs: run kickoff_for_each with a smaller batch size, then process the next batch after the first completes.


Q10: What are the main limitations of CrewAI in production?

A:

Non-determinism: LLM outputs vary between runs. Two runs with identical inputs may produce different task assignments (hierarchical) and different output content.

Cost at scale: Each agent step is an LLM call. A 4-task crew with verbose agents may make 20+ LLM calls per run. At scale, this adds up quickly.

Context window limits: Long-running crews accumulate context. If research outputs are large and multiple tasks inject them, the synthesis task may exceed the model's context window.

Debugging difficulty: When a crew produces wrong output, tracing which agent made the error and why requires examining the full conversation logs. verbose=True helps but adds noise.

No built-in persistence: If a crew run crashes midway, there's no checkpoint to resume from. For long-running crews, implement your own checkpointing (save task outputs to a database and re-inject if restarting).

Mitigation: Structured output (output_pydantic) makes failures explicit. Specific expected_output reduces non-determinism. Token monitoring catches runaway costs early.