AI Agent 应用开发:从单轮到多Agent协作实战

2026-05-04 13:57 AI Agent 应用开发:从单轮到多Agent协作实战已关闭评论

AI Agent 应用开发:从单轮到多Agent协作实战

从单轮对话到多Agent协作,不是功能叠加,而是架构思维的彻底重构。我花了四周时间,把一个简单的"AI问答"原型重构成了能自主协作的多Agent系统,单Agent的墙和多Agent的坑全踩了一遍。这篇就是我从0到1的完整路径。

一、起点:一个"能回答问题"的Agent

第一版Agent特别简单:接收用户问题 → 调用LLM → 返回答案。

from openai import OpenAI

client = OpenAI()

def simple_agent(prompt: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

这个版本跑通只花了10分钟。结果上线第一天就翻车了:

用户问:"帮我分析一下上个月的销售数据,然后发一封邮件给团队,再在Slack上通知大家。"

一个对话接口被迫同时干三件事:分析数据、生成邮件、发通知。Prompt越来越臃肿,逻辑全耦合在一起,改一处可能影响全部。

这不是Agent,这是带记忆功能的聊天框。

二、第一轮重构:工具调用(Function Calling)

Agent得有"手"——不能光说不练。OpenAI Function Calling 让Agent能调用外部工具了。

tools = [
    {
        "type": "function",
        "function": {
            "name": "query_sales_data",
            "description": "查询销售数据",
            "parameters": {
                "type": "object",
                "properties": {
                    "start_date": {"type": "string"},
                    "end_date": {"type": "string"}
                }
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "send_email",
            "description": "发送邮件",
            "parameters": {
                "type": "object",
                "properties": {
                    "to": {"type": "string"},
                    "subject": {"type": "string"},
                    "body": {"type": "string"}
                }
            }
        }
    }
]

def agent_with_tools(user_message: str) -> str:
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        response = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            tools=tools
        )
        
        if response.choices[0].finish_reason == "stop":
            return response.choices[0].message.content
        
        # 执行工具调用
        for tool_call in response.choices[0].message.tool_calls:
            result = execute_tool(tool_call)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result)
            })

这个版本让Agent真正有了行动力。但新的问题很快又冒出来了:

  • 上下文窗口膨胀:每次工具调用结果都塞进对话历史,3轮交互后Token消耗暴涨
  • 错误级联:一个工具调用失败,整个ReAct循环直接卡死
  • 缺乏分工:同一个Agent既要推理又要调工具还要做格式化,Prompt复杂度指数级上升

三、核心改造:从单Agent到多Agent

翻了篇LangGraph的论文后,我决定把系统拆成多个专职Agent,每个只干一件事,通过一个Orchestrator来协调。

3.1 定义Agent基类

from abc import ABC, abstractmethod
from typing import Dict, Any

class BaseAgent(ABC):
    def __init__(self, name: str, system_prompt: str):
        self.name = name
        self.system_prompt = system_prompt
        self.context = []
    
    @abstractmethod
    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        pass
    
    def _call_llm(self, messages: list) -> str:
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": self.system_prompt},
                *messages
            ]
        )
        return response.choices[0].message.content

3.2 实现专职Agent

我拆了三个Agent:

分析Agent — 只负责数据查询和汇总:

class AnalystAgent(BaseAgent):
    def __init__(self):
        super().__init__(
            name="analyst",
            system_prompt="""你是一个数据分析师。
你的职责:
1. 理解用户的数据需求
2. 调用 query_sales_data 获取数据
3. 返回结构化的分析结果(JSON格式)

注意:只做数据分析,不要生成邮件或通知内容。"""
        )
    
    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        query = input_data.get("query", "")
        data = self._query_database(query)
        analysis = self._call_llm([
            {"role": "user", "content": f"分析以下数据:\n{data}"}
        ])
        return {"analysis_result": analysis, "raw_data": data}

邮件Agent — 只负责邮件创作和发送:

class EmailAgent(BaseAgent):
    def __init__(self):
        super().__init__(
            name="email_writer",
            system_prompt="""你是一个邮件撰写专家。
根据收到的分析结果,撰写专业的邮件正文。
用 markdown 格式输出。"""
        )
    
    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        analysis = input_data.get("analysis", "")
        email_body = self._call_llm([
            {"role": "user", "content": f"根据以下分析结果写邮件:\n{analysis}"}
        ])
        send_email(input_data.get("to", ""), email_body)
        return {"email_status": "sent", "body": email_body}

通知Agent — 负责多通道消息分发:

class NotifierAgent(BaseAgent):
    def __init__(self):
        super().__init__(
            name="notifier",
            system_prompt="""你是一个通知分发员。
根据消息内容和紧急程度,选择合适渠道通知用户。
支持:Slack、企业微信、短信。"""
        )
    
    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        message = input_data.get("message", "")
        channels = input_data.get("channels", ["slack"])
        results = {}
        for channel in channels:
            results[channel] = self._send_notification(channel, message)
        return {"notification_results": results}

3.3 Orchestrator:大脑中枢

Orchestrator 是系统的核心,负责解析用户意图,编排Agent执行顺序,处理错误和超时。

class Orchestrator:
    def __init__(self):
        self.agents = {
            "analyst": AnalystAgent(),
            "email": EmailAgent(),
            "notifier": NotifierAgent()
        }
        self.workflow_registry = {}
    
    def register_workflow(self, name: str, steps: list):
        """注册工作流,例如:
        [
            {"agent": "analyst", "input_key": "query"},
            {"agent": "email", "input_key": "analysis_result"},
            {"agent": "notifier", "input_key": "message"}
        ]
        """
        self.workflow_registry[name] = steps
    
    async def execute(self, user_request: str) -> Dict[str, Any]:
        # 1. 意图识别
        intent = await self._classify_intent(user_request)
        
        # 2. 获取工作流
        workflow = self.workflow_registry.get(intent["workflow"])
        if not workflow:
            raise ValueError(f"未知的工作流: {intent['workflow']}")
        
        # 3. 按顺序执行,数据传递
        context = intent["params"]
        results = {}
        
        for step in workflow:
            agent = self.agents[step["agent"]]
            
            # 准备输入
            input_key = step["input_key"]
            agent_input = {
                input_key: context.get(input_key)
            }
            
            # 执行Agent(带超时和重试)
            try:
                result = await asyncio.wait_for(
                    self._run_agent_with_retry(agent, agent_input),
                    timeout=30.0
                )
            except asyncio.TimeoutError:
                result = {"error": f"Agent {agent.name} 超时"}
            
            results[agent.name] = result
            context.update(result)
        
        return results

注意: Orchestrator 自己不调LLM,它只是一个"路由+编排"层。保持轻量,系统才好维护。

四、实践中踩过的坑

坑1:Agent之间数据格式不统一

Analyst 返回 JSON,Email 期望 Markdown,Notifier 只认纯文本。每次传数据都得做格式转换。

我的解决办法: 统一 Agent 的输入输出为 Dict[str, Any],在 Orchestrator 中加了一个轻量的 Transformer 层:

class DataTransformer:
    transforms = {
        "analyst_to_email": lambda d: {
            "analysis": d["analysis_result"],
            "to": d.get("recipient", "team@company.com")
        },
        "email_to_notifier": lambda d: {
            "message": f"邮件已发送: {d['body'][:100]}...",
            "channels": ["slack"]
        }
    }

坑2:某个Agent挂了,整个流程中断

第三个Agent调Slack API超时,前两个Agent的成果全白费了。

我的解决办法: 引入检查点和部分完成策略。

class Checkpoint:
    def __init__(self):
        self.storage = {}
    
    def save(self, workflow_id: str, step: str, data: Dict):
        self.storage[f"{workflow_id}:{step}"] = {
            "data": data,
            "timestamp": time.time()
        }
    
    def get_partial_result(self, workflow_id: str):
        return {
            k: v for k, v in self.storage.items()
            if k.startswith(workflow_id)
        }

就算通知Agent失败,分析和邮件结果也不会丢。Orchestrator可以重试或跳过失败的步骤。

坑3:Token消耗爆炸

每个Agent都带着完整的对话历史,4个Agent = 4倍的上下文开销。

我的解决办法: 每个Agent只带与自身职责相关的摘要上下文,不共享完整历史。

class ContextManager:
    @staticmethod
    def summarize(agent_name: str, full_context: str) -> str:
        summary_prompt = f"你是{agent_name},请从以下对话中提取与你相关的信息,50字以内:"
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "user", "content": f"{summary_prompt}\n{full_context}"}
            ]
        )
        return response.choices[0].message.content

实践下来,用 gpt-3.5-turbo 做摘要,gpt-4 做核心推理,质量不降的前提下,Token成本降了大概60%。

五、效果对比

我用同一组测试集跑了50个Case,单Agent版和多Agent版对比:

维度 单Agent 多Agent协作
任务完成率 72% 94%
平均响应时间 8.3s 12.7s
Token消耗/任务 ~12K ~9.5K
单点故障影响面 全系统 单个Agent
添加新能力成本 改Prompt,风险高 加Agent,风险低

多Agent在完成率和Token成本上明显胜出。响应时间虽然稍长(Agent间通信开销),但完成率从72%提到94%,多等几秒完全值。

六、延伸思考

  1. Agent粒度怎么定? 我的经验:一个Agent的职责说明书超过200字,说明它太"胖"了,该继续拆。
  2. 循环依赖怎么处理? Agent A调Agent B,Agent B又回来调Agent A。我的做法是给Orchestrator加一个最大深度限制(目前设了5层),超了直接报错,避免隐形成本失控。
  3. 要不要引入Agent间辩论? 我试过两个Analyst Agent互相校验结果,准确率确实提升了,但成本也上去了,性价比不高。生产环境建议只在关键决策点(比如支付、审批)做多Agent交叉验证。
  4. MCP协议值得关注。 标准化Agent工具调用协议能避免自定义Tool Call的耦合问题,我下个版本正往这个方向迁移。

以上是我从单Agent到多Agent协作的完整实战记录。代码不复杂,核心思想就两个字:分治——让每个Agent只做它擅长的事,通过一个轻量的编排层串起来。如果你也在做Agent开发,建议从"拆"开始,别急着"合"。

你可能感兴趣的文章

来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
转载请注明出处: https://teachcourse.cn/4083.html ,谢谢支持!

资源分享

分类:Android 标签:
Python定义公共方法、私有方法详细示例 Python定义公共方法、私有方法详
008- JavaScript 如何实现动态加载文章内容或生成内容 008- JavaScript 如何实现动态
java提供的容器类 java提供的容器类
关于如何解决“NoClassDefFoundError”错误的问题? 关于如何解决“NoClassDefFo

评论已关闭!