三个月踩了 8 个坑,我总结出一套 MCP 工具开发的最佳实践
背景:给团队搭建 Claude Code + MCP 工具链时,我需要把内部 API 和数据库查询封装成 MCP 工具,结果被权限、超时、认证、并发轮番折磨。
问题是什么
MCP(Model Context Protocol)是 Anthropic 推出的 AI 工具调用协议,让 Claude 能调用外部工具——查数据库、调 API、操作文件系统。听起来很美好,真正动手写服务端时才发现:文档不够细、示例偏玩具、踩坑全靠自己试。
我要对接三个真实场景:
每个场景都不复杂,但组合起来涉及认证、连接池、超时控制和错误处理,稍不注意 Claude 那边就报 tool_error。
解决思路
MCP 官方提供了两种协议实现:
| 方案 | 协议层 | 适用场景 |
|---|---|---|
@modelcontextprotocol/sdk(TypeScript) |
stdio / SSE | Node.js 生态,适合前端团队 |
mcp(Python SDK) |
stdio / SSE | Python 生态,适合数据/后端团队 |
我选了 Python SDK。我们的内部工具链以 Python 为主,而且 mcp 库支持异步,天然适合 IO 密集型任务。
传输方式也有两种选择:
我的策略:本地开发用 stdio,部署用 SSE。
操作步骤
步骤1:初始化 MCP 服务端项目
项目结构如下:
mcp-tools/
├── server.py # MCP 服务端入口
├── tools/
│ ├── __init__.py
│ ├── employee.py # 员工查询工具
│ ├── database.py # 数据库查询工具
│ └── gitlab.py # GitLab 工具
├── config.py # 配置管理
└── requirements.txt
安装依赖:
pip install mcp httpx asyncpg
注意:
mcp包需要 Python 3.10+,大量用了 async/await 和类型注解。
步骤2:编写第一个 MCP 工具
MCP 工具的核心概念很简单——用 @mcp.tool() 装饰器声明一个函数,它就是可被 Claude 调用的工具。参数类型注解会自动生成 JSON Schema。
先写一个最简单的员工查询工具:
# tools/employee.py
from mcp.server import Server
from mcp.server.models import Tool
import httpx
async def get_employee(employee_id: str) -> dict:
"""调用内部 API 获取员工信息"""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"https://api.internal.company.com/employees/{employee_id}",
headers={"Authorization": f"Bearer {_get_token()}"}
)
resp.raise_for_status()
return resp.json()
def register_employee_tools(mcp: Server):
@mcp.tool()
async def query_employee(employee_id: str) -> str:
"""根据员工 ID 查询员工基本信息,包括姓名、部门、职级和入职时间"""
try:
data = await get_employee(employee_id)
return (
f"员工 {data['name']}({employee_id})
"
f"部门:{data['department']}
"
f"职级:{data['level']}
"
f"入职时间:{data['join_date']}"
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"未找到员工 {employee_id}"
return f"查询失败:{e}"
几个关键点:
步骤3:实现数据库查询工具(踩坑最多)
数据库查询是真正的挑战。要让 Claude 安全地查数据库,但不能让它执行危险操作,也不能让连接泄漏。
# tools/database.py
import asyncpg
from mcp.server import Server
class DatabasePool:
"""数据库连接池,支持安全查询"""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool = None
async def init(self):
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=2,
max_size=5,
command_timeout=30 # 关键:查询超时
)
async def close(self):
if self.pool:
await self.pool.close()
async def safe_query(self, sql: str, params: tuple = None) -> list[dict]:
"""带保护的数据库查询"""
# 安全检查:只允许 SELECT
if not sql.strip().upper().startswith("SELECT"):
raise ValueError("只允许执行 SELECT 查询")
# 限制返回行数
limited_sql = f"SELECT * FROM ({sql}) AS _sub LIMIT 100"
async with self.pool.acquire() as conn:
if params:
rows = await conn.fetch(limited_sql, *params)
else:
rows = await conn.fetch(limited_sql)
return [dict(row) for row in rows]
注册工具的代码:
def register_database_tools(mcp: Server, db_pool: DatabasePool):
@mcp.tool()
async def query_database(sql: str) -> str:
"""执行数据库查询。输入必须是 SQL SELECT 语句。示例:SELECT id, name FROM users WHERE status = 'active'
- 只支持 SELECT 查询
- 自动限制返回 100 行
- 查询超时 30 秒
"""
try:
results = await db_pool.safe_query(sql)
if not results:
return "查询结果为空"
# 格式化输出
lines = []
headers = list(results[0].keys())
lines.append(" | ".join(headers))
lines.append("-" * len(" | ".join(headers)))
for row in results:
values = [str(row[h]) for h in headers]
lines.append(" | ".join(values))
return f"共 {len(results)} 条结果:
" + "
".join(lines)
except ValueError as e:
return f"查询被拒绝:{e}"
except asyncpg.exceptions.QueryCanceledError:
return "查询超时(超过 30 秒),请简化 SQL"
except Exception as e:
return f"查询失败:{e}"
踩坑记录: 第一次没加
command_timeout,Claude 生成了一条全表扫描的 SQL,数据库直接打满,生产告警响了。第二坑是没限制 LIMIT,有个表 50 万行,Claude 的SELECT *一把全拉回来,MCP 协议传输直接撑爆。第三坑是连接池泄漏——每个请求都新建连接,跑了几小时数据库连接数冲到 200+。连接池 + 超时 + LIMIT 是数据库工具的三道防线,缺一不可。
步骤4:组装服务端并处理生命周期
服务端主入口负责组装所有工具和管理资源生命周期:
# server.py
import asyncio
from mcp.server import Server
from mcp.server.transport import StdioServerTransport
from tools.employee import register_employee_tools
from tools.database import register_database_tools, DatabasePool
from config import DATABASE_DSN
async def main():
# 初始化 MCP server
mcp = Server("internal-tools")
# 初始化数据库连接池
db_pool = DatabasePool(DATABASE_DSN)
await db_pool.init()
# 注册所有工具
register_employee_tools(mcp)
register_database_tools(mcp, db_pool)
# 生命周期钩子:关闭时释放资源
@mcp.on_shutdown
async def cleanup():
await db_pool.close()
# 启动 stdio 传输
transport = StdioServerTransport()
await mcp.connect(transport)
await mcp.serve()
if __name__ == "__main__":
asyncio.run(main())
步骤5:在 Claude Code 中配置并验证
在 Claude Code 的项目配置中声明 MCP 工具:
{
"mcpServers": {
"internal-tools": {
"command": "python",
"args": ["D:\projects\mcp-tools\server.py"],
"env": {
"MCP_AUTH_TOKEN": "sk-xxxx",
"DATABASE_DSN": "postgresql://user:pass@localhost:5432/db"
}
}
}
}
注意: 环境变量里不要硬编码敏感信息,生产环境通过密钥管理服务注入。
env字段支持读取系统环境变量。
验证工具是否生效:
# 启动 Claude Code 后直接问
"查询员工 10086 的信息"
"查一下 users 表里 status 为 active 的前 5 条记录"
Claude 会自动判断调用哪个工具,调用过程和结果都会在对话中展示。
结果与总结
整个实现跑通后,团队最直观的感受是:内部数据获取效率提升了一个数量级。以前查员工信息要打开 HR 系统搜索,查数据库要连 Navicat 跑 SQL,现在在 Claude Code 里一句话搞定。
几个关键教训:
延伸思考
这套方案还有两个方向可以深入:
另外,MCP 的 "resources" 能力(类似文件系统的只读资源暴露)我还没用上。对于日志文件、配置文件这类静态内容,用 resources 比用工具更自然,下一篇可以专门聊聊。

评论已关闭!