CrewAI Multi-Agents · Lesson 15 of 16
Crew Kickoff: Running the Full Pipeline
Running a Crew
After building agents and tasks, you run the crew with one of four methods:
| Method | When to Use |
|---|---|
| kickoff() | Synchronous, single run |
| kickoff_async() | Async, single run |
| kickoff_for_each() | Run same crew for a list of inputs |
| kickoff_for_each_async() | Parallel runs for a list of inputs |
kickoff() — Synchronous Single Run
The standard entry point for most use cases:
from crewai import Agent, Task, Crew, Process
# The single agent that performs the research.
researcher = Agent(
    role="Drug Researcher",
    goal="Research pharmaceutical compounds",
    backstory="Expert pharmacologist",
    verbose=True,
)

# The description carries a {drug_name} placeholder; its value is supplied
# through the `inputs` mapping given to kickoff() below.
research_task = Task(
    description="Research {drug_name}: mechanism, indications, side effects, interactions",
    expected_output="Comprehensive drug research report with all clinical data",
    agent=researcher,
)

# One agent, one task, executed in order.
crew = Crew(
    agents=[researcher],
    tasks=[research_task],
    process=Process.sequential,
    verbose=True,
)

# Synchronous run with input variables
result = crew.kickoff(
    inputs={"drug_name": "Metformin"},
)

# Access results
print(result.raw)       # Final output as string
print(result.pydantic)  # Typed model (if output_pydantic was set)
print(result.json_dict)  # Dict (if output_json was set)
print(result.token_usage) # Token counts across all tasks
kickoff_async() — Asynchronous Run
Use when your application is already async (FastAPI, etc.):
import asyncio
from crewai import Crew
async def analyze_drug(drug_name: str) -> str:
    """Run the research-and-write crew for one drug and return its final text.

    A new ``Crew`` is assembled on every call from ``researcher``, ``writer``,
    ``research_task`` and ``write_task``, which must already exist in the
    enclosing scope.

    Args:
        drug_name: Value passed to the crew as the ``drug_name`` input.

    Returns:
        The ``raw`` attribute of the awaited ``kickoff_async()`` result.
    """
    pipeline = Crew(
        agents=[researcher, writer],
        tasks=[research_task, write_task],
        process=Process.sequential,
    )
    crew_output = await pipeline.kickoff_async(inputs={"drug_name": drug_name})
    return crew_output.raw
# Run in FastAPI endpoint
@app.post("/analyze")
async def analyze(request: AnalyzeRequest):
    """Handle POST /analyze by running the crew for ``request.drug_name``.

    Returns:
        A dict with a single ``"report"`` key holding the text returned by
        ``analyze_drug``.
    """
    return {"report": await analyze_drug(request.drug_name)}
# Run multiple crews concurrently
async def analyze_drug_class(drugs: list[str]):
tasks = [analyze_drug(drug) for drug in drugs]
results = await asyncio.gather(*tasks)
return dict(zip(drugs, results))kickoff_for_each() — Batch Processing
Run the same crew configuration against multiple inputs:
# Define crew once
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential,
)
# Run for each drug
inputs_list = [
{"drug_name": "Metformin"},
{"drug_name": "Atorvastatin"},
{"drug_name": "Lisinopril"},
{"drug_name": "Omeprazole"},
]
# Sequential — one at a time
results = crew.kickoff_for_each(inputs=inputs_list)
# Parallel — all concurrent
results = crew.kickoff_for_each_async(inputs=inputs_list)
# Results is a list aligned with inputs_list
for drug_input, result in zip(inputs_list, results):
print(f"{drug_input['drug_name']}: {result.raw[:100]}")kickoff_for_each_async is significantly faster when you have many independent inputs.
Monitoring with Callbacks
CrewAI supports two callback hooks for real-time monitoring:
step_callback
Called after each agent step (thought, action, observation):
from crewai.agents.agent_builder.utilities.output_converter import ConverterError
def on_step(step_output: object) -> None:
    """Print a short preview of one agent reasoning step.

    Meant to be passed as a crew's ``step_callback``.

    Args:
        step_output: The step object handed to the callback. It is only
            converted with ``str()``, so any value is accepted.
    """
    # Cap the preview at 200 characters so long steps do not flood the console.
    preview = str(step_output)[:200]
    print(f"[STEP] Agent reasoning: {preview}")
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential,
step_callback=on_step,
)
task_callback
Called when each task completes:
from crewai.tasks.task_output import TaskOutput
def on_task_complete(task_output: TaskOutput):
    """Print a banner and a preview of a finished task's output.

    Meant to be passed as a crew's ``task_callback``.

    Args:
        task_output: Output of the task that just finished; only its ``raw``
            text is read.
    """
    # Plain literal: the banner has no placeholders, so no f-prefix is needed.
    print("\n[TASK COMPLETE]")
    print(f" Output (first 200 chars): {task_output.raw[:200]}")
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential,
task_callback=on_task_complete,
)
Production Monitoring Pattern
A complete monitoring setup for a production pharmaceutical content pipeline:
import logging
import time
from crewai import Agent, Task, Crew, Process
from crewai.tasks.task_output import TaskOutput
logger = logging.getLogger(__name__)
class DrugResearchMonitor:
    """Track progress and timing for one drug-research crew run.

    ``on_step`` and ``on_task_complete`` each take the single argument passed
    to a crew's ``step_callback`` / ``task_callback``, so the bound methods
    can be handed to ``Crew`` directly.
    """

    def __init__(self, drug_name: str):
        """Start the clock for a run about ``drug_name``."""
        # Used as the prefix of every log line emitted by this monitor.
        self.drug_name = drug_name
        # Wall-clock reference point; elapsed times are measured against it.
        self.start_time = time.time()
        # Seconds since start_time at which each task finished. These are
        # cumulative timestamps, not per-task durations.
        self.task_times: list[float] = []
        # Number of tasks reported complete so far.
        self.task_count = 0

    def on_step(self, step_output):
        """Log a 100-character preview of an agent step at DEBUG level."""
        # This hook fires on every step; skip the str() conversion and
        # slicing entirely when DEBUG records would be discarded anyway.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("[%s] Step: %s", self.drug_name, str(step_output)[:100])

    def on_task_complete(self, task_output: TaskOutput):
        """Record a task completion and log its elapsed time and output size."""
        elapsed = time.time() - self.start_time
        self.task_count += 1
        self.task_times.append(elapsed)
        # Lazy %-formatting renders the same message as before, but only if
        # the record is actually emitted.
        logger.info(
            "[%s] Task %d completed at %.1fs | output: %d chars",
            self.drug_name,
            self.task_count,
            elapsed,
            len(task_output.raw),
        )
def run_drug_research(drug_name: str) -> str:
monitor = DrugResearchMonitor(drug_name)
researcher = Agent(
role="Drug Researcher",
goal="Research pharmaceutical compounds accurately",
backstory="Expert clinical pharmacologist",
verbose=False, # Suppress default verbose for production
)
writer = Agent(
role="Medical Writer",
goal="Write patient-friendly drug information",
backstory="Medical writer specializing in patient education",
verbose=False,
)
research_task = Task(
description=f"Research {drug_name}: mechanism, indications, side effects, interactions",
expected_output="Complete pharmacological report with all clinical data fields",
agent=researcher,
)
write_task = Task(
description=f"Write a patient information leaflet for {drug_name}",
expected_output="500-word patient leaflet covering purpose, usage, side effects, warnings",
agent=writer,
context=[research_task],
)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential,
step_callback=monitor.on_step,
task_callback=monitor.on_task_complete,
)
result = crew.kickoff()
total_time = time.time() - monitor.start_time
logger.info(
f"[{drug_name}] Crew complete | "
f"total: {total_time:.1f}s | "
f"tokens: {result.token_usage}"
)
return result.rawAccessing Full Results
After kickoff(), results are available at both crew and task level:
result = crew.kickoff(inputs={"drug_name": "Warfarin"})
# --- Crew-level (final task output) ---
print(result.raw) # String output
print(result.pydantic) # Typed model if output_pydantic set on last task
print(result.json_dict) # Dict if output_json set on last task
print(result.token_usage) # CrewTokenUsage object
# Token breakdown
usage = result.token_usage
print(f"Total tokens: {usage.total_tokens}")
print(f"Prompt tokens: {usage.prompt_tokens}")
print(f"Completion tokens: {usage.completion_tokens}")
# --- Task-level (any task in the crew) ---
for task in crew.tasks:
output = task.output
print(f"\nTask: {task.description[:60]}")
print(f" Raw output: {output.raw[:200]}")
if output.pydantic:
print(f" Typed: {output.pydantic}")
if output.json_dict:
print(f" Dict: {output.json_dict}")Error Handling
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
try:
result = crew.kickoff(inputs={"drug_name": "Warfarin"})
except LLMContextLengthExceededException:
# Context window exceeded — reduce task scope or split into smaller tasks
logger.error("Context window exceeded — task descriptions may be too large")
raise
except Exception as e:
logger.error(f"Crew execution failed: {e}")
raiseIf a task fails after all retries, task.output will be None. Check before accessing:
result = crew.kickoff()

# Report each task's outcome, treating a missing output as a failure.
for task in crew.tasks:
    task_output = task.output
    if task_output is not None:
        print(f"Task succeeded: {task_output.raw[:100]}")
        continue
    print(f"Task failed: {task.description[:60]}")