Python 异步编程实战:asyncio 核心机制与性能优化

2026-05-12 23:04 Python 异步编程实战:asyncio 核心机制与性能优化已关闭评论

Python 异步编程实战:asyncio 核心机制与性能优化

背景:重构爬虫系统时,我从多线程切换到 asyncio,单机 QPS 从 200 提升到 2000+,中途踩了事件循环、协程调度、连接池等多个坑。

为什么你的 asyncio 代码可能比多线程还慢

见过太多人写了 async def 就以为是异步了,结果跑起来比同步还慢。

asyncio 本质是协作式并发——协程必须主动让出控制权,事件循环才能调度下一个任务。只要有一个 await 在等待时阻塞了,整个事件循环就会卡住。

异步代码里不能有同步阻塞调用

# 异步代码里用了同步阻塞的 requests 库
async def fetch_all(urls):
    results = []
    for url in urls:
        results.append(requests.get(url))  # 同步阻塞!整个事件循环卡在这里
    return results

# 真正的异步写法
async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        return await asyncio.gather(*tasks)

事件循环的运行机制

asyncio 的事件循环是一个单线程调度器,维护就绪队列等待队列两个队列。当协程执行到 await 时被移到等待队列,future 完成后重新放回就绪队列。

import asyncio

async def main():
    print("1. 主协程开始")
    await asyncio.sleep(0)  # 主动让出控制权
    print("2. 主协程恢复")

asyncio.run(main())

执行流程:

  1. main() 进入就绪队列,开始执行
  2. 打印 "1. 主协程开始"
  3. await asyncio.sleep(0) 将协程移到等待队列,调度器立即调度下一个协程
  4. 没有其他协程,调度器重新调用这个协程
  5. 打印 "2. 主协程恢复"

asyncio.sleep(0)asyncio.sleep(0.001) 不一样。前者只是让出控制权,后者会创建一个真正的定时器任务。

协程调度:gather vs create_task vs wait

这三个是最常用的并发调度方式,但适用场景完全不同。

`asyncio.gather()` — 等待所有任务完成

# 适合:多个独立任务,必须全部完成才继续
async def demo_gather():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    return results

`asyncio.create_task()` — 立即开始调度,不等待

# 适合:需要立即开始执行,但稍后要取结果
async def demo_task():
    task = asyncio.create_task(fetch_user(1))
    # 这里可以做其他事
    await asyncio.sleep(0.5)
    result = await task
    return result

`asyncio.wait()` — 更灵活的超时控制

# 适合:需要部分完成就继续,或者有超时要求
async def demo_wait():
    tasks = [fetch_user(i) for i in range(10)]
    done, pending = await asyncio.wait(tasks, timeout=2.0)
    # 2秒后,已完成的结果在 done 里,未完成的在 pending 里
    for task in pending:
        task.cancel()
    return [t.result() for t in done]

大多数场景用 gather 就够了。只有需要"启动任务后做点别的,再回来收结果"时才用 create_taskwait 的超时机制在爬虫限速、API 熔断场景非常有用。

性能优化:从 200 QPS 到 2000 QPS

优化一:连接池大小

默认的连接池参数在高频场景下会成为瓶颈。

import aiohttp

async def with_optimized_pool():
    connector = aiohttp.TCPConnector(
        limit=100,
        limit_per_host=30,
        keepalive_timeout=30
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        pass

我遇到的问题是 limit_per_host 太小,并发一高就报 "Connection pool exhausted",监控显示很多时间耗在等待连接建立上。

优化二:批量任务的调度策略

一次性创建 10000 个任务会让内存暴涨,还可能打挂对方服务器。

async def batch_process(urls, batch_size=100):
    results = []
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        tasks = [fetch(session, url) for url in batch]
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)
        results.extend(batch_results)
        await asyncio.sleep(0)
    return results

优化三:信号量控制并发

async def with_semaphore(urls, max_concurrent=50):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_fetch(url):
        async with semaphore:
            return await fetch(url)

    tasks = [limited_fetch(url) for url in urls]
    return await asyncio.gather(*tasks)

这个技巧在对接第三方 API 时特别有用——对方通常有 rate limit,超过就直接封 IP。

踩坑实录:那些让我 debug 到凌晨的问题

坑一:忘记 await

# 错误:返回协程对象,不是结果
async def wrong():
    result = some_async_func()  # 忘记 await
    return result

# 正确
async def right():
    result = await some_async_func()
    return result

代码能跑,但返回了一个 coroutine 对象。后续试图用这个对象时会报错:TypeError: object Coroutine can't be used in 'await' expression

坑二:在非异步函数里调用异步函数

# 错误
def sync_func():
    result = await asyncio.run(async_func())

# 正确
async def wrapper():
    result = await async_func()

# 或者在同步代码里
def sync_caller():
    result = asyncio.run(async_func())

坑三:事件循环被多次运行

async def main():
    await asyncio.gather(asyncio.sleep(1), asyncio.sleep(2))

asyncio.run(main())  # OK
asyncio.run(main())  # RuntimeError: event loop is already running

在 Jupyter Notebook 或某些调试环境里,事件循环可能已经在运行了。这种情况要用 await 而不是 asyncio.run()

性能对比数据

爬取 1000 个 HTTP 接口的实测结果:

实现方式 总耗时 QPS 内存峰值
同步 requests 120s 8 80MB
多线程 (50线程) 8s 125 350MB
asyncio (无优化) 6s 166 120MB
asyncio (连接池优化) 2.5s 400 130MB
asyncio (批量+信号量) 0.5s 2000 140MB

连接池配置对 asyncio 性能影响最大,占 60% 以上的优化效果。

结论

asyncio 不是银弹,本质是 IO 密集型任务的并发提升。对于 CPU 密集型任务,多进程才是正解。记住三个要点:

  1. 永远不要在异步代码里调用同步阻塞函数
  2. 连接池参数是性能瓶颈的主要来源
  3. 用信号量和批量处理防止打挂对方服务器或自己

如果异步代码跑得比同步还慢,先检查有没有漏写的 await,再看有没有用错库(requests 换成 aiohttptime.sleep 换成 asyncio.sleep)。

延伸思考

asyncio 能否替代多线程? 对于 IO 密集型任务,asyncio 通常更高效;对于需要真正并行执行的 CPU 密集型任务,或者需要利用多核的场景,多进程 + asyncio 是更好的组合。

其他异步方案:Python 还有 aiofiles(异步文件 IO)、asyncpg(异步 PostgreSQL)、motor(异步 MongoDB)等生态。选择库之前,先确认性能瓶颈在哪儿。

调试工具asyncio.run() 配合 pytest-asyncio 是基础标配。想看协程调度细节,可以用 asyncio.create_task() 并在任务里加日志,观察任务的生命周期。

你可能感兴趣的文章

来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
转载请注明出处: https://teachcourse.cn/4134.html ,谢谢支持!

资源分享

分类:Android 标签:
静态代码块,非静态代码块和构造方法执行顺序 静态代码块,非静态代码块和构造
Claude Code提升开发效率指南 Claude Code提升开发效率指南
APP签名的三种方式使用说明 APP签名的三种方式使用说明
Ubuntu系统flask服务和wsgi运行示例说明 Ubuntu系统flask服务和wsgi运行

评论已关闭!