Python 异步编程:asyncio 协程实战与常见坑
用 asyncio 重写了一个日处理百万级请求的数据管道后,我得出的结论是:asyncio 能极大简化 I/O 密集型代码,但隐式阻塞、回调地狱、事件循环误用这三大坑,足以让性能从「异步」退化成「比同步还慢」。
背景
去年我接手一个数据采集服务,同时要拉取 30+ 第三方 API、写入消息队列、做日志聚合。
同步代码用线程池硬扛,CPU 上下文切换冲到 80%,频繁 OOM。决定用 asyncio 重写。
问题是什么
同步代码长这样:
for api in apis:
data = requests.get(api) # 等 2 秒
queue.put(process(data)) # 等 0.5 秒
30 个 API 串行跑,一轮就要一分多钟。换成 concurrent.futures.ThreadPoolExecutor 并行后,又有新问题:
我需要的是:单线程内实现并发,代码保持顺序写法的可读性。
解决思路
横向对比了几个方案:
| 方案 | 优点 | 缺点 |
|---|---|---|
threading + ThreadPoolExecutor |
改造成本低 | GIL 限制 CPU 密集型,大并发下线程切换开销 |
concurrent.futures.ProcessPoolExecutor |
绕开 GIL | 进程间通信成本高,不适合大量短连接 |
asyncio + aiohttp |
单线程事件循环,轻量级任务切换,千万级并发 | 生态不完整,部分库不支持 async,隐式阻塞难排查 |
我选了 asyncio。原因很简单:这个场景 99% 的时间花在 I/O 等待上,asyncio 正好发挥「在等待时切去干别的」的优势。
操作步骤
步骤 1:把同步请求换成 async 版本
第一件事是把 requests 换成 aiohttp,queue.put 换成 async 队列。
import asyncio
import aiohttp
from asyncio import Queue
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.json()
async def worker(name, queue, session):
while True:
url = await queue.get()
try:
data = await fetch(session, url)
print(f"[{name}] {url} -> {len(data)}条")
except Exception as e:
print(f"[{name}] {url} 失败: {e}")
finally:
queue.task_done()
async def main():
urls = [f"https://api.example.com/data/{i}" for i in range(100)]
queue = Queue()
for url in urls:
await queue.put(url)
async with aiohttp.ClientSession() as session:
workers = [asyncio.create_task(worker(f"W{i}", queue, session))
for i in range(10)]
await queue.join()
for w in workers:
w.cancel()
asyncio.run(main())
注意:
asyncio.run()在 Python 3.7+ 才有。3.6 及以前用loop.run_until_complete(main()),还要手动关 loop。
步骤 2:处理超时——别让一个慢接口拖死全部
同步代码里 requests.get 设个 timeout 就行。asyncio 里要用 asyncio.wait_for:
async def fetch_with_timeout(session, url, timeout=10):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
return await resp.json()
except asyncio.TimeoutError:
print(f"超时: {url}")
return None
坑来了:aiohttp.ClientTimeout 和 asyncio.wait_for 是两层超时机制。只设 wait_for 但没设 ClientTimeout,底层 socket 可能一直不释放。我建议两层都设,wait_for 略大于 ClientTimeout(留出响应序列化的余量)。
步骤 3:信号量控制并发——别把目标 API 打崩
100 个 URL 同时发请求,对方的 API 网关直接 429。
sem = asyncio.Semaphore(10)
async def fetch_limited(session, url):
async with sem:
return await fetch_with_timeout(session, url)
把 fetch_limited 塞进 worker 就行。Semaphore 控制同时进行的请求数,不是总请求数——这正是我们想要的。
步骤 4:错误重试——指数退避
网络请求必然有失败,不加重试的异步代码是玩具:
async def fetch_with_retry(session, url, max_retries=3):
for attempt in range(max_retries):
try:
return await fetch_with_timeout(session, url)
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == max_retries - 1:
raise
wait = 2 ** attempt # 指数退避: 1s, 2s, 4s
print(f"重试 {url} ({attempt+1}/{max_retries}),等待 {wait}s")
await asyncio.sleep(wait)
return None
注意: 这里用的是
asyncio.sleep()而不是time.sleep()。time.sleep()是阻塞的,在协程里调用它会阻塞整个事件循环,相当于把异步变成了同步。后面会专门讲这个坑。
步骤 5:正确使用 `asyncio.run()`
很多人(包括我一开始)会这样写:
# ❌ 错误写法
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Python 3.10+ 里,get_event_loop() 如果不在正在运行的事件循环中调用会抛警告。正确做法是:
# ✅ 正确写法
asyncio.run(main())
asyncio.run() 会自动创建新的事件循环、运行协程、关闭 loop 并清理所有未完成的任务。不要手动管理 loop,除非你清楚自己在做什么。
结果与总结
重写后的效果:
踩的坑也不少,总结几个最容易中招的:
坑 1:混用同步阻塞调用
# ❌ 这会阻塞整个事件循环
await asyncio.sleep(1)
result = some_sync_function() # 如果这个函数执行 5 秒,所有协程都卡住
排查方法:启动 asyncio 的调试模式——asyncio.run(main(), debug=True),事件循环会打印执行超过 100ms 的回调。
坑 2:忘记 `await`
result = fetch(session, url) # ❌ 没 await,拿到的是一个 coroutine 对象,不是结果
这是最频繁的错误。类型提示能帮上忙:fetch 返回值是 Coroutine 不是 dict,IDE 会标黄。
坑 3:任务未捕获的异常会静默消失
task = asyncio.create_task(some_coro()) # 如果 some_coro 抛异常,没人知道
一定要注册 add_done_callback 或显式 await task:
task = asyncio.create_task(some_coro())
task.add_done_callback(lambda t: t.exception() or None)
# 或者
try:
await task
except:
...
坑 4:`asyncio.gather()` 默认不等待所有任务
# gather 默认是「短路」模式:一个异常就取消其他任务
results = await asyncio.gather(task1, task2, task3)
如果你希望即使部分失败也要等全部完成,用 return_exceptions=True:
results = await asyncio.gather(task1, task2, task3, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
handle_error(r)
延伸思考
这套方案对 I/O 密集型很有效。但任务里混了 CPU 密集型操作(如 JSON 解析大量数据、加密解密),光靠 asyncio 不够——它不会让 Python 代码跑得更快。解法:
另外,如果生产级 asyncio 代码比较多,可以看看 trio 或 anyio——它们用结构化并发(nursery)替代 asyncio.create_task,从根本上避免了「忘了等待任务」的问题。Python 3.11+ 引入的 TaskGroup(基于 PEP 654 ExceptionGroup)也在往这个方向靠。
最后推荐几个我常用的配套库:

评论已关闭!