3年踩坑总结:Python异步编程入门,先学会用 `asyncio.run()` 再说别的
背景:团队要把一个日处理 10 万条数据的同步脚本改成异步,我被安排做技术调研,之前对 async/await 只有概念层面的了解。
问题是什么
场景很典型:一个 for 循环里连续调了 5 个 HTTP API,每个请求等 2-3 秒才返回,串行下来一次处理要 15 秒。10 万条数据算下来差不多 17 天——这显然不能忍。
直觉告诉我要用"异步",但我们小组之前一直写的都是同步代码,Python 异步的落地经验为零。查了一圈文档和博客,网上教程要么是 "Hello World" 级别的 demo,要么直接甩出 asyncio.gather 和 Semaphore 就结束了,根本不解释"为什么这么写"以及"坑在哪"。
我花了大概两周,把整条数据处理链路全改成了异步。这篇文章是那段时间的实战笔记。
解决思路
选型没什么悬念:
果断选 asyncio。但选了不代表会了——第一版代码写完,跑起来全是 Warning 和意外阻塞。
操作步骤
步骤1:把同步函数改成异步函数
最基础的一步。原始代码长这样:
import requests
def fetch_data(api_url):
resp = requests.get(api_url, timeout=10)
return resp.json()
改成:
import aiohttp
import asyncio
async def fetch_data(session, api_url):
async with session.get(api_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
return await resp.json()
核心就两件事:def → async def,同步 requests → 异步 aiohttp。
注意:
requests不支持异步。网上有人用asyncio.to_thread包装requests.get,这本质是开线程,不是真异步,并发上来性能会很难看。老老实实换aiohttp或httpx(async 模式)。
步骤2:搞定协程的"入口点"
Python 3.7+ 的推荐写法:
async def main():
async with aiohttp.ClientSession() as session:
data = await fetch_data(session, "https://api.example.com/data")
print(data)
asyncio.run(main())
asyncio.run() 是 3.7 加入的,帮你做了三件事:
别再用 loop = asyncio.get_event_loop(); loop.run_until_complete(main()),那是 3.7 之前的写法。我一开始照抄了老博客的代码,结果在 3.10 上跑出了 DeprecationWarning。
步骤3:并发执行多个协程(关键)
这是异步最核心的用途。单协程跟同步没区别——await 一样会等。
错误的写法:
async def main():
async with aiohttp.ClientSession() as session:
for url in urls:
data = await fetch_data(session, url) # 还是串行!
正确的写法:
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks)
asyncio.gather 把所有协程包装成 Task 提交到事件循环并行执行。五个 3 秒的请求,串行 15 秒,并行只要 3 秒出头。
注意:
asyncio.gather默认情况下第一个异常会直接往外抛,其他协程不会被取消,会在后台继续跑完。这其实也是个坑——失败的请求你没处理,但剩下的还在跑。传return_exceptions=True可以把异常塞进 results 列表里,回来再统一处理。
步骤4:控制并发数(血的教训)
第一个版本直接用 asyncio.gather 丢了 1000 个 URL 进去,结果 aiohttp.ClientSession 一口气创建了 1000 个连接,直接把目标 API 服务器打挂了——对方返回 503,运维那边报警了。
需要用信号量(Semaphore)控制并发:
sem = asyncio.Semaphore(10) # 最多10个并发
async def fetch_with_limit(session, url):
async with sem:
return await fetch_data(session, url)
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(session, url) for url in urls]
results = await asyncio.gather(*tasks)
Semaphore 内部的计数器在 async with sem 进入时减一,退出时加一。达到上限时后面的协程会自动排队。
我最后翻了下对方的 API 文档,把并发上限设成 20,之后一直稳定运行。
步骤5:超时和重试(产品级必备)
网络请求不设超时等于给自己埋雷:
import asyncio
from asyncio import TimeoutError
async def fetch_with_retry(session, url, retries=3, timeout=10):
for attempt in range(retries):
try:
async with asyncio.timeout(timeout):
async with session.get(url) as resp:
return await resp.json()
except (TimeoutError, aiohttp.ClientError) as e:
if attempt == retries - 1:
raise # 最后一次还失败,往上抛
wait = 2 ** attempt # 指数退避:1s, 2s, 4s
await asyncio.sleep(wait)
asyncio.timeout() 是 Python 3.11 引入的,之前得用 asyncio.wait_for(fetch_data(...), timeout=10)。用 3.11+ 的话,直接这么写就行。
结果与总结
改造后,从串行 15 秒/条降到了并行 3.5 秒/条(20 并发),10 万条数据从 17 天降到了不到 5 小时。这还是加了重试和对方 API 限速的情况下。
几个坑记一下:
延伸思考

评论已关闭!