Learnixo
Back to blog
AI Systemsintermediate

Running and Monitoring a Crew

Call kickoff(), kickoff_async(), and kickoff_for_each() to run CrewAI crews. Use callbacks to monitor task and step progress in real time.

Asma Hafeez KhanMay 16, 20265 min read
CrewAIPythonMulti-AgentMonitoring
Share:š•

Running a Crew

After building agents and tasks, you run the crew with one of three methods:

| Method | When to Use | |---|---| | kickoff() | Synchronous, single run | | kickoff_async() | Async, single run | | kickoff_for_each() | Run same crew for a list of inputs | | kickoff_for_each_async() | Parallel runs for a list of inputs |


kickoff() — Synchronous Single Run

The standard entry point for most use cases:

Python
from crewai import Agent, Task, Crew, Process

researcher = Agent(
    role="Drug Researcher",
    goal="Research pharmaceutical compounds",
    backstory="Expert pharmacologist",
    verbose=True,
)

research_task = Task(
    description="Research {drug_name}: mechanism, indications, side effects, interactions",
    expected_output="Comprehensive drug research report with all clinical data",
    agent=researcher,
)

crew = Crew(
    agents=[researcher],
    tasks=[research_task],
    process=Process.sequential,
    verbose=True,
)

# Synchronous run with input variables
result = crew.kickoff(inputs={"drug_name": "Metformin"})

# Access results
print(result.raw)          # Final output as string
print(result.pydantic)     # Typed model (if output_pydantic was set)
print(result.json_dict)    # Dict (if output_json was set)
print(result.token_usage)  # Token counts across all tasks

kickoff_async() — Asynchronous Run

Use when your application is already async (FastAPI, etc.):

Python
import asyncio
from crewai import Crew

async def analyze_drug(drug_name: str) -> str:
    crew = Crew(
        agents=[researcher, writer],
        tasks=[research_task, write_task],
        process=Process.sequential,
    )
    result = await crew.kickoff_async(inputs={"drug_name": drug_name})
    return result.raw

# Run in FastAPI endpoint
@app.post("/analyze")
async def analyze(request: AnalyzeRequest):
    report = await analyze_drug(request.drug_name)
    return {"report": report}

# Run multiple crews concurrently
async def analyze_drug_class(drugs: list[str]):
    tasks = [analyze_drug(drug) for drug in drugs]
    results = await asyncio.gather(*tasks)
    return dict(zip(drugs, results))

kickoff_for_each() — Batch Processing

Run the same crew configuration against multiple inputs:

Python
# Define crew once
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
)

# Run for each drug
inputs_list = [
    {"drug_name": "Metformin"},
    {"drug_name": "Atorvastatin"},
    {"drug_name": "Lisinopril"},
    {"drug_name": "Omeprazole"},
]

# Sequential — one at a time
results = crew.kickoff_for_each(inputs=inputs_list)

# Parallel — all concurrent
results = crew.kickoff_for_each_async(inputs=inputs_list)

# Results is a list aligned with inputs_list
for drug_input, result in zip(inputs_list, results):
    print(f"{drug_input['drug_name']}: {result.raw[:100]}")

kickoff_for_each_async is significantly faster when you have many independent inputs.


Monitoring with Callbacks

CrewAI supports two callback hooks for real-time monitoring:

step_callback

Called after each agent step (thought, action, observation):

Python
from crewai.agents.agent_builder.utilities.output_converter import ConverterError

def on_step(step_output):
    """Called after each agent reasoning step."""
    print(f"[STEP] Agent reasoning: {str(step_output)[:200]}")

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    step_callback=on_step,
)

task_callback

Called when each task completes:

Python
from crewai.tasks.task_output import TaskOutput

def on_task_complete(task_output: TaskOutput):
    """Called when each task finishes."""
    print(f"\n[TASK COMPLETE]")
    print(f"  Output (first 200 chars): {task_output.raw[:200]}")

crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    task_callback=on_task_complete,
)

Production Monitoring Pattern

A complete monitoring setup for a production pharmaceutical content pipeline:

Python
import logging
import time
from crewai import Agent, Task, Crew, Process
from crewai.tasks.task_output import TaskOutput

logger = logging.getLogger(__name__)

class DrugResearchMonitor:
    def __init__(self, drug_name: str):
        self.drug_name = drug_name
        self.start_time = time.time()
        self.task_times: list[float] = []
        self.task_count = 0

    def on_step(self, step_output):
        logger.debug(f"[{self.drug_name}] Step: {str(step_output)[:100]}")

    def on_task_complete(self, task_output: TaskOutput):
        elapsed = time.time() - self.start_time
        self.task_count += 1
        self.task_times.append(elapsed)
        logger.info(
            f"[{self.drug_name}] Task {self.task_count} completed "
            f"at {elapsed:.1f}s | output: {len(task_output.raw)} chars"
        )

def run_drug_research(drug_name: str) -> str:
    monitor = DrugResearchMonitor(drug_name)

    researcher = Agent(
        role="Drug Researcher",
        goal="Research pharmaceutical compounds accurately",
        backstory="Expert clinical pharmacologist",
        verbose=False,  # Suppress default verbose for production
    )

    writer = Agent(
        role="Medical Writer",
        goal="Write patient-friendly drug information",
        backstory="Medical writer specializing in patient education",
        verbose=False,
    )

    research_task = Task(
        description=f"Research {drug_name}: mechanism, indications, side effects, interactions",
        expected_output="Complete pharmacological report with all clinical data fields",
        agent=researcher,
    )

    write_task = Task(
        description=f"Write a patient information leaflet for {drug_name}",
        expected_output="500-word patient leaflet covering purpose, usage, side effects, warnings",
        agent=writer,
        context=[research_task],
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[research_task, write_task],
        process=Process.sequential,
        step_callback=monitor.on_step,
        task_callback=monitor.on_task_complete,
    )

    result = crew.kickoff()

    total_time = time.time() - monitor.start_time
    logger.info(
        f"[{drug_name}] Crew complete | "
        f"total: {total_time:.1f}s | "
        f"tokens: {result.token_usage}"
    )

    return result.raw

Accessing Full Results

After kickoff(), results are available at both crew and task level:

Python
result = crew.kickoff(inputs={"drug_name": "Warfarin"})

# --- Crew-level (final task output) ---
print(result.raw)           # String output
print(result.pydantic)      # Typed model if output_pydantic set on last task
print(result.json_dict)     # Dict if output_json set on last task
print(result.token_usage)   # CrewTokenUsage object

# Token breakdown
usage = result.token_usage
print(f"Total tokens: {usage.total_tokens}")
print(f"Prompt tokens: {usage.prompt_tokens}")
print(f"Completion tokens: {usage.completion_tokens}")

# --- Task-level (any task in the crew) ---
for task in crew.tasks:
    output = task.output
    print(f"\nTask: {task.description[:60]}")
    print(f"  Raw output: {output.raw[:200]}")
    if output.pydantic:
        print(f"  Typed: {output.pydantic}")
    if output.json_dict:
        print(f"  Dict: {output.json_dict}")

Error Handling

Python
from crewai.utilities.exceptions.context_window_exceeding_exception import (
    LLMContextLengthExceededException,
)

try:
    result = crew.kickoff(inputs={"drug_name": "Warfarin"})
except LLMContextLengthExceededException:
    # Context window exceeded — reduce task scope or split into smaller tasks
    logger.error("Context window exceeded — task descriptions may be too large")
    raise
except Exception as e:
    logger.error(f"Crew execution failed: {e}")
    raise

If a task fails after all retries, task.output will be None. Check before accessing:

Python
result = crew.kickoff()
for task in crew.tasks:
    if task.output is None:
        print(f"Task failed: {task.description[:60]}")
    else:
        print(f"Task succeeded: {task.output.raw[:100]}")

Enjoyed this article?

Explore the AI Systems learning path for more.

Found this helpful?

Share:š•

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.