Python 异步编程:asyncio 协程实战与常见坑

2026-05-03 21:50 Python 异步编程:asyncio 协程实战与常见坑已关闭评论

Python 异步编程:asyncio 协程实战与常见坑

用 asyncio 重写了一个日处理百万级请求的数据管道后,我得出的结论是:asyncio 能极大简化 I/O 密集型代码,但隐式阻塞、回调地狱、事件循环误用这三大坑,足以让性能从「异步」退化成「比同步还慢」

背景

去年我接手一个数据采集服务,同时要拉取 30+ 第三方 API、写入消息队列、做日志聚合。

同步代码用线程池硬扛,CPU 上下文切换冲到 80%,频繁 OOM。决定用 asyncio 重写。


问题是什么

同步代码长这样:

for api in apis:
    data = requests.get(api)  # 等 2 秒
    queue.put(process(data))  # 等 0.5 秒

30 个 API 串行跑,一轮就要一分多钟。换成 concurrent.futures.ThreadPoolExecutor 并行后,又有新问题:

  • 线程数一多,GIL + 上下文切换让 CPU 飙升
  • 每个线程有自己的栈,内存开销大
  • 回调嵌套深了以后,代码变成「金字塔地狱」
  • 我需要的是:单线程内实现并发,代码保持顺序写法的可读性

    解决思路

    横向对比了几个方案:

    方案 优点 缺点
    threading + ThreadPoolExecutor 改造成本低 GIL 限制 CPU 密集型,大并发下线程切换开销
    concurrent.futures.ProcessPoolExecutor 绕开 GIL 进程间通信成本高,不适合大量短连接
    asyncio + aiohttp 单线程事件循环,轻量级任务切换,千万级并发 生态不完整,部分库不支持 async,隐式阻塞难排查

    我选了 asyncio。原因很简单:这个场景 99% 的时间花在 I/O 等待上,asyncio 正好发挥「在等待时切去干别的」的优势。

    操作步骤

    步骤 1:把同步请求换成 async 版本

    第一件事是把 requests 换成 aiohttpqueue.put 换成 async 队列。

    import asyncio
    import aiohttp
    from asyncio import Queue
    
    async def fetch(session, url):
        async with session.get(url) as resp:
            return await resp.json()
    
    async def worker(name, queue, session):
        while True:
            url = await queue.get()
            try:
                data = await fetch(session, url)
                print(f"[{name}] {url} -> {len(data)}条")
            except Exception as e:
                print(f"[{name}] {url} 失败: {e}")
            finally:
                queue.task_done()
    
    async def main():
        urls = [f"https://api.example.com/data/{i}" for i in range(100)]
        queue = Queue()
        for url in urls:
            await queue.put(url)
    
        async with aiohttp.ClientSession() as session:
            workers = [asyncio.create_task(worker(f"W{i}", queue, session))
                       for i in range(10)]
            await queue.join()
            for w in workers:
                w.cancel()
    
    asyncio.run(main())
    

    注意: asyncio.run() 在 Python 3.7+ 才有。3.6 及以前用 loop.run_until_complete(main()),还要手动关 loop。

    步骤 2:处理超时——别让一个慢接口拖死全部

    同步代码里 requests.get 设个 timeout 就行。asyncio 里要用 asyncio.wait_for

    async def fetch_with_timeout(session, url, timeout=10):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
                return await resp.json()
        except asyncio.TimeoutError:
            print(f"超时: {url}")
            return None
    

    坑来了aiohttp.ClientTimeoutasyncio.wait_for 是两层超时机制。只设 wait_for 但没设 ClientTimeout,底层 socket 可能一直不释放。我建议两层都设,wait_for 略大于 ClientTimeout(留出响应序列化的余量)。

    步骤 3:信号量控制并发——别把目标 API 打崩

    100 个 URL 同时发请求,对方的 API 网关直接 429。

    sem = asyncio.Semaphore(10)
    
    async def fetch_limited(session, url):
        async with sem:
            return await fetch_with_timeout(session, url)
    

    fetch_limited 塞进 worker 就行。Semaphore 控制同时进行的请求数,不是总请求数——这正是我们想要的。

    步骤 4:错误重试——指数退避

    网络请求必然有失败,不加重试的异步代码是玩具:

    async def fetch_with_retry(session, url, max_retries=3):
        for attempt in range(max_retries):
            try:
                return await fetch_with_timeout(session, url)
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == max_retries - 1:
                    raise
                wait = 2 ** attempt  # 指数退避: 1s, 2s, 4s
                print(f"重试 {url} ({attempt+1}/{max_retries}),等待 {wait}s")
                await asyncio.sleep(wait)
        return None
    

    注意: 这里用的是 asyncio.sleep() 而不是 time.sleep()time.sleep()阻塞的,在协程里调用它会阻塞整个事件循环,相当于把异步变成了同步。后面会专门讲这个坑。

    步骤 5:正确使用 `asyncio.run()`

    很多人(包括我一开始)会这样写:

    # ❌ 错误写法
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    

    Python 3.10+ 里,get_event_loop() 如果不在正在运行的事件循环中调用会抛警告。正确做法是:

    # ✅ 正确写法
    asyncio.run(main())
    

    asyncio.run() 会自动创建新的事件循环、运行协程、关闭 loop 并清理所有未完成的任务。不要手动管理 loop,除非你清楚自己在做什么。

    结果与总结

    重写后的效果:

  • 采集一轮从 **75 秒** 降到 **6-8 秒**(10 并发)
  • CPU 占用从 **75%** 降到 **25%**
  • 内存从 **1.2GB** 降到 **280MB**
  • 代码行数从 **850 行**(线程池 + 回调)降到 **420 行**
  • 踩的坑也不少,总结几个最容易中招的:

    坑 1:混用同步阻塞调用

    # ❌ 这会阻塞整个事件循环
    await asyncio.sleep(1)
    result = some_sync_function()  # 如果这个函数执行 5 秒,所有协程都卡住
    

    排查方法:启动 asyncio 的调试模式——asyncio.run(main(), debug=True),事件循环会打印执行超过 100ms 的回调。

    坑 2:忘记 `await`

    result = fetch(session, url)  # ❌ 没 await,拿到的是一个 coroutine 对象,不是结果
    

    这是最频繁的错误。类型提示能帮上忙:fetch 返回值是 Coroutine 不是 dict,IDE 会标黄。

    坑 3:任务未捕获的异常会静默消失

    task = asyncio.create_task(some_coro())  # 如果 some_coro 抛异常,没人知道
    

    一定要注册 add_done_callback 或显式 await task

    task = asyncio.create_task(some_coro())
    task.add_done_callback(lambda t: t.exception() or None)
    # 或者
    try:
        await task
    except:
        ...
    

    坑 4:`asyncio.gather()` 默认不等待所有任务

    # gather 默认是「短路」模式:一个异常就取消其他任务
    results = await asyncio.gather(task1, task2, task3)
    

    如果你希望即使部分失败也要等全部完成,用 return_exceptions=True

    results = await asyncio.gather(task1, task2, task3, return_exceptions=True)
    for r in results:
        if isinstance(r, Exception):
            handle_error(r)
    

    延伸思考

    这套方案对 I/O 密集型很有效。但任务里混了 CPU 密集型操作(如 JSON 解析大量数据、加密解密),光靠 asyncio 不够——它不会让 Python 代码跑得更快。解法:

  • **用 `loop.run_in_executor()`** 把 CPU 密集型扔到线程池
  • **或者 asyncio + multiprocessing 混用**:每个进程一个事件循环,进程间通过 Redis 或消息队列通信
  • **Python 3.9+ 可以用 `asyncio.to_thread()`**,它是 `run_in_executor` 的简化版
  • 另外,如果生产级 asyncio 代码比较多,可以看看 trioanyio——它们用结构化并发(nursery)替代 asyncio.create_task,从根本上避免了「忘了等待任务」的问题。Python 3.11+ 引入的 TaskGroup(基于 PEP 654 ExceptionGroup)也在往这个方向靠。

    最后推荐几个我常用的配套库:

  • `aiofiles` — 异步文件读写(异步代码里别用 `open()`)
  • `asyncpg` — PostgreSQL 异步驱动(比 `aiomysql` 稳定得多)
  • `httpx` — 同时支持同步/异步的 HTTP 客户端,适合过渡期混用
  • `asyncio-throttle` — 比手工 `Semaphore` 更精细的限流
  • 你可能感兴趣的文章

    来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
    转载请注明出处: https://teachcourse.cn/4059.html ,谢谢支持!

    资源分享

    分类:Android 标签:
    SQLServer存储过程关于公用表表达式(CTE)和临时表的实例教程 SQLServer存储过程关于公用表
    Python实现提取文章关键字 Python实现提取文章关键字
    Android电脑局域网操作手机的工具 Android电脑局域网操作手机的工
    Android开发之TextView控件设置颜色切换器的问题 Android开发之TextView控件设

    评论已关闭!