Chapter 3: Task Definition & Planning
April 13, 2026 · View on GitHub
Welcome to Chapter 3: Task Definition & Planning. In this part of CrewAI Tutorial: Building Collaborative AI Agent Teams, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
Mastering task definition and planning is crucial for building effective AI crews. This chapter explores how to break down complex objectives into actionable tasks that agents can execute collaboratively.
Task Definition Framework
Task Structure
from crewai import Task
from typing import List, Dict, Any, Optional
class CrewTask(Task):
def __init__(self,
description: str,
agent: Agent,
expected_output: str,
context: Optional[List[Task]] = None,
dependencies: Optional[List[str]] = None,
priority: str = "medium",
complexity: str = "medium",
estimated_duration: int = 30):
super().__init__(
description=description,
agent=agent,
expected_output=expected_output,
context=context or []
)
self.dependencies = dependencies or []
self.priority = priority
self.complexity = complexity
self.estimated_duration = estimated_duration
self.actual_duration = None
self.status = "pending"
self.subtasks = []
Task Categories
task_categories = {
"research": {
"description": "Information gathering and analysis tasks",
"examples": ["market_research", "competitive_analysis", "data_collection"],
"required_agents": ["researcher", "analyst"]
},
"analysis": {
"description": "Data processing and insights generation",
"examples": ["data_analysis", "trend_identification", "performance_review"],
"required_agents": ["analyst", "specialist"]
},
"creation": {
"description": "Content and asset creation tasks",
"examples": ["content_writing", "design_creation", "code_development"],
"required_agents": ["creator", "specialist"]
},
"coordination": {
"description": "Planning and management tasks",
"examples": ["project_planning", "resource_allocation", "team_coordination"],
"required_agents": ["manager", "coordinator"]
}
}
Task Decomposition Strategies
Hierarchical Decomposition
class TaskDecomposer:
def __init__(self):
self.decomposition_rules = {
"research": self._decompose_research_task,
"analysis": self._decompose_analysis_task,
"creation": self._decompose_creation_task,
"coordination": self._decompose_coordination_task
}
def decompose_task(self, task: CrewTask) -> List[CrewTask]:
"""Decompose a complex task into manageable subtasks"""
task_type = self._classify_task(task)
if task_type in self.decomposition_rules:
return self.decomposition_rules[task_type](task)
else:
return [task] # Task is already simple enough
def _decompose_research_task(self, task: CrewTask) -> List[CrewTask]:
"""Decompose research tasks"""
return [
CrewTask(
description=f"Define research scope and objectives for: {task.description[:50]}...",
agent=self._find_agent_by_role("researcher"),
expected_output="Research plan with objectives and methodology",
priority=task.priority,
complexity="low"
),
CrewTask(
description=f"Gather information from multiple sources for: {task.description[:50]}...",
agent=self._find_agent_by_role("researcher"),
expected_output="Comprehensive data collection from relevant sources",
priority=task.priority,
complexity="medium"
),
CrewTask(
description=f"Analyze collected data and extract insights for: {task.description[:50]}...",
agent=self._find_agent_by_role("analyst"),
expected_output="Analysis report with key findings and insights",
priority=task.priority,
complexity="medium"
)
]
def _classify_task(self, task: CrewTask) -> str:
"""Classify task type based on content analysis"""
description = task.description.lower()
if any(word in description for word in ["research", "investigate", "study", "analyze data"]):
return "research"
elif any(word in description for word in ["write", "create", "design", "develop"]):
return "creation"
elif any(word in description for word in ["plan", "coordinate", "manage", "organize"]):
return "coordination"
else:
return "analysis"
Dependency Management
class TaskDependencyManager:
def __init__(self):
self.task_graph = {}
self.completed_tasks = set()
def add_task(self, task: CrewTask):
"""Add task to dependency graph"""
self.task_graph[task.id] = {
"task": task,
"dependencies": task.dependencies,
"dependents": []
}
# Update dependents
for dep_id in task.dependencies:
if dep_id in self.task_graph:
self.task_graph[dep_id]["dependents"].append(task.id)
def get_executable_tasks(self) -> List[CrewTask]:
"""Get tasks that can be executed (all dependencies met)"""
executable = []
for task_id, task_info in self.task_graph.items():
if task_id not in self.completed_tasks:
deps_met = all(dep in self.completed_tasks for dep in task_info["dependencies"])
if deps_met:
executable.append(task_info["task"])
return executable
def mark_completed(self, task_id: str):
"""Mark task as completed"""
self.completed_tasks.add(task_id)
def get_critical_path(self) -> List[str]:
"""Calculate critical path through task dependencies"""
# Implementation of critical path algorithm
return self._calculate_critical_path()
Advanced Planning Techniques
Task Prioritization
class TaskPrioritizer:
def __init__(self):
self.priority_weights = {
"urgent": 1.0,
"high": 0.8,
"medium": 0.6,
"low": 0.4
}
def prioritize_tasks(self, tasks: List[CrewTask]) -> List[CrewTask]:
"""Prioritize tasks based on multiple factors"""
scored_tasks = []
for task in tasks:
score = self._calculate_priority_score(task)
scored_tasks.append((task, score))
# Sort by score (highest first)
scored_tasks.sort(key=lambda x: x[1], reverse=True)
return [task for task, score in scored_tasks]
def _calculate_priority_score(self, task: CrewTask) -> float:
"""Calculate priority score based on multiple factors"""
base_score = self.priority_weights.get(task.priority, 0.5)
# Factor in urgency (deadline proximity)
urgency_multiplier = self._calculate_urgency_multiplier(task)
# Factor in dependencies (tasks with more dependents are more important)
dependency_multiplier = 1 + (len(self._get_dependents(task)) * 0.1)
# Factor in complexity (simpler tasks might be prioritized for quick wins)
complexity_multiplier = 1 - (self._get_complexity_weight(task) * 0.2)
return base_score * urgency_multiplier * dependency_multiplier * complexity_multiplier
Resource Allocation Planning
class ResourcePlanner:
def __init__(self, agents: List[Agent]):
self.agents = agents
self.agent_workload = {agent.id: 0 for agent in agents}
def allocate_resources(self, tasks: List[CrewTask]) -> Dict[str, List[CrewTask]]:
"""Allocate tasks to agents based on capacity and expertise"""
allocation = {agent.id: [] for agent in self.agents}
# Sort tasks by priority
prioritized_tasks = sorted(tasks, key=lambda t: self._get_task_priority(t), reverse=True)
for task in prioritized_tasks:
best_agent = self._find_best_agent_for_task(task)
if best_agent:
allocation[best_agent.id].append(task)
self.agent_workload[best_agent.id] += task.estimated_duration
return allocation
def _find_best_agent_for_task(self, task: CrewTask) -> Optional[Agent]:
"""Find best agent for task based on expertise and workload"""
best_agent = None
best_score = 0
for agent in self.agents:
expertise_score = self._calculate_expertise_match(agent, task)
workload_score = self._calculate_workload_score(agent)
availability_score = self._check_agent_availability(agent, task)
total_score = (expertise_score * 0.5) + (workload_score * 0.3) + (availability_score * 0.2)
if total_score > best_score:
best_score = total_score
best_agent = agent
return best_agent
def _calculate_expertise_match(self, agent: Agent, task: CrewTask) -> float:
"""Calculate how well agent's expertise matches task requirements"""
# Implementation would analyze agent's skills vs task requirements
return 0.8 # Placeholder
Task Execution Monitoring
Progress Tracking
class TaskMonitor:
def __init__(self):
self.task_status = {}
self.execution_logs = []
def track_task_execution(self, task: CrewTask, agent: Agent):
"""Monitor task execution progress"""
self.task_status[task.id] = {
"status": "in_progress",
"agent": agent.id,
"start_time": self._get_current_time(),
"progress": 0,
"milestones": self._define_milestones(task)
}
def update_progress(self, task_id: str, progress: float, notes: str = ""):
"""Update task progress"""
if task_id in self.task_status:
self.task_status[task_id]["progress"] = progress
self.task_status[task_id]["last_update"] = self._get_current_time()
self.execution_logs.append({
"task_id": task_id,
"timestamp": self._get_current_time(),
"progress": progress,
"notes": notes
})
def get_task_status(self, task_id: str) -> Dict[str, Any]:
"""Get current task status"""
return self.task_status.get(task_id, {})
def get_overall_progress(self, tasks: List[CrewTask]) -> Dict[str, Any]:
"""Get overall crew progress"""
total_tasks = len(tasks)
completed_tasks = len([t for t in tasks if t.status == "completed"])
in_progress_tasks = len([t for t in tasks if t.status == "in_progress"])
return {
"total_tasks": total_tasks,
"completed": completed_tasks,
"in_progress": in_progress_tasks,
"completion_percentage": (completed_tasks / total_tasks) * 100 if total_tasks > 0 else 0
}
Adaptive Planning
Dynamic Replanning
class AdaptivePlanner:
def __init__(self):
self.original_plan = None
self.execution_history = []
self.performance_metrics = {}
def create_adaptive_plan(self, tasks: List[CrewTask], agents: List[Agent]):
"""Create initial plan with adaptation capabilities"""
self.original_plan = self._create_initial_plan(tasks, agents)
return self.original_plan
def adapt_plan(self, execution_feedback: Dict[str, Any]):
"""Adapt plan based on execution feedback"""
self.execution_history.append(execution_feedback)
# Analyze performance
issues = self._identify_performance_issues()
if issues:
# Generate adaptation strategies
adaptations = self._generate_adaptations(issues)
# Apply adaptations
updated_plan = self._apply_adaptations(self.original_plan, adaptations)
self.original_plan = updated_plan
return updated_plan
return self.original_plan
def _identify_performance_issues(self) -> List[str]:
"""Identify performance issues from execution history"""
issues = []
recent_executions = self.execution_history[-10:] # Last 10 executions
# Check for delays
avg_duration = sum(e.get("duration", 0) for e in recent_executions) / len(recent_executions)
if avg_duration > 1.5 * self._get_expected_duration():
issues.append("performance_degradation")
# Check for failures
failure_rate = len([e for e in recent_executions if e.get("success") == False]) / len(recent_executions)
if failure_rate > 0.2:
issues.append("high_failure_rate")
return issues
def _generate_adaptations(self, issues: List[str]) -> List[Dict[str, Any]]:
"""Generate adaptation strategies"""
adaptations = []
for issue in issues:
if issue == "performance_degradation":
adaptations.append({
"type": "resource_reallocation",
"action": "redistribute_tasks_to_underutilized_agents"
})
elif issue == "high_failure_rate":
adaptations.append({
"type": "quality_improvement",
"action": "add_additional_review_steps"
})
return adaptations
Best Practices for Task Planning
Planning Principles
- SMART Tasks: Specific, Measurable, Achievable, Relevant, Time-bound
- Dependency Awareness: Understand and manage task relationships
- Resource Optimization: Match tasks to agent capabilities and availability
- Progress Monitoring: Track execution and adapt as needed
- Quality Assurance: Include validation and review steps
Common Planning Patterns
# Research-Development Pattern
research_dev_pattern = {
"phases": ["research", "analysis", "development", "testing", "deployment"],
"dependencies": {
"analysis": ["research"],
"development": ["analysis"],
"testing": ["development"],
"deployment": ["testing"]
}
}
# Agile Sprint Pattern
agile_pattern = {
"iterations": ["planning", "development", "review", "retrospective"],
"timeboxed": True,
"feedback_loops": ["daily_standup", "sprint_review", "retrospective"]
}
# Critical Path Pattern
critical_path_pattern = {
"focus": "identify and prioritize critical path tasks",
"optimization": "minimize critical path duration",
"monitoring": "track critical path progress closely"
}
Task Planning Architecture
flowchart TD
A[Complex objective received]
B[Break into sequential tasks]
C[Each Task gets description and expected_output]
D[Assign task to specialized agent]
E[Define task dependencies via context]
F[Crew executes tasks in order]
G[Output of each task feeds next task]
H[Final output synthesized]
A --> B
B --> C
C --> D
D --> E
E --> F
F --> G
G --> H
What We've Accomplished
✅ Mastered task definition with structured frameworks ✅ Implemented task decomposition strategies for complex objectives ✅ Built dependency management systems ✅ Created intelligent task prioritization algorithms ✅ Developed resource allocation planning ✅ Established progress monitoring and tracking ✅ Implemented adaptive planning for changing conditions
Next Steps
Ready to equip your agents with tools? In Chapter 4: Tool Integration, we'll explore how to integrate external tools and APIs to extend agent capabilities.
Key Takeaway: Effective task planning is the foundation of successful AI crew execution. Well-planned tasks with clear dependencies, proper resource allocation, and continuous monitoring enable agents to work efficiently toward complex objectives.
What Problem Does This Solve?
Most teams struggle here because the hard part is not writing more code, but deciding clear boundaries for self, task, agent so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about Chapter 3: Task Definition & Planning as an operating subsystem inside CrewAI Tutorial: Building Collaborative AI Agent Teams, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around CrewTask, tasks, List as your checklist when adapting these patterns to your own repository.
How it Works Under the Hood
Under the hood, Chapter 3: Task Definition & Planning usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for
self. - Input normalization: shape incoming data so
taskreceives stable contracts. - Core execution: run the main logic branch and propagate intermediate state through
agent. - Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
Source Walkthrough
Use the following upstream sources to verify implementation details while reading this chapter:
- View Repo
Why it matters: authoritative reference on
View Repo(github.com).
Suggested trace strategy:
- search upstream code for
selfandtaskto map concrete implementation paths - compare docs claims against actual runtime/config code before reusing patterns in production