别被 async/await 吓到:Python 异步编程其实就这三板斧

2026-05-03 21:23 别被 async/await 吓到:Python 异步编程其实就这三板斧已关闭评论

别被 async/await 吓到:Python 异步编程其实就这三板斧

同事用同步代码写了个爬虫,爬 100 个页面花了 80 秒,我说让他试试异步,3 秒搞定。

问题是什么

接了个数据采集的活儿:从 100 个 API 端点拉 JSON,每个请求平均 0.8 秒。我用 requests 写了个 for 循环,跑完一分半钟。

同步请求就像在便利店排队——你站收银台前,等前面的人买完单,才能把手里这瓶水放上去。CPU 在这 80 秒里大部分时间都在 I/O 等待,啥也没干。

能不能让这 100 个请求同时发出去,谁先回来先处理谁?

解决思路

Python 处理并发有三条路:

方案 原理 适用场景 代码侵入性
threading 多线程 OS 线程切换 I/O 密集型 中(需处理线程安全)
multiprocessing 多进程 独立进程并行 CPU 密集型 高(进程间通信复杂)
asyncio 异步 I/O 事件循环单线程协作 网络 I/O 密集型 低(现代 Python 原生支持)

爬网页、调 API、读写数据库这些 I/O 密集型场景,asyncio 是最轻量的方案。2014 年 Python 3.4 引入 asyncio,3.5 加了 async/await 语法糖后,写异步代码的体验已经接近同步代码。

核心思路:把 I/O 等待时间让出来,让事件循环去调度其他任务。好比一个人同时烧三壶水——把水壶放上燃气灶,不等烧开就去切菜,听到水响再回来关火。

操作步骤

步骤 1:从同步到异步,替换 HTTP 客户端

同步代码用 requests,但它不支持异步。换成 httpxaiohttp,我选了 httpx——API 和 requests 几乎一样,迁移成本最低。

pip install httpx

先看同步代码长什么样:

import time
import requests

urls = [f"https://api.example.com/data/{i}" for i in range(100)]

def fetch(url):
    resp = requests.get(url)
    return resp.json()

start = time.time()
results = [fetch(url) for url in urls]
print(f"同步耗时: {time.time() - start:.2f}s")

这段代码的问题:每次 requests.get(url) 都阻塞当前线程,直到拿到响应才继续下一行。100 个请求串行执行,总时间 = 100 × 单次延迟 ≈ 80 秒。

步骤 2:用 async/await 声明异步函数

异步函数用 async def 定义,里面耗时 I/O 操作前加 await

import asyncio
import httpx

async def fetch_async(url):
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.json()

关键变化:

  • `def` → `async def`,告诉 Python 这是个协程函数
  • `requests.get()` → `await client.get()`,await 的意思是"我现在去干别的,数据准备好了叫我"
  • `with` → `async with`,因为 HTTP 客户端的关闭也是异步操作
  • 注意: await 只能在 async def 内部使用,不能在全局作用域直接写 await client.get()

    步骤 3:用 asyncio.gather 并发执行

    单个 await fetch_async(url) 还是串行,得把多个协程"包在一起"并发跑:

    async def main():
        async with httpx.AsyncClient() as client:
            tasks = [fetch_async(url) for url in urls]
            results = await asyncio.gather(*tasks)
            return results
    
    start = time.time()
    results = asyncio.run(main())
    print(f"异步耗时: {time.time() - start:.2f}s")
    

    asyncio.gather 是这里的核心:它接收多个协程对象,并发调度它们,返回所有结果的列表。100 个请求几乎同时发出,总时间 ≈ 最慢的那个请求的耗时 ≈ 0.8 秒。

    asyncio.run(main()) 是入口,它创建事件循环、运行 main() 协程、最后关闭循环。

    步骤 4:加信号量控制并发数

    但是!如果真的一次发 1000 个请求,目标服务器可能直接把你 IP 封了——这叫"打满连接",和 DDOS 只有量的区别。

    需要限制并发数,用 asyncio.Semaphore

    async def fetch_with_semaphore(url, client, sem):
        async with sem:  # 获取信号量,超过限制就等待
            resp = await client.get(url)
            return resp.json()
    
    async def main():
        sem = asyncio.Semaphore(5)  # 最多同时发 5 个请求
        async with httpx.AsyncClient() as client:
            tasks = [fetch_with_semaphore(url, client, sem) for url in urls]
            results = await asyncio.gather(*tasks)
            return results
    
    asyncio.run(main())
    

    Semaphore(5) 像一个有 5 把钥匙的挂锁盒——每次请求取一把钥匙,用完归还。没钥匙就排队等着。这样既利用异步加速,又不会把目标服务器打崩。

    步骤 5:超时和异常处理

    异步代码出了异常不会自动崩,但会安静地挂在那里。必须加超时和 try/except:

    import asyncio
    import httpx
    from asyncio import TimeoutError
    
    async def safe_fetch(url, client, sem, timeout=10.0):
        async with sem:
            try:
                resp = await asyncio.wait_for(client.get(url), timeout=timeout)
                resp.raise_for_status()
                return resp.json()
            except TimeoutError:
                print(f"[超时] {url}")
                return None
            except httpx.HTTPStatusError as e:
                print(f"[HTTP错误] {url}: {e.response.status_code}")
                return None
            except Exception as e:
                print(f"[未知错误] {url}: {e}")
                return None
    
    async def main():
        sem = asyncio.Semaphore(5)
        timeout = httpx.Timeout(10.0, connect=5.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            tasks = [safe_fetch(url, client, sem) for url in urls]
            results = await asyncio.gather(*tasks)
            success = [r for r in results if r is not None]
            print(f"成功: {len(success)}/{len(urls)}")
            return success
    

    asyncio.wait_for(fut, timeout) 给每个协程设了个闹钟——超时就扔 TimeoutError,不会让单个请求拖死整个程序。

    结果

    我的爬虫改成异步后,100 个请求从 80 秒降到 1.2 秒(加了并发限流 5),快了 60 多倍。完整的代码模板大概 40 行,之后所有 API 调用我都用这个框架。

    踩过的几个坑:

  • **`asyncio.run()` 不能嵌套调用** —— 如果在 Jupyter Notebook 里跑,需要用 `await main()` 而不是 `asyncio.run()`,因为 Notebook 已经在事件循环里了。
  • **不是所有库都支持异步** —— `requests`、`psycopg2`、`redis-py` 这些老牌库是同步的。需要用它们的异步替代品:`httpx`、`asyncpg`、`redis-asyncio`。
  • **CPU 密集型任务不要用 asyncio** —— 协程的协作式调度依赖任务主动让出控制权(await)。`await asyncio.sleep(1)` 会让出,但 `for i in range(10**8): pass` 不会——它会霸占事件循环,阻塞所有其他协程。CPU 密集任务请用 `multiprocessing` 或 `concurrent.futures.ProcessPoolExecutor`。
  • 延伸思考

  • **异步 ORM**:`SQLAlchemy 1.4+` 支持异步了,`database_url` 改成 `postgresql+asyncpg://` 就能用 `await session.execute()`,但事务管理比同步版本复杂不少。
  • **异步 Web 框架**:FastAPI 天然支持 async handler,路由函数直接 `async def`,催生了全异步的 Python Web 栈。
  • **Starlette / trio**:asyncio 之外还有 trio 这个备选事件循环,它的结构化并发(nursery 模式)比 asyncio 更激进也更安全。
  • **什么时候不值得用异步**:一两个接口、脚本跑一次就丢、耗时瓶颈在 CPU 而不是 I/O——这些场景用同步更省心,别为了技术时髦而引入额外的复杂度。
  • 你可能感兴趣的文章

    来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
    转载请注明出处: https://teachcourse.cn/4049.html ,谢谢支持!

    资源分享

    分类:Android 标签:
    浅谈final关键字 浅谈final关键字
    如何手动用Eclipse默认的keystore导出安卓应用 如何手动用Eclipse默认的keysto
    php7.0-fpm启动启动异常 php7.0-fpm启动启动异常
    ProgressBar+WebView实现自定义浏览器 ProgressBar+WebView实现自定

    评论已关闭!