AI Agent 应用开发:从单轮到多Agent协作实战
从单轮对话到多Agent协作,不是功能叠加,而是架构思维的彻底重构。我花了四周时间,把一个简单的"AI问答"原型重构成了能自主协作的多Agent系统,单Agent的墙和多Agent的坑全踩了一遍。这篇就是我从0到1的完整路径。
一、起点:一个"能回答问题"的Agent
第一版Agent特别简单:接收用户问题 → 调用LLM → 返回答案。
from openai import OpenAI
client = OpenAI()
def simple_agent(prompt: str) -> str:
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
这个版本跑通只花了10分钟。结果上线第一天就翻车了:
用户问:"帮我分析一下上个月的销售数据,然后发一封邮件给团队,再在Slack上通知大家。"
一个对话接口被迫同时干三件事:分析数据、生成邮件、发通知。Prompt越来越臃肿,逻辑全耦合在一起,改一处可能影响全部。
这不是Agent,这是带记忆功能的聊天框。
二、第一轮重构:工具调用(Function Calling)
Agent得有"手"——不能光说不练。OpenAI Function Calling 让Agent能调用外部工具了。
tools = [
{
"type": "function",
"function": {
"name": "query_sales_data",
"description": "查询销售数据",
"parameters": {
"type": "object",
"properties": {
"start_date": {"type": "string"},
"end_date": {"type": "string"}
}
}
}
},
{
"type": "function",
"function": {
"name": "send_email",
"description": "发送邮件",
"parameters": {
"type": "object",
"properties": {
"to": {"type": "string"},
"subject": {"type": "string"},
"body": {"type": "string"}
}
}
}
}
]
def agent_with_tools(user_message: str) -> str:
messages = [{"role": "user", "content": user_message}]
while True:
response = client.chat.completions.create(
model="gpt-4",
messages=messages,
tools=tools
)
if response.choices[0].finish_reason == "stop":
return response.choices[0].message.content
# 执行工具调用
for tool_call in response.choices[0].message.tool_calls:
result = execute_tool(tool_call)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result)
})
这个版本让Agent真正有了行动力。但新的问题很快又冒出来了:
- 上下文窗口膨胀:每次工具调用结果都塞进对话历史,3轮交互后Token消耗暴涨
- 错误级联:一个工具调用失败,整个ReAct循环直接卡死
- 缺乏分工:同一个Agent既要推理又要调工具还要做格式化,Prompt复杂度指数级上升
三、核心改造:从单Agent到多Agent
翻了篇LangGraph的论文后,我决定把系统拆成多个专职Agent,每个只干一件事,通过一个Orchestrator来协调。
3.1 定义Agent基类
from abc import ABC, abstractmethod
from typing import Dict, Any
class BaseAgent(ABC):
def __init__(self, name: str, system_prompt: str):
self.name = name
self.system_prompt = system_prompt
self.context = []
@abstractmethod
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
pass
def _call_llm(self, messages: list) -> str:
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": self.system_prompt},
*messages
]
)
return response.choices[0].message.content
3.2 实现专职Agent
我拆了三个Agent:
分析Agent — 只负责数据查询和汇总:
class AnalystAgent(BaseAgent):
def __init__(self):
super().__init__(
name="analyst",
system_prompt="""你是一个数据分析师。
你的职责:
1. 理解用户的数据需求
2. 调用 query_sales_data 获取数据
3. 返回结构化的分析结果(JSON格式)
注意:只做数据分析,不要生成邮件或通知内容。"""
)
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
query = input_data.get("query", "")
data = self._query_database(query)
analysis = self._call_llm([
{"role": "user", "content": f"分析以下数据:\n{data}"}
])
return {"analysis_result": analysis, "raw_data": data}
邮件Agent — 只负责邮件创作和发送:
class EmailAgent(BaseAgent):
def __init__(self):
super().__init__(
name="email_writer",
system_prompt="""你是一个邮件撰写专家。
根据收到的分析结果,撰写专业的邮件正文。
用 markdown 格式输出。"""
)
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
analysis = input_data.get("analysis", "")
email_body = self._call_llm([
{"role": "user", "content": f"根据以下分析结果写邮件:\n{analysis}"}
])
send_email(input_data.get("to", ""), email_body)
return {"email_status": "sent", "body": email_body}
通知Agent — 负责多通道消息分发:
class NotifierAgent(BaseAgent):
def __init__(self):
super().__init__(
name="notifier",
system_prompt="""你是一个通知分发员。
根据消息内容和紧急程度,选择合适渠道通知用户。
支持:Slack、企业微信、短信。"""
)
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
message = input_data.get("message", "")
channels = input_data.get("channels", ["slack"])
results = {}
for channel in channels:
results[channel] = self._send_notification(channel, message)
return {"notification_results": results}
3.3 Orchestrator:大脑中枢
Orchestrator 是系统的核心,负责解析用户意图,编排Agent执行顺序,处理错误和超时。
class Orchestrator:
def __init__(self):
self.agents = {
"analyst": AnalystAgent(),
"email": EmailAgent(),
"notifier": NotifierAgent()
}
self.workflow_registry = {}
def register_workflow(self, name: str, steps: list):
"""注册工作流,例如:
[
{"agent": "analyst", "input_key": "query"},
{"agent": "email", "input_key": "analysis_result"},
{"agent": "notifier", "input_key": "message"}
]
"""
self.workflow_registry[name] = steps
async def execute(self, user_request: str) -> Dict[str, Any]:
# 1. 意图识别
intent = await self._classify_intent(user_request)
# 2. 获取工作流
workflow = self.workflow_registry.get(intent["workflow"])
if not workflow:
raise ValueError(f"未知的工作流: {intent['workflow']}")
# 3. 按顺序执行,数据传递
context = intent["params"]
results = {}
for step in workflow:
agent = self.agents[step["agent"]]
# 准备输入
input_key = step["input_key"]
agent_input = {
input_key: context.get(input_key)
}
# 执行Agent(带超时和重试)
try:
result = await asyncio.wait_for(
self._run_agent_with_retry(agent, agent_input),
timeout=30.0
)
except asyncio.TimeoutError:
result = {"error": f"Agent {agent.name} 超时"}
results[agent.name] = result
context.update(result)
return results
注意: Orchestrator 自己不调LLM,它只是一个"路由+编排"层。保持轻量,系统才好维护。
四、实践中踩过的坑
坑1:Agent之间数据格式不统一
Analyst 返回 JSON,Email 期望 Markdown,Notifier 只认纯文本。每次传数据都得做格式转换。
我的解决办法: 统一 Agent 的输入输出为 Dict[str, Any],在 Orchestrator 中加了一个轻量的 Transformer 层:
class DataTransformer:
transforms = {
"analyst_to_email": lambda d: {
"analysis": d["analysis_result"],
"to": d.get("recipient", "team@company.com")
},
"email_to_notifier": lambda d: {
"message": f"邮件已发送: {d['body'][:100]}...",
"channels": ["slack"]
}
}
坑2:某个Agent挂了,整个流程中断
第三个Agent调Slack API超时,前两个Agent的成果全白费了。
我的解决办法: 引入检查点和部分完成策略。
class Checkpoint:
def __init__(self):
self.storage = {}
def save(self, workflow_id: str, step: str, data: Dict):
self.storage[f"{workflow_id}:{step}"] = {
"data": data,
"timestamp": time.time()
}
def get_partial_result(self, workflow_id: str):
return {
k: v for k, v in self.storage.items()
if k.startswith(workflow_id)
}
就算通知Agent失败,分析和邮件结果也不会丢。Orchestrator可以重试或跳过失败的步骤。
坑3:Token消耗爆炸
每个Agent都带着完整的对话历史,4个Agent = 4倍的上下文开销。
我的解决办法: 每个Agent只带与自身职责相关的摘要上下文,不共享完整历史。
class ContextManager:
@staticmethod
def summarize(agent_name: str, full_context: str) -> str:
summary_prompt = f"你是{agent_name},请从以下对话中提取与你相关的信息,50字以内:"
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": f"{summary_prompt}\n{full_context}"}
]
)
return response.choices[0].message.content
实践下来,用 gpt-3.5-turbo 做摘要,gpt-4 做核心推理,质量不降的前提下,Token成本降了大概60%。
五、效果对比
我用同一组测试集跑了50个Case,单Agent版和多Agent版对比:
| 维度 | 单Agent | 多Agent协作 |
|---|---|---|
| 任务完成率 | 72% | 94% |
| 平均响应时间 | 8.3s | 12.7s |
| Token消耗/任务 | ~12K | ~9.5K |
| 单点故障影响面 | 全系统 | 单个Agent |
| 添加新能力成本 | 改Prompt,风险高 | 加Agent,风险低 |
多Agent在完成率和Token成本上明显胜出。响应时间虽然稍长(Agent间通信开销),但完成率从72%提到94%,多等几秒完全值。
六、延伸思考
- Agent粒度怎么定? 我的经验:一个Agent的职责说明书超过200字,说明它太"胖"了,该继续拆。
- 循环依赖怎么处理? Agent A调Agent B,Agent B又回来调Agent A。我的做法是给Orchestrator加一个最大深度限制(目前设了5层),超了直接报错,避免隐形成本失控。
- 要不要引入Agent间辩论? 我试过两个Analyst Agent互相校验结果,准确率确实提升了,但成本也上去了,性价比不高。生产环境建议只在关键决策点(比如支付、审批)做多Agent交叉验证。
- MCP协议值得关注。 标准化Agent工具调用协议能避免自定义Tool Call的耦合问题,我下个版本正往这个方向迁移。
以上是我从单Agent到多Agent协作的完整实战记录。代码不复杂,核心思想就两个字:分治——让每个Agent只做它擅长的事,通过一个轻量的编排层串起来。如果你也在做Agent开发,建议从"拆"开始,别急着"合"。

评论已关闭!