AI Agent 开发实战:从零构建智能工作流

2026-05-03 20:35 AI Agent 开发实战:从零构建智能工作流已关闭评论

AI Agent 开发实战:从零构建智能工作流

花了一个月用 Python 从零搭了一套 AI Agent 框架,跑通后把日常 80% 的重复工作都交给了它。


问题是什么

今年年初我接了个活儿:帮一个内容团队搭建自动化发布流水线。需求听起来简单——每天从 RSS 抓取文章 → AI 改写 → 发布到 WordPress 和公众号。

试了一圈现成的平台(n8n、Dify、Coze),发现都不太对劲。要么定制能力不够——公众号的排版引擎各家都不一样,要么 API 调用成本偏高——按次计费,一天几百次跑下来比人工还贵。

更重要的是,我希望 Agent 能记住昨天的发布情况、根据阅读量调整选题策略。这类状态管理在低代码平台上做起来非常别扭。

最后决定:自己写一个 Agent 框架。


解决思路

市面上主流的 Agent 架构有三种:

方案 代表 优点 缺点
纯 LLM + Function Calling OpenAI Assistants API 开发快,基础设施完善 vendor lock-in,调试困难
编排框架 LangChain / CrewAI 生态丰富,组件多 抽象层太厚,出了问题不好追
自建循环 自己写 while True 完全可控,零依赖 要自己处理所有边缘情况

我选了第三种——自建循环。理由很简单:

我的场景足够具体,不需要通用框架的臃肿抽象。框架的"黑盒"特性在调试时让人抓狂——出问题了分不清是 prompt 没写好还是框架有 bug。而且自建循环让我能精确控制 token 消耗,实测比 LangChain 方案省了 40% 的 tokens。


操作步骤

步骤1:定义 Agent 的核心循环

Agent 的本质就是一个循环:思考 → 行动 → 观察 → 再思考

# agent_core.py
import json
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass
class AgentContext:
    """Agent 的上下文状态"""
    system_prompt: str
    messages: list = field(default_factory=list)
    max_steps: int = 10
    step: int = 0

class Agent:
    def __init__(self, llm, tools: list):
        self.llm = llm          # LLM 客户端
        self.tools = tools       # 工具列表
        self.context = None
    
    def run(self, task: str, context: AgentContext = None) -> str:
        """主循环:思考→行动→观察"""
        if context is None:
            context = AgentContext(
                system_prompt=self._build_system_prompt()
            )
        self.context = context
        
        # 把用户任务加入对话
        self.context.messages.append({"role": "user", "content": task})
        
        while self.context.step < self.context.max_steps:
            self.context.step += 1
            print(f"[Step {self.context.step}/{self.context.max_steps}]")
            
            # 思考:调用 LLM
            response = self.llm.chat(
                system=self.context.system_prompt,
                messages=self.context.messages,
                tools=[t.to_schema() for t in self.tools]
            )
            
            # 如果 LLM 没有调用工具,直接返回结果
            if not response.tool_calls:
                final = response.content
                self.context.messages.append(
                    {"role": "assistant", "content": final}
                )
                return final
            
            # 行动:执行工具调用
            self.context.messages.append(response.to_message())
            for tool_call in response.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)
                
                print(f"  → 调用工具: {tool_name}({tool_args})")
                
                # 执行工具
                tool_result = self._execute_tool(tool_name, tool_args)
                
                # 观察:把结果附加到对话
                self.context.messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(tool_result)
                })
        
        raise TimeoutError(f"Agent 达到最大步骤 {self.context.max_steps}")
    
    def _execute_tool(self, name: str, args: dict) -> Any:
        for tool in self.tools:
            if tool.name == name:
                return tool.fn(**args)
        raise ValueError(f"未知工具: {name}")
    
    def _build_system_prompt(self) -> str:
        return """你是一个智能工作流助手。你可以使用工具来完成任务。
每次调用工具后,根据返回结果决定下一步行动。
当任务完成时,用中文给出总结。"""

这里的关键设计是 AgentContextAgent 分离——这样可以让多个 Agent 共享同一个 LLM 实例,但各自维护独立的上下文状态。


步骤2:设计工具系统

工具是 Agent 的"手脚"。我设计了一个轻量的工具注册机制:

# tools/base.py
from dataclasses import dataclass
from typing import Callable, Any

@dataclass
class Tool:
    name: str
    description: str
    parameters: dict       # JSON Schema 格式
    fn: Callable           # 实际执行的函数
    
    def to_schema(self) -> dict:
        """转换成 OpenAI Function Calling 格式"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters
            }
        }

# tools/weibo.py — 示例:微博采集工具
import feedparser

def fetch_weibo_hotlist() -> list[dict]:
    """抓取微博热搜"""
    url = "https://weibo.com/ajax/side/hotSearch"
    # 实际实现用 requests
    data = _fetch_json(url)
    return [
        {"title": item["word"], "hot": item["raw_hot"]}
        for item in data["data"]["realtime"]
    ]

weibo_tool = Tool(
    name="fetch_weibo_hotlist",
    description="获取微博实时热搜榜单",
    parameters={"type": "object", "properties": {}},
    fn=fetch_weibo_hotlist
)

我还写了个 @tool 装饰器,省得每次手动定义 Tool 对象:

# tools/registry.py
from functools import wraps
from typing import get_type_hints
import inspect

_registry: dict[str, Tool] = {}

def tool(name: str = None, description: str = None):
    def decorator(func):
        nonlocal name, description
        t_name = name or func.__name__
        t_desc = description or func.__doc__ or ""
        
        # 从函数签名自动生成 JSON Schema
        sig = inspect.signature(func)
        hints = get_type_hints(func)
        properties = {}
        required = []
        
        for param_name, param in sig.parameters.items():
            if param_name == "return":
                continue
            ptype = hints.get(param_name, str)
            properties[param_name] = {"type": _type_to_str(ptype)}
            if param.default is inspect.Parameter.empty:
                required.append(param_name)
        
        parameters = {
            "type": "object",
            "properties": properties
        }
        if required:
            parameters["required"] = required
        
        _registry[t_name] = Tool(
            name=t_name,
            description=t_desc,
            parameters=parameters,
            fn=func
        )
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    return decorator

def _type_to_str(t: type) -> str:
    mapping = {str: "string", int: "integer", float: "number", bool: "boolean", list: "array", dict: "object"}
    return mapping.get(t, "string")

# 使用示例
@tool(description="计算两个数的和")
def add(a: int, b: int) -> int:
    return a + b

这个装饰器的好处是——函数的类型注解就是工具的 schema,不用写两份。


步骤3:对接 LLM

LLM 这块我封装了一个适配器层,方便切换不同的模型提供方:

# llm/adapter.py
from abc import ABC, abstractmethod
from dataclasses import dataclass

@dataclass
class LLMResponse:
    content: str
    tool_calls: list | None = None
    
    def to_message(self) -> dict:
        msg = {"role": "assistant", "content": self.content}
        if self.tool_calls:
            msg["tool_calls"] = [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {
                        "name": tc.function.name,
                        "arguments": tc.function.arguments
                    }
                }
                for tc in self.tool_calls
            ]
        return msg

class LLM(ABC):
    @abstractmethod
    def chat(self, system: str, messages: list, tools: list = None) -> LLMResponse:
        ...

# llm/openai_adapter.py
from openai import OpenAI

class OpenAILLM(LLM):
    def __init__(self, model: str = "gpt-4o", api_key: str = None):
        self.client = OpenAI(api_key=api_key)
        self.model = model
    
    def chat(self, system: str, messages: list, tools: list = None) -> LLMResponse:
        kwargs = {
            "model": self.model,
            "messages": [{"role": "system", "content": system}] + messages
        }
        if tools:
            kwargs["tools"] = tools
            kwargs["tool_choice"] = "auto"
        
        resp = self.client.chat.completions.create(**kwargs)
        msg = resp.choices[0].message
        
        return LLMResponse(
            content=msg.content or "",
            tool_calls=msg.tool_calls
        )
# llm/deepseek_adapter.py
from openai import OpenAI

class DeepSeekLLM(LLM):
    """DeepSeek 兼容 OpenAI SDK"""
    def __init__(self, model: str = "deepseek-chat", api_key: str = None):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com"
        )
        self.model = model
    
    def chat(self, system: str, messages: list, tools: list = None) -> LLMResponse:
        kwargs = {
            "model": self.model,
            "messages": [{"role": "system", "content": system}] + messages
        }
        if tools:
            kwargs["tools"] = tools
        # DeepSeek 的 function calling 可能返回的不是标准格式
        # 需要额外处理
        resp = self.client.chat.completions.create(**kwargs)
        msg = resp.choices[0].message
        return LLMResponse(
            content=msg.content or "",
            tool_calls=msg.tool_calls
        )

需要说明的是,DeepSeek 的 function calling 稳定性不如 GPT-4o,偶尔会返回格式异常的参数。我在生产环境用 GPT-4o 做主 Agent,DeepSeek 做内容生成类的子任务。


步骤4:实现工作流编排

单 Agent 能做的事有限。要处理"抓取 → 改写 → 发布"这种流程,需要多个 Agent 协作:

# workflow.py
from dataclasses import dataclass, field
from typing import List

@dataclass
class WorkflowStep:
    name: str
    agent: Agent
    input_template: str      # 模板字符串,引用上一步的输出
    output_key: str          # 本步骤的输出在上下文中的键名

class Workflow:
    def __init__(self):
        self.steps: List[WorkflowStep] = []
        self.context: dict = {}
    
    def add_step(self, step: WorkflowStep):
        self.steps.append(step)
    
    def run(self, initial_input: dict = None):
        self.context = initial_input or {}
        
        for step in self.steps:
            print(f"
{'='*40}")
            print(f"执行步骤: {step.name}")
            print(f"{'='*40}")
            
            # 渲染输入模板
            task = step.input_template.format(**self.context)
            
            # 创建独立的 Agent 上下文
            agent_ctx = AgentContext(
                system_prompt=step.agent._build_system_prompt()
            )
            
            # 运行 Agent
            result = step.agent.run(task, context=agent_ctx)
            
            # 保存结果
            self.context[step.output_key] = result
            
            print(f"✓ {step.name} 完成")
        
        return self.context

然后用它来定义实际的发布流水线:

# main.py
# 初始化 LLM
llm = OpenAILLM(model="gpt-4o")

# 创建各个 Agent
fetcher = Agent(llm, tools=[weibo_tool, rss_tool])
writer = Agent(llm, tools=[search_tool])
publisher = Agent(llm, tools=[wp_tool, wechat_tool])

# 定义工作流
wf = Workflow()
wf.add_step(WorkflowStep(
    name="选题采集",
    agent=fetcher,
    input_template="从热搜和 RSS 源中找出今天最热门的技术话题,输出 3 个选题",
    output_key="topics"
))
wf.add_step(WorkflowStep(
    name="文章生成",
    agent=writer,
    input_template="根据选题 {topics},撰写一篇技术博客文章,包含代码示例和实操步骤",
    output_key="article"
))
wf.add_step(WorkflowStep(
    name="发布",
    agent=publisher,
    input_template="将以下文章发布到 WordPress 和公众号:

{article}",
    output_key="publish_result"
))

# 执行
result = wf.run()
print(result["publish_result"])

步骤5:加入记忆与状态持久化

这是最让我头疼的一块。Agent 需要记住昨天发了什么、哪些选题效果好,才能持续优化。

我实现了一个轻量的记忆层:

# memory.py
import sqlite3
import json
from datetime import datetime, date

class AgentMemory:
    """基于 SQLite 的持久化记忆"""
    
    def __init__(self, db_path: str = "agent_memory.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_db()
    
    def _init_db(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                created_at TEXT NOT NULL,
                UNIQUE(key)
            )
        """)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS episodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date TEXT NOT NULL,
                summary TEXT NOT NULL,
                metrics TEXT  -- JSON
            )
        """)
        self.conn.commit()
    
    def remember(self, key: str, value: Any):
        """保存记忆"""
        self.conn.execute(
            "INSERT OR REPLACE INTO memories (key, value, created_at) VALUES (?, ?, ?)",
            (key, json.dumps(value, ensure_ascii=False), datetime.now().isoformat())
        )
        self.conn.commit()
    
    def recall(self, key: str) -> Any | None:
        """回忆"""
        row = self.conn.execute(
            "SELECT value FROM memories WHERE key = ?", (key,)
        ).fetchone()
        return json.loads(row[0]) if row else None
    
    def save_episode(self, summary: str, metrics: dict = None):
        """记录一次执行"""
        today = date.today().isoformat()
        self.conn.execute(
            "INSERT INTO episodes (date, summary, metrics) VALUES (?, ?, ?)",
            (today, summary, json.dumps(metrics or {}))
        )
        self.conn.commit()
    
    def get_recent_episodes(self, days: int = 7) -> list:
        """回顾最近的执行记录"""
        return self.conn.execute(
            "SELECT date, summary, metrics FROM episodes WHERE date >= date('now', ?)",
            (f"-{days} days",)
        ).fetchall()

然后在 Agent 初始化时注入记忆:

# 改造 Agent 的 _build_system_prompt
def _build_system_prompt(self) -> str:
    # 获取最近的执行记录
    recent = self.memory.get_recent_episodes(days=3)
    memory_context = ""
    if recent:
        memory_context = "近期执行记录:
" + "
".join(
            f"- [{row[0]}] {row[1]}" for row in recent
        )
    
    return f"""你是一个智能工作流助手。
    
{memory_context}

你可以使用工具来完成任务。每次调用工具后,根据返回结果决定下一步行动。
当任务完成时,用中文给出总结。
"""

踩过一个坑:一开始我把所有历史对话都塞进 system prompt,结果 token 消耗暴涨 3 倍,模型反而"迷失"在大量历史信息里。后来改为只总结关键指标——昨天的文章标题、阅读量、转化率——效果好多了。


结果与总结

这套框架跑了三周,数据说话:

  • **日均执行**: 6-8 个自动化任务(选题 → 生成 → 发布)
  • **成功率**: GPT-4o 约 92%,DeepSeek 约 78%(主要在 function calling 环节出问题)
  • **节省时间**: 原来人工处理大约 2 小时/天,现在降到 15 分钟/天(主要用来检查输出质量)
  • **Token 消耗**: 日均约 30 万 tokens,成本约 ¥3.5/天
  • 分享三个印象最深的坑:

    LLM 的幻觉会传递。第一轮选题采集时如果 LLM 幻觉出一个不存在的"热点",后面的生成和发布都会基于这个幻觉继续推进——Garbage in, garbage out 在 Agent 场景下被放大了。解决方案是每一步都加人工确认点(human-in-the-loop),关键决策让人类确认。

    工具调用异常处理。LLM 偶尔会传荒谬的参数——比如给 fetch_article 传一个 5000 字的 URL——必须在工具层做参数校验和超时保护。永远不要在工具函数里不加 try-catch。

    上下文窗口管理。Agent 跑 5-6 步后,消息列表会膨胀到几万 tokens。我加了自动裁剪逻辑:消息超过阈值时,把早期的 tool call 结果摘要化。


    延伸思考

    这个框架还有不少可以改进的地方。

    并行执行。目前是串行,如果三个选题可以并行生成,速度能提升 2-3 倍。Python 的 asyncio 能解决这个问题。

    向量记忆。SQLite 做记忆存储很轻量,但语义检索能力为零。下一步可以接入轻量的向量数据库(比如 chromadb),让 Agent 能根据语义搜索历史经验。

    Agent 自省。当前某一步失败,整个流程就断了。理想情况下,Agent 应该能自省——发现工具调用失败后自动切换方案,比如 WordPress 发布失败就降级为纯 Markdown 导出。

    成本优化。大部分步骤不需要 GPT-4o 的智商,可以用 DeepSeek 或 Gemini 2.5 Flash 替代,只在关键决策时才调用 GPT-4o。这种分级模型策略预计能把成本再砍一半。


    *以上代码已脱敏整理,核心框架约 500 行 Python。完整的工程化版本(含错误重试、监控告警、Web Dashboard)正在整理中,后续会开源。*

    公众号「花叔」| 30 万+ 开发者关注 | AI 工具与效率提升

    你可能感兴趣的文章

    来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
    转载请注明出处: https://teachcourse.cn/4037.html ,谢谢支持!

    资源分享

    分类:Android 标签:
    Android开发之HorizontalScrollView控件使用案例介绍 Android开发之HorizontalScro
    Android手机和笔记本电脑之间搭建局域网 Android手机和笔记本电脑之间搭
    Ubuntu系统flask服务和wsgi运行示例说明 Ubuntu系统flask服务和wsgi运行
    重置路由器电脑网络显示黄色叹号怎么办? 重置路由器电脑网络显示黄色叹号

    评论已关闭!