跳转至

管道

什么是流水线?

在 crewAI 中,流水线代表一种结构化的工作流,允许顺序或并行执行多个 crew。它提供了一种组织涉及多个阶段的复杂过程的方法,其中一个阶段的输出可以作为后续阶段的输入。

关键术语

理解以下术语对于有效使用流水线至关重要:

  • 阶段:流水线的一个独立部分,可以是顺序的(单个 crew)或并行的(多个 crew 同时执行)。
  • 启动:针对给定输入集的特定流水线执行,表示通过流水线的单个处理实例。
  • 分支:阶段内的并行执行(例如,并发 crew 操作)。
  • 跟踪:单个输入通过整个流水线的旅程,捕获其路径和经历的转换。

示例流水线结构:

crew1 >> [crew2, crew3] >> crew4

这表示一个包含三个阶段的流水线:

  1. 一个顺序阶段(crew1)
  2. 一个包含两个分支的并行阶段(crew2 和 crew3 同时执行)
  3. 另一个顺序阶段(crew4)

每个输入创建其自己的启动,流经流水线的所有阶段。可以并发处理多个启动,每个启动都遵循定义的流水线结构。

流水线属性

属性 参数 描述
阶段 stages 一个 PipelineStage 列表(crew、crew 列表或路由器),表示要按顺序执行的阶段。

创建流水线

当创建流水线时,你定义一系列阶段,每个阶段由单个 crew 或用于并行执行的 crew 列表组成。流水线确保每个阶段按顺序执行,一个阶段的输出作为下一个阶段的输入。

示例:组装流水线

from crewai import Crew, Process, Pipeline

# 定义你的 crew
research_crew = Crew(
    agents=[researcher],
    tasks=[research_task],
    process=Process.sequential
)

analysis_crew = Crew(
    agents=[analyst],
    tasks=[analysis_task],
    process=Process.sequential
)

writing_crew = Crew(
    agents=[writer],
    tasks=[writing_task],
    process=Process.sequential
)

# 组装流水线
my_pipeline = Pipeline(
    stages=[research_crew, analysis_crew, writing_crew]
)

流水线方法

方法 描述
kickoff 执行流水线,处理所有阶段并返回结果。此方法通过流水线启动一个或多个启动,处理阶段之间的数据流。
process_runs 为每个提供的输入运行流水线,处理阶段之间的数据流和转换。

流水线输出

理解流水线输出

crewAI 框架中流水线的输出封装在 PipelineKickoffResult 类中。这个类提供了一种结构化的方式来访问流水线执行的结果,包括原始字符串、JSON 和 Pydantic 模型等格式。

流水线输出属性

属性 参数 类型 描述
ID id UUID4 流水线输出的唯一标识符。
运行结果 run_results List[PipelineRunResult] 一个 PipelineRunResult 对象列表,每个对象代表通过流水线的一次运行的输出。

流水线输出方法

方法/属性 描述
add_run_result 向运行结果列表中添加一个 PipelineRunResult

流水线运行结果属性

属性 参数 类型 描述
ID id UUID4 运行结果的唯一标识符。
原始 raw str 流水线启动中最终阶段的原始输出。
Pydantic pydantic Any 如果适用,表示最终阶段结构化输出的 Pydantic 模型对象。
JSON 字典 json_dict Union[Dict[str, Any], None] 如果适用,表示最终阶段 JSON 输出的字典。
令牌使用 token_usage Dict[str, UsageMetrics] 流水线启动中所有阶段令牌使用的摘要。
跟踪 trace List[Any] 输入通过流水线启动的旅程跟踪。
Crew 输出 crews_outputs List[CrewOutput] 表示流水线启动中每个 crew 输出的 CrewOutput 对象列表。

流水线运行结果方法和属性

方法/属性 描述
json 如果最终任务的输出格式为 JSON,则返回运行结果的 JSON 字符串表示。
to_dict 将 JSON 和 Pydantic 输出转换为字典。
str 返回运行结果的字符串表示,优先考虑 Pydantic,然后是 JSON,然后是原始。

访问流水线输出

一旦执行了流水线,其输出可以通过 process_runs 方法返回的 PipelineOutput 对象访问。PipelineOutput 类提供了对单个 PipelineRunResult 对象的访问,每个对象代表通过流水线的一次运行。


一旦流水线被执行,其输出可以通过 process_runs 方法返回的 PipelineOutput 对象访问。PipelineOutput 类提供了对单个 PipelineRunResult 对象的访问,每个对象代表通过流水线的一次运行。

# 定义输入数据
input_data = [{"initial_query": "AI的最新进展"}, {"initial_query": "机器人的未来"}]

# 执行流水线
pipeline_output = await my_pipeline.process_runs(input_data)

# 访问结果
for run_result in pipeline_output.run_results:
    print(f"运行ID: {run_result.id}")
    print(f"最终原始输出: {run_result.raw}")
    if run_result.json_dict:
        print(f"JSON输出: {json.dumps(run_result.json_dict, indent=2)}")
    if run_result.pydantic:
        print(f"Pydantic输出: {run_result.pydantic}")
    print(f"令牌使用: {run_result.token_usage}")
    print(f"跟踪: {run_result.trace}")
    print("Crew输出:")
    for crew_output in run_result.crews_outputs:
        print(f"  Crew: {crew_output.raw}")
    print("\n")

这个例子演示了如何访问和处理流水线输出,包括单个运行结果及其相关数据。

使用流水线

流水线对于涉及多个处理、分析或内容生成阶段的复杂工作流特别有用。它们允许你:

  1. 顺序操作:按特定顺序执行 crew,确保一个 crew 的输出作为下一个 crew 的输入。
  2. 并行处理:在阶段内并行运行多个 crew,以提高效率。
  3. 管理复杂工作流:将大型任务分解为更小、更易于管理的步骤,由专门的 crew 执行。

示例:运行流水线

# 定义输入数据
input_data = [{"initial_query": "AI的最新进展"}]

# 执行流水线,为每个输入启动运行
results = await my_pipeline.process_runs(input_data)

# 访问结果
for result in results:
    print(f"最终输出: {result.raw}")
    print(f"令牌使用: {result.token_usage}")
    print(f"跟踪: {result.trace}")  # 显示输入通过所有阶段的路径

高级功能

阶段内的并行执行

你可以在阶段内定义并行执行,通过提供 crew 列表来创建多个分支:

parallel_analysis_crew = Crew(agents=[financial_analyst], tasks=[financial_analysis_task])
market_analysis_crew = Crew(agents=[market_analyst], tasks=[market_analysis_task])

my_pipeline = Pipeline(
    stages=[
        research_crew,
        [parallel_analysis_crew, market_analysis_crew],  # 并行执行(分支)
        writing_crew
    ]
)

流水线中的路由器

路由器是 crewAI 流水线中的一个强大功能,它允许动态决策和工作流中的分支。它们能够根据特定条件或标准来指导执行流程,使你的流水线更加灵活和适应性强。

什么是路由器?

在 crewAI 中,路由器是一个特殊的组件,可以作为流水线中的一个阶段。它评估输入数据并确定执行应该采取的下一个路径。这允许在流水线中进行条件分支,根据路由器的决策执行不同的 crew 或子流水线。

路由器的主要组件


1. 路由:一个命名路由的字典,每个路由都与一个条件和满足该条件时要执行的流水线相关联。 2. 默认路由:如果未满足定义的路由条件,则执行的后备流水线。

创建路由器

以下是如何创建路由器的示例:

from crewai import Router, Route, Pipeline, Crew, Agent, Task

# 定义你的代理
classifier = Agent(name="Classifier", role="Email Classifier")
urgent_handler = Agent(name="Urgent Handler", role="Urgent Email Processor")
normal_handler = Agent(name="Normal Handler", role="Normal Email Processor")

# 定义你的任务
classify_task = Task(description="根据邮件的内容和元数据对邮件进行分类。")
urgent_task = Task(description="快速处理和回复紧急邮件。")
normal_task = Task(description="彻底处理和回复普通邮件。")

# 定义你的 crew
classification_crew = Crew(agents=[classifier], tasks=[classify_task]) # 根据1-10的紧急程度对邮件进行分类
urgent_crew = Crew(agents=[urgent_handler], tasks=[urgent_task])
normal_crew = Crew(agents=[normal_handler], tasks=[normal_task])

# 为不同的紧急程度创建流水线
urgent_pipeline = Pipeline(stages=[urgent_crew])
normal_pipeline = Pipeline(stages=[normal_crew])

# 创建一个路由器
email_router = Router(
    routes={
        "high_urgency": Route(
            condition=lambda x: x.get("urgency_score", 0) > 7,
            pipeline=urgent_pipeline
        ),
        "low_urgency": Route(
            condition=lambda x: x.get("urgency_score", 0) <= 7,
            pipeline=normal_pipeline
        )
    },
    default=Pipeline(stages=[normal_pipeline])  # 如果没有紧急程度分数,则默认为普通
)

# 在主流水线中使用路由器
main_pipeline = Pipeline(stages=[classification_crew, email_router])

inputs = [{"email": "..."}, {"email": "..."}]  # 邮件数据列表

main_pipeline.kickoff(inputs=inputs)

在这个例子中,路由器根据邮件的紧急程度分数决定是使用紧急流水线还是普通流水线。如果紧急程度分数大于7,则路由到紧急流水线;否则,使用普通流水线。如果输入不包含紧急程度分数,则默认仅为分类 crew。

使用路由器的优势

  1. 动态工作流:根据输入特征或中间结果调整流水线的 behavior。
  2. 效率:将紧急任务路由到更快的流程,为不太紧急的输入保留更彻底的流水线。
  3. 灵活性:轻松修改或扩展流水线的逻辑,而不改变核心结构。
  4. 可扩展性:使用单个流水线结构处理各种类型的邮件和紧急程度。

错误处理和验证

Pipeline 类包括验证机制,以确保流水线结构的健壮性:

  • 验证阶段是否只包含 Crew 实例或 Crew 实例列表。
  • 防止阶段的嵌套,以保持清晰的结构。