3年踩坑总结:Python异步编程入门,先学会用 `asyncio.run()` 再说别的

2026-05-03 21:42 3年踩坑总结:Python异步编程入门,先学会用 `asyncio.run()` 再说别的已关闭评论

3年踩坑总结:Python异步编程入门,先学会用 `asyncio.run()` 再说别的

背景:团队要把一个日处理 10 万条数据的同步脚本改成异步,我被安排做技术调研,之前对 async/await 只有概念层面的了解。

问题是什么

场景很典型:一个 for 循环里连续调了 5 个 HTTP API,每个请求等 2-3 秒才返回,串行下来一次处理要 15 秒。10 万条数据算下来差不多 17 天——这显然不能忍。

直觉告诉我要用"异步",但我们小组之前一直写的都是同步代码,Python 异步的落地经验为零。查了一圈文档和博客,网上教程要么是 "Hello World" 级别的 demo,要么直接甩出 asyncio.gatherSemaphore 就结束了,根本不解释"为什么这么写"以及"坑在哪"。

我花了大概两周,把整条数据处理链路全改成了异步。这篇文章是那段时间的实战笔记。

解决思路

选型没什么悬念:

  • **`concurrent.futures.ThreadPoolExecutor`** — 最简单,但 GIL 下 CPU 密集型不行,IO 密集型勉强能用。问题是线程开销和 thread-safe 的坑比想象的多,而且没有真正的协程调度。
  • **`multiprocessing`** — 太重,为了 IO 等待搞多进程是杀鸡用牛刀。
  • **`asyncio`** — Python 3.4 引入,3.5 加了 async/await 语法糖,3.7 之后 API 基本稳定。协程比线程开销小一个数量级,调度完全在用户态,不用加锁保护共享数据(前提是你别混用同步代码)。
  • 果断选 asyncio。但选了不代表会了——第一版代码写完,跑起来全是 Warning 和意外阻塞。

    操作步骤

    步骤1:把同步函数改成异步函数

    最基础的一步。原始代码长这样:

    import requests
    
    def fetch_data(api_url):
        resp = requests.get(api_url, timeout=10)
        return resp.json()
    

    改成:

    import aiohttp
    import asyncio
    
    async def fetch_data(session, api_url):
        async with session.get(api_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            return await resp.json()
    

    核心就两件事:defasync def,同步 requests → 异步 aiohttp

    注意: requests 不支持异步。网上有人用 asyncio.to_thread 包装 requests.get,这本质是开线程,不是真异步,并发上来性能会很难看。老老实实换 aiohttphttpx(async 模式)。

    步骤2:搞定协程的"入口点"

    Python 3.7+ 的推荐写法:

    async def main():
        async with aiohttp.ClientSession() as session:
            data = await fetch_data(session, "https://api.example.com/data")
            print(data)
    
    asyncio.run(main())
    

    asyncio.run() 是 3.7 加入的,帮你做了三件事:

  • 创建事件循环
  • 运行 main 协程直到完成
  • 关闭事件循环并清理所有未完成的协程
  • 别再用 loop = asyncio.get_event_loop(); loop.run_until_complete(main()),那是 3.7 之前的写法。我一开始照抄了老博客的代码,结果在 3.10 上跑出了 DeprecationWarning。

    步骤3:并发执行多个协程(关键)

    这是异步最核心的用途。单协程跟同步没区别——await 一样会等。

    错误的写法:

    async def main():
        async with aiohttp.ClientSession() as session:
            for url in urls:
                data = await fetch_data(session, url)  # 还是串行!
    

    正确的写法:

    async def main():
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_data(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
    

    asyncio.gather 把所有协程包装成 Task 提交到事件循环并行执行。五个 3 秒的请求,串行 15 秒,并行只要 3 秒出头。

    注意: asyncio.gather 默认情况下第一个异常会直接往外抛,其他协程不会被取消,会在后台继续跑完。这其实也是个坑——失败的请求你没处理,但剩下的还在跑。传 return_exceptions=True 可以把异常塞进 results 列表里,回来再统一处理。

    步骤4:控制并发数(血的教训)

    第一个版本直接用 asyncio.gather 丢了 1000 个 URL 进去,结果 aiohttp.ClientSession 一口气创建了 1000 个连接,直接把目标 API 服务器打挂了——对方返回 503,运维那边报警了。

    需要用信号量(Semaphore)控制并发:

    sem = asyncio.Semaphore(10)  # 最多10个并发
    
    async def fetch_with_limit(session, url):
        async with sem:
            return await fetch_data(session, url)
    
    async def main():
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_with_limit(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
    

    Semaphore 内部的计数器在 async with sem 进入时减一,退出时加一。达到上限时后面的协程会自动排队。

    我最后翻了下对方的 API 文档,把并发上限设成 20,之后一直稳定运行。

    步骤5:超时和重试(产品级必备)

    网络请求不设超时等于给自己埋雷:

    import asyncio
    from asyncio import TimeoutError
    
    async def fetch_with_retry(session, url, retries=3, timeout=10):
        for attempt in range(retries):
            try:
                async with asyncio.timeout(timeout):
                    async with session.get(url) as resp:
                        return await resp.json()
            except (TimeoutError, aiohttp.ClientError) as e:
                if attempt == retries - 1:
                    raise  # 最后一次还失败,往上抛
                wait = 2 ** attempt  # 指数退避:1s, 2s, 4s
                await asyncio.sleep(wait)
    

    asyncio.timeout() 是 Python 3.11 引入的,之前得用 asyncio.wait_for(fetch_data(...), timeout=10)。用 3.11+ 的话,直接这么写就行。

    结果与总结

    改造后,从串行 15 秒/条降到了并行 3.5 秒/条(20 并发),10 万条数据从 17 天降到了不到 5 小时。这还是加了重试和对方 API 限速的情况下。

    几个坑记一下:

  • **同步代码阻塞事件循环** — 我在协程里不小心混了一个 `time.sleep(1)`(应该用 `await asyncio.sleep(1)`),结果整个事件循环停了 1 秒,所有并发请求全卡住。排查了好久才发现。
  • **`asyncio.run()` 每次创建新事件循环** — 在 Jupyter Notebook 里不能重复调 `asyncio.run()`,得用 `await` 或者 `import nest_asyncio; nest_asyncio.apply()`。脚本环境不存在这个问题,但用 notebook 做原型验证的时候踩了一脚。
  • **协程不是 Task** — 只调 `async_func()` 不会执行,它返回一个 coroutine 对象。必须 `await` 或交给 `asyncio.create_task()` 或 `asyncio.gather()`。我第一次写的时候忘了 `await`,脚本直接退出了,什么都没干。
  • 延伸思考

  • **`asyncio.run()` 不够用了怎么办?**——如果需要在同一个事件循环里多次执行,或者往已有事件循环里追加任务,可以用 `asyncio.new_event_loop()` + `loop.run_forever()` 手动管理,但日常场景不需要碰这个。
  • **`asyncio.gather` 之外**——Python 3.11 加了 `asyncio.TaskGroup`,语义上更接近 `trio` 的 nursery 模式,异常处理更合理。新项目可以直接用 `TaskGroup`。
  • **混合异步和同步代码**——如果有大量同步库不能换(比如 `psycopg2`、`redis-py` 旧版),可以用 `asyncio.to_thread()` 或 `loop.run_in_executor()` 把同步调用扔到线程池里,避免阻塞事件循环。但这只能作为过渡方案。
  • **真正的异步数据库驱动**——同步数据库驱动即使跑在 `to_thread` 里也只是伪异步。2026 年的生态已经比较成熟了:`asyncpg`(PostgreSQL)、`aiomysql`、`redis-py` 4.x 原生支持 async。换掉之后才是真正的全链路异步。
  • 你可能感兴趣的文章

    来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
    转载请注明出处: https://teachcourse.cn/4055.html ,谢谢支持!

    资源分享

    分类:Android 标签:
    mysql数据库导出和导入 mysql数据库导出和导入
    网站添加百度、谷歌、搜狗、好搜搜录 网站添加百度、谷歌、搜狗、好搜
    nginx重启:nginx nginx重启:nginx
    Android常见布局 Android常见布局

    评论已关闭!