第3章:并行化

智能体设计模式:构建智能系统的实战指南 阅读 54 次

第3章:并行化

并行化模式概述

在前几章中,我们探讨了用于顺序工作流的提示链和用于动态决策以及在不同路径之间转换的路由。虽然这些模式至关重要,但许多复杂的智能体任务涉及多个子任务,这些子任务可以同时执行,而不是依次进行。这就是并行化模式变得至关重要的地方。

并行化涉及同时执行多个组件,例如LLM调用、工具使用,甚至整个子智能体(见图1)。与等待一个步骤完成后再开始下一个步骤不同,并行执行允许独立任务同时运行,显著缩短了可以分解为独立部分的任务的总体执行时间。

考虑一个旨在研究某个主题并总结其发现结果的智能体。一种顺序化的方法可能:

  1. 搜索源A。
  2. 概述源A。
  3. 搜索源B。
  4. 概括来源B。
  5. 从摘要A和B中综合得出最终答案。

可以采用并行方法来:

  1. 同时搜索源A和源B。
  2. 两个搜索完成后,同时总结源A和源B。
  3. 从摘要A和B中综合得出最终答案(此步骤通常是顺序进行的,等待并行步骤完成)。

核心思想是识别出工作流程中不依赖于其他部分输出的部分,并将它们并行执行。当处理具有延迟的外部服务(如API或数据库)时,这种方法尤其有效,因为您可以同时发出多个请求。

实现并行化通常需要支持异步执行或多线程/多进程的框架。现代智能体框架在设计时考虑了异步操作,允许您轻松定义可以并行运行的步骤。

image1

图1. 子智能体并行化示例

像LangChain、LangGraph和Google ADK这样的框架提供了并行执行机制。在LangChain表达式语言(LCEL)中,您可以通过使用|(表示顺序)等运算符组合可运行对象来实现并行执行,并通过构建具有并发执行分支的链或图来结构化您的链或图。LangGraph凭借其图结构,允许您定义多个节点,这些节点可以从单个状态转换中执行,从而在工作流程中有效地实现并行分支。Google ADK提供了强大且原生的机制,以促进和管理智能体的并行执行,显著提高了复杂多智能体系统的效率和可扩展性。ADK框架内的这种固有能力允许开发者设计和实现多智能体可以并发操作而不是顺序操作的解决方案。

并行化模式对于提高智能体系统的效率和响应性至关重要,尤其是在处理涉及多个独立查找、计算或与外部服务交互的任务时。它是优化复杂智能体工作流程性能的关键技术。

实际应用与用例

并行化是一种强大的模式,可以优化智能体在各种应用中的性能:

  1. 信息收集与研究: 同时从多个来源收集信息是一个经典用例。

用例: 智能体研究一家公司。 * 并行任务: 同时搜索新闻文章、获取股票数据、检查社交媒体提及情况以及查询公司数据库。 * 优势: 比顺序查找更快地收集全面视图。

  1. 数据处理与分析: 同时应用不同的分析技术或处理不同的数据段。

用例: 一个分析客户反馈的智能体。 * 并行任务: 在一批反馈条目中同时运行情感分析、提取关键词、分类反馈和识别紧急问题。 * 优势: 快速提供多角度分析。

  1. 多API或工具交互: 调用多个独立的API或工具,以收集不同类型的信息或执行不同的操作。

用例: 旅行规划智能体。 * 并行任务: 同时检查航班价格、搜索酒店可用性、查找当地活动以及获取餐厅推荐。 * 好处: 更快地呈现完整的旅行计划。

  1. 基于多个组件的内容生成: 并行生成复杂内容的不同部分。

用例: 智能体创建营销邮件。 * 并行任务: 同时生成主题行、草拟邮件正文、寻找相关图片以及创建行动号召按钮文本。 * 优势: 更高效地组装最终电子邮件。

  1. 验证与确认: 同时执行多个独立的检查或验证。

用例: 智能体验证用户输入。 * 并行任务: 同时检查电子邮件格式、验证电话号码、与数据库核对地址,以及检查是否存在粗俗语言。 * 优点: 提供对输入有效性的更快反馈。

  1. 多模态处理: 同时处理相同输入的不同模态(文本、图像、音频)。

用例: 一个智能体分析包含文本和图像的社会媒体帖子。 * 并行任务: 同时分析文本的情感和关键词,并分析图像中的物体和场景描述。 * 优势: 更快地整合来自不同模态的见解。

  1. A/B 测试或多种选项生成: 并行生成多个响应或输出的变体,以选择最佳的一个。

用例: 一个生成不同创意文本选项的智能体。 * 并行任务: 使用略有不同的提示或模型,同时为文章生成三个不同的标题。 * 优点: 允许快速比较和选择最佳选项。

并行化是智能体设计中的一种基本优化技术,允许开发者通过利用独立任务的并发执行来构建性能更优、响应更快的应用程序。

动手代码示例(LangChain)

LangChain框架内的并行执行通过LangChain表达式语言(LCEL)得以实现。主要方法是在字典或列表结构中组织多个可运行组件。当将此集合作为输入传递给链中的后续组件时,LCEL运行时将包含的可运行组件并发执行。

在LangGraph的语境下,这一原则应用于图的拓扑结构。通过设计图,使得多个节点(缺乏直接的顺序依赖关系)可以从一个共同的节点启动,从而定义了并行工作流。这些并行路径独立执行,直到它们的结果可以在图中的后续汇聚点进行聚合。

以下实现展示了使用LangChain框架构建的并行处理工作流程。该工作流程旨在针对单个用户查询同时执行两个独立的操作。这些并行过程被实例化为不同的链或函数,其各自的输出随后被汇总为统一的结果。

本实施的先决条件包括安装必要的Python包,例如langchain、langchain-community以及类似langchain-openai的模型提供者库。此外,还需要在本地环境中配置所选语言模型的合法API密钥以进行身份验证。

import os
import asyncio
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough

# --- Configuration ---
# Ensure your API key environment variable is set (e.g., OPENAI_API_KEY)
try:
    llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
except Exception as e:
    print(f"Error initializing language model: {e}")
    llm = None

# --- Define Independent Chains ---
# These three chains represent distinct tasks that can be executed in parallel.
summarize_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Summarize the following topic concisely:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

questions_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Generate three interesting questions about the following topic:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

terms_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Identify 5-10 key terms from the following topic, separated by commas:"),
        ("user", "{topic}")
    ])
    | llm
    | StrOutputParser()
)

# --- Build the Parallel + Synthesis Chain ---
# 1. Define the block of tasks to run in parallel. The results of these,
#    along with the original topic, will be fed into the next step.
map_chain = RunnableParallel(
    {
        "summary": summarize_chain,
        "questions": questions_chain,
        "key_terms": terms_chain,
        "topic": RunnablePassthrough(),  # Pass the original topic through
    }
)

# 2. Define the final synthesis prompt which will combine the parallel results.
synthesis_prompt = ChatPromptTemplate.from_messages([
    ("system", """Based on the following information:
     Summary: {summary}
     Related Questions: {questions}
     Key Terms: {key_terms}
     Synthesize a comprehensive answer."""),
    ("user", "Original topic: {topic}")
])

# 3. Construct the full chain by piping the parallel results directly
#    into the synthesis prompt, followed by the LLM and output parser.
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()

# --- Run the Chain ---
async def run_parallel_example(topic: str) -> None:
    """Asynchronously invokes the parallel processing chain with a specific topic
    and prints the synthesized result.

    Args:
        topic: The input topic to be processed by the LangChain chains.
    """
    if not llm:
        print("LLM not initialized. Cannot run example.")
        return
    print(f"\n--- Running Parallel LangChain Example for Topic: '{topic}' ---")
    try:
        # The input to `ainvoke` is the single 'topic' string,
        # then passed to each runnable in the `map_chain`.
        response = await full_parallel_chain.ainvoke(topic)
        print("\n--- Final Response ---")
        print(response)
    except Exception as e:
        print(f"\nAn error occurred during chain execution: {e}")

if __name__ == "__main__":
    test_topic = "The history of space exploration"
    # In Python 3.7+, asyncio.run is the standard way to run an async function.
    asyncio.run(run_parallel_example(test_topic))

提供的Python代码实现了一个LangChain应用程序,该应用程序通过利用并行执行来高效地处理给定主题。请注意,asyncio提供的是并发,而不是并行。它通过使用事件循环在单个线程中实现这一点,该循环在任务空闲时(例如,等待网络请求)智能地在任务之间切换。这创造出多个任务同时进展的效果,但代码本身仍然只由一个线程执行,受Python的全局解释器锁(GIL)所限制。

代码首先从langchain_openai和langchain_core中导入必要的模块,包括语言模型、提示、输出解析和可执行结构组件。代码尝试初始化一个ChatOpenAI实例,具体使用“gpt-4o-mini”模型,并指定温度以控制创造力。在语言模型初始化过程中使用了try-except块以提高鲁棒性。随后定义了三个独立的LangChain "链",每个链都设计用于对输入主题执行不同的任务。第一个链用于简洁地总结主题,使用系统消息和包含主题占位符的用户消息。第二个链配置为生成与主题相关的三个有趣问题。第三个链设置为从输入主题中识别5到10个关键术语,要求它们以逗号分隔。这些独立的链每个都包含一个针对其特定任务的ChatPromptTemplate,随后是初始化的语言模型和一个StrOutputParser以将输出格式化为字符串。

随后构建了一个RunnableParallel块来捆绑这三个链,使它们能够同时执行。这个并行可运行块还包括一个RunnablePassthrough,以确保原始输入主题对后续步骤可用。为最终的合成步骤定义了一个单独的ChatPromptTemplate,它将摘要、问题、关键词和原始主题作为输入,以生成一个全面的答案。通过将map_chain(并行块)按顺序排列到合成提示中,然后是语言模型和输出解析器,创建了名为full_parallel_chain的完整端到端处理链。提供了一个异步函数run_parallel_example来演示如何调用这个完整_parallel_chain。此函数接受主题作为输入,并使用invoke运行异步链。最后,标准的Python if __name__ \== "__main__": 块展示了如何使用asyncio.run管理异步执行来执行run_parallel_example,示例主题为“太空探索的历史”。

本质上,这段代码设置了一个工作流程,其中针对特定主题,同时进行多个LLM调用(用于摘要、提问和术语),然后通过最后的LLM调用将它们的结果进行合并。这展示了LangChain在智能体工作流程中实现并行化的核心思想。

动手代码示例(Google ADK)

好的,现在让我们将注意力转向一个具体示例,该示例展示了这些概念在Google ADK框架中的应用。我们将探讨ADK原语,如ParallelAgent和SequentialAgent,如何应用于构建一个利用并发执行以提高效率的智能体流程。

from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent
from google.adk.tools import google_search

GEMINI_MODEL = "gemini-2.0-flash"

# --- 1. Define Researcher Sub-Agents (to run in parallel) ---
# Researcher 1: Renewable Energy
researcher_agent_1 = LlmAgent(
    name="RenewableEnergyResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in energy. Research the latest advancements in 'renewable energy sources'. Use the Google Search tool provided. Summarize your key findings concisely (1-2 sentences). Output *only* the summary.""",
    description="Researches renewable energy sources.",
    tools=[google_search],
    output_key="renewable_energy_result"
)

# Researcher 2: Electric Vehicles
researcher_agent_2 = LlmAgent(
    name="EVResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in transportation. Research the latest developments in 'electric vehicle technology'. Use the Google Search tool provided. Summarize your key findings concisely (1-2 sentences). Output *only* the summary.""",
    description="Researches electric vehicle technology.",
    tools=[google_search],
    output_key="ev_technology_result"
)

# Researcher 3: Carbon Capture
researcher_agent_3 = LlmAgent(
    name="CarbonCaptureResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in climate solutions. Research the current state of 'carbon capture methods'. Use the Google Search tool provided. Summarize your key findings concisely (1-2 sentences). Output *only* the summary.""",
    description="Researches carbon capture methods.",
    tools=[google_search],
    output_key="carbon_capture_result"
)

# --- 2. Create the ParallelAgent (Runs researchers concurrently) ---
# This agent orchestrates the concurrent execution of the researchers.
# It finishes once all researchers have completed and stored their results in state.
parallel_research_agent = ParallelAgent(
    name="ParallelWebResearchAgent",
    sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3],
    description="Runs multiple research agents in parallel to gather information."
)

# --- 3. Define the Merger Agent (Runs *after* the parallel agents) ---
# This agent takes the results stored in the session state by the parallel agents
# and synthesizes them into a single, structured response with attributions.
merger_agent = LlmAgent(
    name="SynthesisAgent",
    model=GEMINI_MODEL,  # Or potentially a more powerful model if needed for synthesis
    instruction="""You are an AI Assistant responsible for combining research findings into a structured report. Your primary task is to synthesize the following research summaries, clearly attributing findings to their source areas. Structure your response using headings for each topic. Ensure the report is coherent and integrates the key points smoothly. **Crucially: Your entire response MUST be grounded *exclusively* on the information provided in the 'Input Summaries' below. Do NOT add any external knowledge, facts, or details not present in these specific summaries.** **Input Summaries:** *   **Renewable Energy:**     {renewable_energy_result} *   **Electric Vehicles:**     {ev_technology_result} *   **Carbon Capture:**     {carbon_capture_result} **Output Format:** ## Summary of Recent Sustainable Technology Advancements ### Renewable Energy Findings (Based on RenewableEnergyResearcher's findings) [Synthesize and elaborate *only* on the renewable energy input summary provided above.] ### Electric Vehicle Findings (Based on EVResearcher's findings) [Synthesize and elaborate *only* on the EV input summary provided above.] ### Carbon Capture Findings (Based on CarbonCaptureResearcher's findings) [Synthesize and elaborate *only* on the carbon capture input summary provided above.] ### Overall Conclusion [Provide a brief (1-2 sentence) concluding statement that connects *only* the findings presented above.] Output *only* the structured report following this format. Do not include introductory or concluding phrases outside this structure, and strictly adhere to using only the provided input summary content. """,
    description="Combines research findings from parallel agents into a structured, cited report, strictly grounded on provided inputs.",
    # No tools needed for merging
    # No output_key needed here, as its direct response is the final output of the sequence
)

# --- 4. Create the SequentialAgent (Orchestrates the overall flow) ---
# This is the main agent that will be run. It first executes the ParallelAgent
# to populate the state, and then executes the MergerAgent to produce the final output.
sequential_pipeline_agent = SequentialAgent(
    name="ResearchAndSynthesisPipeline",
    # Run parallel research first, then merge
    sub_agents=[parallel_research_agent, merger_agent],
    description="Coordinates parallel research and synthesizes the results."
)

root_agent = sequential_pipeline_agent

此代码定义了一个多智能体系统,用于研究和综合可持续技术进步的信息。它设置了三个LLmAgent实例作为专业研究人员。ResearcherAgent_1专注于可再生能源,ResearcherAgent_2研究电动汽车技术,而ResearcherAgent_3调查碳捕获方法。每个研究人员智能体都配置为使用GEMINI_MODEL和google_search工具。它们被指示简洁地(1-2句话)总结研究结果,并使用output_key将这些总结存储在会话状态中。

随后创建了一个名为ParallelWebResearchAgent的并行智能体来同时运行这三个研究者智能体。这允许研究并行进行,可能节省时间。一旦所有子智能体(研究者)完成并填充了状态,并行智能体就会完成其执行。

接下来,定义了一个合并智能体(也是一个LLmAgent),用于综合研究结果。该智能体以并行研究人员存储在会话状态中的摘要为输入。其指令强调输出必须严格基于提供的输入摘要,禁止添加外部知识。合并智能体旨在将综合发现结构化成一个报告,每个主题都有标题,并有一个简短的总体结论。

最后,创建了一个名为ResearchAndSynthesisPipeline的SequentialAgent智能体来协调整个工作流程。作为主要控制器,该主智能体首先执行ParallelAgent以进行研究。一旦ParallelAgent完成,SequentialAgent随后执行MergerAgent以综合收集到的信息。将sequential_pipeline_agent设置为root_agent,代表运行此多智能体系统的入口点。整个过程旨在高效地从多个来源并行收集信息,然后将其合并成一个单一的、结构化的报告。

概览

内容: 许多智能体工作流程涉及多个子任务,这些任务必须完成才能达到最终目标。纯粹按顺序执行,其中每个任务都等待前一个任务完成,通常效率低下且速度缓慢。当任务依赖于外部I/O操作,如调用不同的API或查询多个数据库时,这种延迟成为了一个显著的瓶颈。如果没有并发执行的机制,总处理时间就是所有单个任务持续时间的总和,这阻碍了系统的整体性能和响应速度。

原因: 并行化模式通过实现独立任务的同步执行,提供了一种标准化的解决方案。它通过识别工作流程中的组件,如工具使用或LLM调用,这些组件不依赖于彼此的即时输出。像LangChain和Google ADK这样的智能体框架提供了内置结构来定义和管理这些并发操作。例如,主进程可以调用多个并行运行的子任务,并在所有子任务完成之前等待,然后再进行下一步。通过同时运行这些独立任务而不是依次运行,这种模式极大地减少了总执行时间。

经验法则: 当工作流包含多个可以同时运行的独立操作时,使用此模式,例如从多个API获取数据、处理不同的数据块或生成多个内容片段以供后续合成。

视觉摘要

image2

图2:并行化设计模式

关键要点

以下是关键要点:

并行化是一种执行独立任务以提升效率的执行模式。 当任务涉及等待外部资源,如API调用时,这尤其有用。 采用并发或并行架构会引入大量的复杂性和成本,影响关键的开发阶段,如设计、调试和系统日志记录。 * 类似LangChain和Google ADK的框架提供了内置支持,用于定义和管理并行执行。 在LangChain表达式语言(LCEL)中,RunnableParallel 是一个用于并行运行多个可执行项的关键结构。 谷歌ADK可以通过LLM驱动的委托来促进并行执行,其中协调器智能体的LLM识别独立的子任务,并通过专门的子智能体触发它们的并发处理。 并行化有助于降低整体延迟,并使智能体系统在处理复杂任务时更加响应迅速。

结论

并行化模式是一种通过并发执行独立子任务来优化计算工作流的方法。这种方法可以降低整体延迟,尤其是在涉及多个模型推理或调用外部服务的复杂操作中。

框架提供了不同的机制来实现这种模式。在LangChain中,使用如RunnableParallel等构造来显式定义和同时执行多个处理链。相比之下,像谷歌智能体开发者工具包(ADK)这样的框架可以通过多智能体委托来实现并行化,其中主协调模型将不同的子任务分配给可以并发操作的专用智能体。

通过将并行处理与顺序(链式)和条件(路由)控制流程相结合,可以构建出复杂、高性能的计算系统,这些系统能够高效地管理各种复杂任务。

参考文献

以下是一些关于并行化模式及相关概念的进一步阅读资源:

  1. LangChain 表达式语言(LCEL)文档(并行处理)
  2. Google智能体开发者工具包(ADK)文档(多智能体系统)
  3. Python asyncio文档