Python 异步编程实战:asyncio 核心机制与性能优化
背景:重构爬虫系统时,我从多线程切换到 asyncio,单机 QPS 从 200 提升到 2000+,中途踩了事件循环、协程调度、连接池等多个坑。
为什么你的 asyncio 代码可能比多线程还慢
见过太多人写了 async def 就以为是异步了,结果跑起来比同步还慢。
asyncio 本质是协作式并发——协程必须主动让出控制权,事件循环才能调度下一个任务。只要有一个 await 在等待时阻塞了,整个事件循环就会卡住。
异步代码里不能有同步阻塞调用。
# 异步代码里用了同步阻塞的 requests 库
async def fetch_all(urls):
results = []
for url in urls:
results.append(requests.get(url)) # 同步阻塞!整个事件循环卡在这里
return results
# 真正的异步写法
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
return await asyncio.gather(*tasks)
事件循环的运行机制
asyncio 的事件循环是一个单线程调度器,维护就绪队列和等待队列两个队列。当协程执行到 await 时被移到等待队列,future 完成后重新放回就绪队列。
import asyncio
async def main():
print("1. 主协程开始")
await asyncio.sleep(0) # 主动让出控制权
print("2. 主协程恢复")
asyncio.run(main())
执行流程:
main()进入就绪队列,开始执行- 打印 "1. 主协程开始"
await asyncio.sleep(0)将协程移到等待队列,调度器立即调度下一个协程- 没有其他协程,调度器重新调用这个协程
- 打印 "2. 主协程恢复"
asyncio.sleep(0) 和 asyncio.sleep(0.001) 不一样。前者只是让出控制权,后者会创建一个真正的定时器任务。
协程调度:gather vs create_task vs wait
这三个是最常用的并发调度方式,但适用场景完全不同。
`asyncio.gather()` — 等待所有任务完成
# 适合:多个独立任务,必须全部完成才继续
async def demo_gather():
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
)
return results
`asyncio.create_task()` — 立即开始调度,不等待
# 适合:需要立即开始执行,但稍后要取结果
async def demo_task():
task = asyncio.create_task(fetch_user(1))
# 这里可以做其他事
await asyncio.sleep(0.5)
result = await task
return result
`asyncio.wait()` — 更灵活的超时控制
# 适合:需要部分完成就继续,或者有超时要求
async def demo_wait():
tasks = [fetch_user(i) for i in range(10)]
done, pending = await asyncio.wait(tasks, timeout=2.0)
# 2秒后,已完成的结果在 done 里,未完成的在 pending 里
for task in pending:
task.cancel()
return [t.result() for t in done]
大多数场景用 gather 就够了。只有需要"启动任务后做点别的,再回来收结果"时才用 create_task。wait 的超时机制在爬虫限速、API 熔断场景非常有用。
性能优化:从 200 QPS 到 2000 QPS
优化一:连接池大小
默认的连接池参数在高频场景下会成为瓶颈。
import aiohttp
async def with_optimized_pool():
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=30,
keepalive_timeout=30
)
async with aiohttp.ClientSession(connector=connector) as session:
pass
我遇到的问题是 limit_per_host 太小,并发一高就报 "Connection pool exhausted",监控显示很多时间耗在等待连接建立上。
优化二:批量任务的调度策略
一次性创建 10000 个任务会让内存暴涨,还可能打挂对方服务器。
async def batch_process(urls, batch_size=100):
results = []
for i in range(0, len(urls), batch_size):
batch = urls[i:i + batch_size]
tasks = [fetch(session, url) for url in batch]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
results.extend(batch_results)
await asyncio.sleep(0)
return results
优化三:信号量控制并发
async def with_semaphore(urls, max_concurrent=50):
semaphore = asyncio.Semaphore(max_concurrent)
async def limited_fetch(url):
async with semaphore:
return await fetch(url)
tasks = [limited_fetch(url) for url in urls]
return await asyncio.gather(*tasks)
这个技巧在对接第三方 API 时特别有用——对方通常有 rate limit,超过就直接封 IP。
踩坑实录:那些让我 debug 到凌晨的问题
坑一:忘记 await
# 错误:返回协程对象,不是结果
async def wrong():
result = some_async_func() # 忘记 await
return result
# 正确
async def right():
result = await some_async_func()
return result
代码能跑,但返回了一个 coroutine 对象。后续试图用这个对象时会报错:TypeError: object Coroutine can't be used in 'await' expression。
坑二:在非异步函数里调用异步函数
# 错误
def sync_func():
result = await asyncio.run(async_func())
# 正确
async def wrapper():
result = await async_func()
# 或者在同步代码里
def sync_caller():
result = asyncio.run(async_func())
坑三:事件循环被多次运行
async def main():
await asyncio.gather(asyncio.sleep(1), asyncio.sleep(2))
asyncio.run(main()) # OK
asyncio.run(main()) # RuntimeError: event loop is already running
在 Jupyter Notebook 或某些调试环境里,事件循环可能已经在运行了。这种情况要用 await 而不是 asyncio.run()。
性能对比数据
爬取 1000 个 HTTP 接口的实测结果:
| 实现方式 | 总耗时 | QPS | 内存峰值 |
|---|---|---|---|
| 同步 requests | 120s | 8 | 80MB |
| 多线程 (50线程) | 8s | 125 | 350MB |
| asyncio (无优化) | 6s | 166 | 120MB |
| asyncio (连接池优化) | 2.5s | 400 | 130MB |
| asyncio (批量+信号量) | 0.5s | 2000 | 140MB |
连接池配置对 asyncio 性能影响最大,占 60% 以上的优化效果。
结论
asyncio 不是银弹,本质是 IO 密集型任务的并发提升。对于 CPU 密集型任务,多进程才是正解。记住三个要点:
- 永远不要在异步代码里调用同步阻塞函数
- 连接池参数是性能瓶颈的主要来源
- 用信号量和批量处理防止打挂对方服务器或自己
如果异步代码跑得比同步还慢,先检查有没有漏写的 await,再看有没有用错库(requests 换成 aiohttp,time.sleep 换成 asyncio.sleep)。
延伸思考
asyncio 能否替代多线程? 对于 IO 密集型任务,asyncio 通常更高效;对于需要真正并行执行的 CPU 密集型任务,或者需要利用多核的场景,多进程 + asyncio 是更好的组合。
其他异步方案:Python 还有 aiofiles(异步文件 IO)、asyncpg(异步 PostgreSQL)、motor(异步 MongoDB)等生态。选择库之前,先确认性能瓶颈在哪儿。
调试工具:asyncio.run() 配合 pytest-asyncio 是基础标配。想看协程调度细节,可以用 asyncio.create_task() 并在任务里加日志,观察任务的生命周期。

评论已关闭!