从单轮到多 Agent 协作:我的 AI Agent 开发实战记录
用 Python 构建复杂信息检索系统时,单轮 Agent 碰到多源异构数据就频繁翻车。换成多 Agent 协作架构后,任务成功率从 62% 提到了 91%。
问题是什么
我接了个信息聚合项目:从十几个数据源(SQL 数据库、REST API、PDF 文档、网页)抓信息,整合成结构化报告。
最开始图省事,一个 Agent 包打天下——一个 LLM 实例干所有活:拆解用户意图、选数据源、写查询语句、解析结果、汇总报告。
结果惨不忍睹。数据源超过 5 个,Agent 就开始"人格分裂":它在 SQL 查询步骤像个 DBA,到解析 PDF 时突然忘了刚才查了啥。更糟的是,某个步骤一报错,整条链路直接崩,没有任何容错。
解决思路
我对比了三种方案:
| 方案 | 思路 | 优缺点 |
|---|---|---|
| 单 Agent + 工具链 | 一个 Agent 调用多个 function/tool | 简单但脆弱,上下文污染严重 |
| Pipeline 编排 | 预定义执行管道,每步独立 | 稳定但死板,应付不了动态任务 |
| 多 Agent 协作 | 多个专业 Agent 各司其职,通过消息传递协作 | 复杂度高,但灵活性和鲁棒性最好 |
我选了第三种。核心思路:把原来一个大 Agent 拆成多个小 Agent,每个只干一件事,Agent 之间通过结构化消息通信。
操作步骤
步骤 1:拆解任务,定义 Agent 角色
我把信息聚合任务拆成 4 个专业角色:
1. Planner Agent — 分析用户请求,生成执行计划
2. Query Agent — 只负责从各种数据源获取原始数据
3. Analysis Agent — 对原始数据进行分析和交叉验证
4. Writer Agent — 将分析结果写成结构化报告
每个 Agent 共享同一个 LLM 实例,但 prompt 不同、暴露的工具不同、上下文窗口也隔离。
步骤 2:搭建 Agent 基类和消息协议
通用 Agent 基类和消息协议:
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
class MessageType(Enum):
TASK = "task" # 下发任务
RESULT = "result" # 返回结果
ERROR = "error" # 错误报告
REVISE = "revise" # 要求修正
@dataclass
class Message:
msg_type: MessageType
sender: str
receiver: str
payload: Any
task_id: str = ""
metadata: dict = field(default_factory=dict)
class BaseAgent:
def __init__(self, name: str, llm, system_prompt: str, tools: list = None):
self.name = name
self.llm = llm
self.system_prompt = system_prompt
self.tools = tools or []
self.mailbox: list[Message] = []
self.context: list[dict] = []
def receive(self, msg: Message):
self.mailbox.append(msg)
def process(self) -> Optional[Message]:
if not self.mailbox:
return None
msg = self.mailbox.pop(0)
return self._handle(msg)
def _handle(self, msg: Message) -> Message:
raise NotImplementedError
每个 Agent 维护自己的
context列表,不共享——这是避免"上下文污染"的关键。
步骤 3:实现核心 Agent——Planner 和 Query
Planner Agent 负责把用户请求拆成可执行步骤:
class PlannerAgent(BaseAgent):
def _handle(self, msg: Message) -> Message:
if msg.msg_type != MessageType.TASK:
return Message(MessageType.ERROR, self.name, msg.sender,
f"Unexpected message type: {msg.msg_type}")
user_request = msg.payload
response = self.llm.invoke(
system=self.system_prompt,
messages=[{
"role": "user",
"content": f"请将以下请求拆解为具体的执行步骤:
{user_request}"
}]
)
steps = self._parse_steps(response)
return Message(
MessageType.TASK, self.name, "query_agent",
{"steps": steps, "original_request": user_request},
task_id=msg.task_id
)
Query Agent 对接实际数据源。关键是让它"只取数据,不做分析":
class QueryAgent(BaseAgent):
def __init__(self, name, llm, data_sources: dict):
super().__init__(name, llm, "你是一个数据查询专家,只负责获取数据,不进行分析。")
self.data_sources = data_sources
def _handle(self, msg: Message) -> Message:
steps = msg.payload.get("steps", [])
raw_results = {}
for step in steps:
source_name = step.get("source")
source = self.data_sources.get(source_name)
if not source:
raw_results[source_name] = {"error": f"Unknown source: {source_name}"}
continue
try:
raw_results[source_name] = source.query(step["query"])
except Exception as e:
raw_results[source_name] = {"error": str(e)}
return Message(MessageType.RESULT, self.name, "analysis_agent",
raw_results, task_id=msg.task_id)
try/except包裹每个独立查询,一个数据源挂了不会拖垮整个流程。
步骤 4:实现 Analysis Agent 和 Writer Agent
Analysis Agent 做交叉验证和去重:
class AnalysisAgent(BaseAgent):
def _handle(self, msg: Message) -> Message:
raw_data = msg.payload
analysis = self.llm.invoke(
system="你是一个数据分析师。对比多个数据源的返回结果,找出共识和矛盾。",
messages=[{
"role": "user",
"content": f"请分析以下原始数据:
{json.dumps(raw_data, ensure_ascii=False, indent=2)}"
}]
)
return Message(MessageType.RESULT, self.name, "writer_agent",
{"analysis": analysis, "raw_data": raw_data},
task_id=msg.task_id)
Writer Agent 负责最终输出。它拿到的是分析后的结构化数据,不需要关心数据来源和查询细节:
class WriterAgent(BaseAgent):
def _handle(self, msg: Message) -> Message:
content = msg.payload
report = self.llm.invoke(
system="你是一个技术报告撰写专家。根据分析结果生成结构化报告。",
messages=[{
"role": "user",
"content": f"请根据以下分析生成报告:
{json.dumps(content, ensure_ascii=False, indent=2)}"
}]
)
return Message(MessageType.RESULT, self.name, "__main__",
{"report": report}, task_id=msg.task_id)
步骤 5:编排协作流程
Orchestrator 串联整个流程:
class Orchestrator:
def __init__(self, agents: dict[str, BaseAgent]):
self.agents = agents
def run(self, user_request: str) -> str:
planner = self.agents["planner"]
planner.receive(Message(
MessageType.TASK, "user", "planner", user_request
))
current_msg = planner.process()
if current_msg.msg_type == MessageType.ERROR:
return f"Planning failed: {current_msg.payload}"
query_agent = self.agents["query"]
query_agent.receive(current_msg)
current_msg = query_agent.process()
if current_msg.msg_type == MessageType.ERROR:
return f"Query failed: {current_msg.payload}"
analysis_agent = self.agents["analysis"]
analysis_agent.receive(current_msg)
current_msg = analysis_agent.process()
if current_msg.msg_type == MessageType.ERROR:
return f"Analysis failed: {current_msg.payload}"
writer_agent = self.agents["writer"]
writer_agent.receive(current_msg)
current_msg = writer_agent.process()
if current_msg.msg_type == MessageType.ERROR:
return f"Writing failed: {current_msg.payload}"
return current_msg.payload.get("report", "No report generated")
结果与总结
迁移到多 Agent 架构后,最直接的改变:
踩到的坑:
延伸思考

你可能感兴趣的文章
分类:Android
标签:
评论已关闭!