条件任务
简介¶
crewAI中的条件任务允许根据先前任务的结果进行动态工作流适应。这一强大功能使团队能够做出决策并选择性地执行任务,从而提高AI驱动流程的灵活性和效率。
示例用法¶
from typing import List
from pydantic import BaseModel
from crewai import Agent, Crew
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.task import Task
from crewai_tools import SerperDevTool
# 定义条件函数用于条件任务
# 如果为假,则跳过任务;如果为真,则执行任务。
def is_data_missing(output: TaskOutput) -> bool:
return len(output.pydantic.events) < 10 # 这将跳过此任务
# 定义代理
data_fetcher_agent = Agent(
role="数据获取者",
goal="使用Serper工具在线获取数据",
backstory="背景故事1",
verbose=True,
tools=[SerperDevTool()]
)
data_processor_agent = Agent(
role="数据处理者",
goal="处理获取的数据",
backstory="背景故事2",
verbose=True
)
summary_generator_agent = Agent(
role="摘要生成器",
goal="从获取的数据中生成摘要",
backstory="背景故事3",
verbose=True
)
class EventOutput(BaseModel):
events: List[str]
task1 = Task(
description="使用Serper工具获取旧金山活动数据",
expected_output="本周旧金山要做的10件事列表",
agent=data_fetcher_agent,
output_pydantic=EventOutput,
)
conditional_task = ConditionalTask(
description="""
检查数据是否缺失。如果我们的事件少于10个,
使用Serper工具获取更多事件,以便
我们本周在旧金山有总共10个事件。
""",
expected_output="本周旧金山要做的10件事列表",
condition=is_data_missing,
agent=data_processor_agent,
)
task3 = Task(
description="从获取的数据中生成旧金山活动的摘要",
expected_output="关于客户及其客户和竞争对手的完整报告,包括他们的人口统计信息、偏好、市场定位和受众参与度。",
agent=summary_generator_agent,
)
# 创建一个包含任务的团队
crew = Crew(
agents=[data_fetcher_agent, data_processor_agent, summary_generator_agent],
tasks=[task1, conditional_task, task3],
verbose=True,
planning=True
)
# 运行团队
result = crew.kickoff()
print("结果", result)