Using CrewAI Parallelism
Introduction
CrewAI is a powerful AI framework that allows for efficient parallel processing of tasks. This tutorial will guide you through the steps to leverage CrewAI's parallelism capabilities, helping you to enhance performance and optimize resource utilization in your projects.
Prerequisites
Before you begin, ensure you have the following:
- Basic understanding of Python programming
- Installed CrewAI library (version 2.0 or later)
- Familiarity with task-based parallelism concepts
Installing CrewAI
To install CrewAI, use the following command:
This will download and install the latest version of CrewAI from the Python Package Index (PyPI).
Basic Usage
CrewAI allows you to define tasks and execute them in parallel. Here is a simple example to demonstrate this:
import crewai
def task_function(task_id):
print(f"Processing task {task_id}")
if __name__ == "__main__":
tasks = [crewai.Task(task_function, args=(i,)) for i in range(10)]
crewai.run(tasks)
In this example, we define a function task_function
that prints the task ID. We then create a list of tasks and execute them in parallel using crewai.run
.
Advanced Usage
CrewAI provides more advanced features for parallelism, including setting up task dependencies and managing resources. Here's an example:
import crewai
def task_a():
print("Task A complete")
def task_b():
print("Task B complete")
def task_c():
print("Task C complete")
if __name__ == "__main__":
task1 = crewai.Task(task_a)
task2 = crewai.Task(task_b, depends_on=[task1])
task3 = crewai.Task(task_c, depends_on=[task2])
crewai.run([task1, task2, task3])
In this example, task_b
depends on the completion of task_a
, and task_c
depends on task_b
. CrewAI ensures that tasks are executed in the correct order.
Performance Tuning
To maximize performance, CrewAI allows you to customize the number of worker threads and other parameters. Here's how to do it:
import crewai
def my_task(task_id):
print(f"Executing task {task_id}")
if __name__ == "__main__":
tasks = [crewai.Task(my_task, args=(i,)) for i in range(100)]
crewai.run(tasks, num_workers=10, timeout=30)
In this example, we specify that CrewAI should use 10 worker threads and set a timeout of 30 seconds for task execution.
Handling Errors
CrewAI provides mechanisms for error handling and retry logic. Here's an example:
import crewai
def error_prone_task(task_id):
if task_id % 2 == 0:
raise ValueError(f"Error in task {task_id}")
print(f"Task {task_id} completed successfully")
if __name__ == "__main__":
tasks = [crewai.Task(error_prone_task, args=(i,), retry=3) for i in range(10)]
crewai.run(tasks)
In this example, tasks with even IDs raise an error, and CrewAI retries these tasks up to 3 times before giving up.
Conclusion
By using CrewAI's parallelism features, you can significantly improve the performance of your applications. This tutorial covered the basics of setting up and running tasks in parallel, as well as advanced features like task dependencies, performance tuning, and error handling. Experiment with these features to find the best configuration for your specific needs.