AI Agent 开发实战:从零构建智能工作流
花了一个月用 Python 从零搭了一套 AI Agent 框架,跑通后把日常 80% 的重复工作都交给了它。
问题是什么
今年年初我接了个活儿:帮一个内容团队搭建自动化发布流水线。需求听起来简单——每天从 RSS 抓取文章 → AI 改写 → 发布到 WordPress 和公众号。
试了一圈现成的平台(n8n、Dify、Coze),发现都不太对劲。要么定制能力不够——公众号的排版引擎各家都不一样,要么 API 调用成本偏高——按次计费,一天几百次跑下来比人工还贵。
更重要的是,我希望 Agent 能记住昨天的发布情况、根据阅读量调整选题策略。这类状态管理在低代码平台上做起来非常别扭。
最后决定:自己写一个 Agent 框架。
解决思路
市面上主流的 Agent 架构有三种:
| 方案 | 代表 | 优点 | 缺点 |
|---|---|---|---|
| 纯 LLM + Function Calling | OpenAI Assistants API | 开发快,基础设施完善 | vendor lock-in,调试困难 |
| 编排框架 | LangChain / CrewAI | 生态丰富,组件多 | 抽象层太厚,出了问题不好追 |
| 自建循环 | 自己写 while True | 完全可控,零依赖 | 要自己处理所有边缘情况 |
我选了第三种——自建循环。理由很简单:
我的场景足够具体,不需要通用框架的臃肿抽象。框架的"黑盒"特性在调试时让人抓狂——出问题了分不清是 prompt 没写好还是框架有 bug。而且自建循环让我能精确控制 token 消耗,实测比 LangChain 方案省了 40% 的 tokens。
操作步骤
步骤1:定义 Agent 的核心循环
Agent 的本质就是一个循环:思考 → 行动 → 观察 → 再思考。
# agent_core.py
import json
import time
from dataclasses import dataclass, field
from typing import Any
@dataclass
class AgentContext:
"""Agent 的上下文状态"""
system_prompt: str
messages: list = field(default_factory=list)
max_steps: int = 10
step: int = 0
class Agent:
def __init__(self, llm, tools: list):
self.llm = llm # LLM 客户端
self.tools = tools # 工具列表
self.context = None
def run(self, task: str, context: AgentContext = None) -> str:
"""主循环:思考→行动→观察"""
if context is None:
context = AgentContext(
system_prompt=self._build_system_prompt()
)
self.context = context
# 把用户任务加入对话
self.context.messages.append({"role": "user", "content": task})
while self.context.step < self.context.max_steps:
self.context.step += 1
print(f"[Step {self.context.step}/{self.context.max_steps}]")
# 思考:调用 LLM
response = self.llm.chat(
system=self.context.system_prompt,
messages=self.context.messages,
tools=[t.to_schema() for t in self.tools]
)
# 如果 LLM 没有调用工具,直接返回结果
if not response.tool_calls:
final = response.content
self.context.messages.append(
{"role": "assistant", "content": final}
)
return final
# 行动:执行工具调用
self.context.messages.append(response.to_message())
for tool_call in response.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
print(f" → 调用工具: {tool_name}({tool_args})")
# 执行工具
tool_result = self._execute_tool(tool_name, tool_args)
# 观察:把结果附加到对话
self.context.messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(tool_result)
})
raise TimeoutError(f"Agent 达到最大步骤 {self.context.max_steps}")
def _execute_tool(self, name: str, args: dict) -> Any:
for tool in self.tools:
if tool.name == name:
return tool.fn(**args)
raise ValueError(f"未知工具: {name}")
def _build_system_prompt(self) -> str:
return """你是一个智能工作流助手。你可以使用工具来完成任务。
每次调用工具后,根据返回结果决定下一步行动。
当任务完成时,用中文给出总结。"""
这里的关键设计是 AgentContext 和 Agent 分离——这样可以让多个 Agent 共享同一个 LLM 实例,但各自维护独立的上下文状态。
步骤2:设计工具系统
工具是 Agent 的"手脚"。我设计了一个轻量的工具注册机制:
# tools/base.py
from dataclasses import dataclass
from typing import Callable, Any
@dataclass
class Tool:
name: str
description: str
parameters: dict # JSON Schema 格式
fn: Callable # 实际执行的函数
def to_schema(self) -> dict:
"""转换成 OpenAI Function Calling 格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters
}
}
# tools/weibo.py — 示例:微博采集工具
import feedparser
def fetch_weibo_hotlist() -> list[dict]:
"""抓取微博热搜"""
url = "https://weibo.com/ajax/side/hotSearch"
# 实际实现用 requests
data = _fetch_json(url)
return [
{"title": item["word"], "hot": item["raw_hot"]}
for item in data["data"]["realtime"]
]
weibo_tool = Tool(
name="fetch_weibo_hotlist",
description="获取微博实时热搜榜单",
parameters={"type": "object", "properties": {}},
fn=fetch_weibo_hotlist
)
我还写了个 @tool 装饰器,省得每次手动定义 Tool 对象:
# tools/registry.py
from functools import wraps
from typing import get_type_hints
import inspect
_registry: dict[str, Tool] = {}
def tool(name: str = None, description: str = None):
def decorator(func):
nonlocal name, description
t_name = name or func.__name__
t_desc = description or func.__doc__ or ""
# 从函数签名自动生成 JSON Schema
sig = inspect.signature(func)
hints = get_type_hints(func)
properties = {}
required = []
for param_name, param in sig.parameters.items():
if param_name == "return":
continue
ptype = hints.get(param_name, str)
properties[param_name] = {"type": _type_to_str(ptype)}
if param.default is inspect.Parameter.empty:
required.append(param_name)
parameters = {
"type": "object",
"properties": properties
}
if required:
parameters["required"] = required
_registry[t_name] = Tool(
name=t_name,
description=t_desc,
parameters=parameters,
fn=func
)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return decorator
def _type_to_str(t: type) -> str:
mapping = {str: "string", int: "integer", float: "number", bool: "boolean", list: "array", dict: "object"}
return mapping.get(t, "string")
# 使用示例
@tool(description="计算两个数的和")
def add(a: int, b: int) -> int:
return a + b
这个装饰器的好处是——函数的类型注解就是工具的 schema,不用写两份。
步骤3:对接 LLM
LLM 这块我封装了一个适配器层,方便切换不同的模型提供方:
# llm/adapter.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class LLMResponse:
content: str
tool_calls: list | None = None
def to_message(self) -> dict:
msg = {"role": "assistant", "content": self.content}
if self.tool_calls:
msg["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
}
for tc in self.tool_calls
]
return msg
class LLM(ABC):
@abstractmethod
def chat(self, system: str, messages: list, tools: list = None) -> LLMResponse:
...
# llm/openai_adapter.py
from openai import OpenAI
class OpenAILLM(LLM):
def __init__(self, model: str = "gpt-4o", api_key: str = None):
self.client = OpenAI(api_key=api_key)
self.model = model
def chat(self, system: str, messages: list, tools: list = None) -> LLMResponse:
kwargs = {
"model": self.model,
"messages": [{"role": "system", "content": system}] + messages
}
if tools:
kwargs["tools"] = tools
kwargs["tool_choice"] = "auto"
resp = self.client.chat.completions.create(**kwargs)
msg = resp.choices[0].message
return LLMResponse(
content=msg.content or "",
tool_calls=msg.tool_calls
)
# llm/deepseek_adapter.py
from openai import OpenAI
class DeepSeekLLM(LLM):
"""DeepSeek 兼容 OpenAI SDK"""
def __init__(self, model: str = "deepseek-chat", api_key: str = None):
self.client = OpenAI(
api_key=api_key,
base_url="https://api.deepseek.com"
)
self.model = model
def chat(self, system: str, messages: list, tools: list = None) -> LLMResponse:
kwargs = {
"model": self.model,
"messages": [{"role": "system", "content": system}] + messages
}
if tools:
kwargs["tools"] = tools
# DeepSeek 的 function calling 可能返回的不是标准格式
# 需要额外处理
resp = self.client.chat.completions.create(**kwargs)
msg = resp.choices[0].message
return LLMResponse(
content=msg.content or "",
tool_calls=msg.tool_calls
)
需要说明的是,DeepSeek 的 function calling 稳定性不如 GPT-4o,偶尔会返回格式异常的参数。我在生产环境用 GPT-4o 做主 Agent,DeepSeek 做内容生成类的子任务。
步骤4:实现工作流编排
单 Agent 能做的事有限。要处理"抓取 → 改写 → 发布"这种流程,需要多个 Agent 协作:
# workflow.py
from dataclasses import dataclass, field
from typing import List
@dataclass
class WorkflowStep:
name: str
agent: Agent
input_template: str # 模板字符串,引用上一步的输出
output_key: str # 本步骤的输出在上下文中的键名
class Workflow:
def __init__(self):
self.steps: List[WorkflowStep] = []
self.context: dict = {}
def add_step(self, step: WorkflowStep):
self.steps.append(step)
def run(self, initial_input: dict = None):
self.context = initial_input or {}
for step in self.steps:
print(f"
{'='*40}")
print(f"执行步骤: {step.name}")
print(f"{'='*40}")
# 渲染输入模板
task = step.input_template.format(**self.context)
# 创建独立的 Agent 上下文
agent_ctx = AgentContext(
system_prompt=step.agent._build_system_prompt()
)
# 运行 Agent
result = step.agent.run(task, context=agent_ctx)
# 保存结果
self.context[step.output_key] = result
print(f"✓ {step.name} 完成")
return self.context
然后用它来定义实际的发布流水线:
# main.py
# 初始化 LLM
llm = OpenAILLM(model="gpt-4o")
# 创建各个 Agent
fetcher = Agent(llm, tools=[weibo_tool, rss_tool])
writer = Agent(llm, tools=[search_tool])
publisher = Agent(llm, tools=[wp_tool, wechat_tool])
# 定义工作流
wf = Workflow()
wf.add_step(WorkflowStep(
name="选题采集",
agent=fetcher,
input_template="从热搜和 RSS 源中找出今天最热门的技术话题,输出 3 个选题",
output_key="topics"
))
wf.add_step(WorkflowStep(
name="文章生成",
agent=writer,
input_template="根据选题 {topics},撰写一篇技术博客文章,包含代码示例和实操步骤",
output_key="article"
))
wf.add_step(WorkflowStep(
name="发布",
agent=publisher,
input_template="将以下文章发布到 WordPress 和公众号:
{article}",
output_key="publish_result"
))
# 执行
result = wf.run()
print(result["publish_result"])
步骤5:加入记忆与状态持久化
这是最让我头疼的一块。Agent 需要记住昨天发了什么、哪些选题效果好,才能持续优化。
我实现了一个轻量的记忆层:
# memory.py
import sqlite3
import json
from datetime import datetime, date
class AgentMemory:
"""基于 SQLite 的持久化记忆"""
def __init__(self, db_path: str = "agent_memory.db"):
self.conn = sqlite3.connect(db_path)
self._init_db()
def _init_db(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
value TEXT NOT NULL,
created_at TEXT NOT NULL,
UNIQUE(key)
)
""")
self.conn.execute("""
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
summary TEXT NOT NULL,
metrics TEXT -- JSON
)
""")
self.conn.commit()
def remember(self, key: str, value: Any):
"""保存记忆"""
self.conn.execute(
"INSERT OR REPLACE INTO memories (key, value, created_at) VALUES (?, ?, ?)",
(key, json.dumps(value, ensure_ascii=False), datetime.now().isoformat())
)
self.conn.commit()
def recall(self, key: str) -> Any | None:
"""回忆"""
row = self.conn.execute(
"SELECT value FROM memories WHERE key = ?", (key,)
).fetchone()
return json.loads(row[0]) if row else None
def save_episode(self, summary: str, metrics: dict = None):
"""记录一次执行"""
today = date.today().isoformat()
self.conn.execute(
"INSERT INTO episodes (date, summary, metrics) VALUES (?, ?, ?)",
(today, summary, json.dumps(metrics or {}))
)
self.conn.commit()
def get_recent_episodes(self, days: int = 7) -> list:
"""回顾最近的执行记录"""
return self.conn.execute(
"SELECT date, summary, metrics FROM episodes WHERE date >= date('now', ?)",
(f"-{days} days",)
).fetchall()
然后在 Agent 初始化时注入记忆:
# 改造 Agent 的 _build_system_prompt
def _build_system_prompt(self) -> str:
# 获取最近的执行记录
recent = self.memory.get_recent_episodes(days=3)
memory_context = ""
if recent:
memory_context = "近期执行记录:
" + "
".join(
f"- [{row[0]}] {row[1]}" for row in recent
)
return f"""你是一个智能工作流助手。
{memory_context}
你可以使用工具来完成任务。每次调用工具后,根据返回结果决定下一步行动。
当任务完成时,用中文给出总结。
"""
踩过一个坑:一开始我把所有历史对话都塞进 system prompt,结果 token 消耗暴涨 3 倍,模型反而"迷失"在大量历史信息里。后来改为只总结关键指标——昨天的文章标题、阅读量、转化率——效果好多了。
结果与总结
这套框架跑了三周,数据说话:
分享三个印象最深的坑:
LLM 的幻觉会传递。第一轮选题采集时如果 LLM 幻觉出一个不存在的"热点",后面的生成和发布都会基于这个幻觉继续推进——Garbage in, garbage out 在 Agent 场景下被放大了。解决方案是每一步都加人工确认点(human-in-the-loop),关键决策让人类确认。
工具调用异常处理。LLM 偶尔会传荒谬的参数——比如给 fetch_article 传一个 5000 字的 URL——必须在工具层做参数校验和超时保护。永远不要在工具函数里不加 try-catch。
上下文窗口管理。Agent 跑 5-6 步后,消息列表会膨胀到几万 tokens。我加了自动裁剪逻辑:消息超过阈值时,把早期的 tool call 结果摘要化。
延伸思考
这个框架还有不少可以改进的地方。
并行执行。目前是串行,如果三个选题可以并行生成,速度能提升 2-3 倍。Python 的 asyncio 能解决这个问题。
向量记忆。SQLite 做记忆存储很轻量,但语义检索能力为零。下一步可以接入轻量的向量数据库(比如 chromadb),让 Agent 能根据语义搜索历史经验。
Agent 自省。当前某一步失败,整个流程就断了。理想情况下,Agent 应该能自省——发现工具调用失败后自动切换方案,比如 WordPress 发布失败就降级为纯 Markdown 导出。
成本优化。大部分步骤不需要 GPT-4o 的智商,可以用 DeepSeek 或 Gemini 2.5 Flash 替代,只在关键决策时才调用 GPT-4o。这种分级模型策略预计能把成本再砍一半。
*以上代码已脱敏整理,核心框架约 500 行 Python。完整的工程化版本(含错误重试、监控告警、Web Dashboard)正在整理中,后续会开源。*
公众号「花叔」| 30 万+ 开发者关注 | AI 工具与效率提升

评论已关闭!