从单轮到多 Agent 协作:我的 AI Agent 开发实战记录

2026-05-03 22:18 从单轮到多 Agent 协作:我的 AI Agent 开发实战记录已关闭评论

从单轮到多 Agent 协作:我的 AI Agent 开发实战记录

用 Python 构建复杂信息检索系统时,单轮 Agent 碰到多源异构数据就频繁翻车。换成多 Agent 协作架构后,任务成功率从 62% 提到了 91%。

问题是什么

我接了个信息聚合项目:从十几个数据源(SQL 数据库、REST API、PDF 文档、网页)抓信息,整合成结构化报告。

最开始图省事,一个 Agent 包打天下——一个 LLM 实例干所有活:拆解用户意图、选数据源、写查询语句、解析结果、汇总报告。

结果惨不忍睹。数据源超过 5 个,Agent 就开始"人格分裂":它在 SQL 查询步骤像个 DBA,到解析 PDF 时突然忘了刚才查了啥。更糟的是,某个步骤一报错,整条链路直接崩,没有任何容错。

解决思路

我对比了三种方案:

方案 思路 优缺点
单 Agent + 工具链 一个 Agent 调用多个 function/tool 简单但脆弱,上下文污染严重
Pipeline 编排 预定义执行管道,每步独立 稳定但死板,应付不了动态任务
多 Agent 协作 多个专业 Agent 各司其职,通过消息传递协作 复杂度高,但灵活性和鲁棒性最好

我选了第三种。核心思路:把原来一个大 Agent 拆成多个小 Agent,每个只干一件事,Agent 之间通过结构化消息通信。

操作步骤

步骤 1:拆解任务,定义 Agent 角色

我把信息聚合任务拆成 4 个专业角色:

1. Planner Agent    — 分析用户请求,生成执行计划
2. Query Agent      — 只负责从各种数据源获取原始数据
3. Analysis Agent   — 对原始数据进行分析和交叉验证
4. Writer Agent     — 将分析结果写成结构化报告

每个 Agent 共享同一个 LLM 实例,但 prompt 不同、暴露的工具不同、上下文窗口也隔离。

步骤 2:搭建 Agent 基类和消息协议

通用 Agent 基类和消息协议:

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional

class MessageType(Enum):
    TASK = "task"           # 下发任务
    RESULT = "result"       # 返回结果
    ERROR = "error"         # 错误报告
    REVISE = "revise"       # 要求修正

@dataclass
class Message:
    msg_type: MessageType
    sender: str
    receiver: str
    payload: Any
    task_id: str = ""
    metadata: dict = field(default_factory=dict)

class BaseAgent:
    def __init__(self, name: str, llm, system_prompt: str, tools: list = None):
        self.name = name
        self.llm = llm
        self.system_prompt = system_prompt
        self.tools = tools or []
        self.mailbox: list[Message] = []
        self.context: list[dict] = []
    
    def receive(self, msg: Message):
        self.mailbox.append(msg)
    
    def process(self) -> Optional[Message]:
        if not self.mailbox:
            return None
        msg = self.mailbox.pop(0)
        return self._handle(msg)
    
    def _handle(self, msg: Message) -> Message:
        raise NotImplementedError

每个 Agent 维护自己的 context 列表,不共享——这是避免"上下文污染"的关键。

步骤 3:实现核心 Agent——Planner 和 Query

Planner Agent 负责把用户请求拆成可执行步骤:

class PlannerAgent(BaseAgent):
    def _handle(self, msg: Message) -> Message:
        if msg.msg_type != MessageType.TASK:
            return Message(MessageType.ERROR, self.name, msg.sender, 
                          f"Unexpected message type: {msg.msg_type}")
        
        user_request = msg.payload
        response = self.llm.invoke(
            system=self.system_prompt,
            messages=[{
                "role": "user", 
                "content": f"请将以下请求拆解为具体的执行步骤:
{user_request}"
            }]
        )
        
        steps = self._parse_steps(response)
        return Message(
            MessageType.TASK, self.name, "query_agent",
            {"steps": steps, "original_request": user_request},
            task_id=msg.task_id
        )

Query Agent 对接实际数据源。关键是让它"只取数据,不做分析":

class QueryAgent(BaseAgent):
    def __init__(self, name, llm, data_sources: dict):
        super().__init__(name, llm, "你是一个数据查询专家,只负责获取数据,不进行分析。")
        self.data_sources = data_sources
    
    def _handle(self, msg: Message) -> Message:
        steps = msg.payload.get("steps", [])
        raw_results = {}
        
        for step in steps:
            source_name = step.get("source")
            source = self.data_sources.get(source_name)
            if not source:
                raw_results[source_name] = {"error": f"Unknown source: {source_name}"}
                continue
            
            try:
                raw_results[source_name] = source.query(step["query"])
            except Exception as e:
                raw_results[source_name] = {"error": str(e)}
        
        return Message(MessageType.RESULT, self.name, "analysis_agent", 
                      raw_results, task_id=msg.task_id)

try/except 包裹每个独立查询,一个数据源挂了不会拖垮整个流程。

步骤 4:实现 Analysis Agent 和 Writer Agent

Analysis Agent 做交叉验证和去重:

class AnalysisAgent(BaseAgent):
    def _handle(self, msg: Message) -> Message:
        raw_data = msg.payload
        analysis = self.llm.invoke(
            system="你是一个数据分析师。对比多个数据源的返回结果,找出共识和矛盾。",
            messages=[{
                "role": "user",
                "content": f"请分析以下原始数据:
{json.dumps(raw_data, ensure_ascii=False, indent=2)}"
            }]
        )
        
        return Message(MessageType.RESULT, self.name, "writer_agent",
                      {"analysis": analysis, "raw_data": raw_data},
                      task_id=msg.task_id)

Writer Agent 负责最终输出。它拿到的是分析后的结构化数据,不需要关心数据来源和查询细节:

class WriterAgent(BaseAgent):
    def _handle(self, msg: Message) -> Message:
        content = msg.payload
        report = self.llm.invoke(
            system="你是一个技术报告撰写专家。根据分析结果生成结构化报告。",
            messages=[{
                "role": "user",
                "content": f"请根据以下分析生成报告:
{json.dumps(content, ensure_ascii=False, indent=2)}"
            }]
        )
        
        return Message(MessageType.RESULT, self.name, "__main__",
                      {"report": report}, task_id=msg.task_id)

步骤 5:编排协作流程

Orchestrator 串联整个流程:

class Orchestrator:
    def __init__(self, agents: dict[str, BaseAgent]):
        self.agents = agents
    
    def run(self, user_request: str) -> str:
        planner = self.agents["planner"]
        planner.receive(Message(
            MessageType.TASK, "user", "planner", user_request
        ))
        
        current_msg = planner.process()
        
        if current_msg.msg_type == MessageType.ERROR:
            return f"Planning failed: {current_msg.payload}"
        
        query_agent = self.agents["query"]
        query_agent.receive(current_msg)
        current_msg = query_agent.process()
        
        if current_msg.msg_type == MessageType.ERROR:
            return f"Query failed: {current_msg.payload}"
        
        analysis_agent = self.agents["analysis"]
        analysis_agent.receive(current_msg)
        current_msg = analysis_agent.process()
        
        if current_msg.msg_type == MessageType.ERROR:
            return f"Analysis failed: {current_msg.payload}"
        
        writer_agent = self.agents["writer"]
        writer_agent.receive(current_msg)
        current_msg = writer_agent.process()
        
        if current_msg.msg_type == MessageType.ERROR:
            return f"Writing failed: {current_msg.payload}"
        
        return current_msg.payload.get("report", "No report generated")

结果与总结

迁移到多 Agent 架构后,最直接的改变:

  • **成功率 62% → 91%**:错误隔离在单个 Agent 内,不会级联崩溃
  • **单步耗时降 40%**:每步 context 更小,LLM 推理更快
  • **可调试性大幅提升**:每步输入输出可独立记录和回放
  • 踩到的坑:

  • **消息格式过紧则死板**:一开始我定了非常严格的 schema,解析就占了一半代码。后来改成"宽进严出"——接收时宽松,发出时严格校验。
  • **上下文隔离也有副作用**:Writer Agent 看不到原始数据,有些微妙信息可能在 Analysis 步骤被丢弃。折中方案是在消息里附带一份"精选原始数据摘要"。
  • **Pipeline 执行还是太线性**:当前是串行的,但分析步骤可以部分并行(比如多个 Query Agent 同时查不同数据源)。下一版准备引入 DAG 调度。
  • 延伸思考

  • 当前是静态 Pipeline,如果任务类型多变,可以引入"动态路由 Agent"来自动决定下一步谁处理
  • 工具调用方面,可以给每个 Agent 配自检工具——置信度低于阈值时主动请求人工介入
  • Agent 数量上到几十上百时,消息会爆炸,这时需要消息总线和订阅机制(类似 Kafka 的 topic 模型)
  • 你可能感兴趣的文章

    来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
    转载请注明出处: https://teachcourse.cn/4063.html ,谢谢支持!

    资源分享

    分类:Android 标签:
    Android Studio如何使用桌面版GitHub管理项目? Android Studio如何使用桌面版
    Python监听多个异步任务通知并依次处理通知 Python监听多个异步任务通知并依
    Android开发之WebView控件使用说明 Android开发之WebView控件使用
    第一次使用Android Studio的感受 第一次使用Android Studio的感

    评论已关闭!