别被 async/await 吓到:Python 异步编程其实就这三板斧
同事用同步代码写了个爬虫,爬 100 个页面花了 80 秒,我说让他试试异步,3 秒搞定。
问题是什么
接了个数据采集的活儿:从 100 个 API 端点拉 JSON,每个请求平均 0.8 秒。我用 requests 写了个 for 循环,跑完一分半钟。
同步请求就像在便利店排队——你站收银台前,等前面的人买完单,才能把手里这瓶水放上去。CPU 在这 80 秒里大部分时间都在 I/O 等待,啥也没干。
能不能让这 100 个请求同时发出去,谁先回来先处理谁?
解决思路
Python 处理并发有三条路:
| 方案 | 原理 | 适用场景 | 代码侵入性 |
|---|---|---|---|
threading 多线程 |
OS 线程切换 | I/O 密集型 | 中(需处理线程安全) |
multiprocessing 多进程 |
独立进程并行 | CPU 密集型 | 高(进程间通信复杂) |
asyncio 异步 I/O |
事件循环单线程协作 | 网络 I/O 密集型 | 低(现代 Python 原生支持) |
爬网页、调 API、读写数据库这些 I/O 密集型场景,asyncio 是最轻量的方案。2014 年 Python 3.4 引入 asyncio,3.5 加了 async/await 语法糖后,写异步代码的体验已经接近同步代码。
核心思路:把 I/O 等待时间让出来,让事件循环去调度其他任务。好比一个人同时烧三壶水——把水壶放上燃气灶,不等烧开就去切菜,听到水响再回来关火。
操作步骤
步骤 1:从同步到异步,替换 HTTP 客户端
同步代码用 requests,但它不支持异步。换成 httpx 或 aiohttp,我选了 httpx——API 和 requests 几乎一样,迁移成本最低。
pip install httpx
先看同步代码长什么样:
import time
import requests
urls = [f"https://api.example.com/data/{i}" for i in range(100)]
def fetch(url):
resp = requests.get(url)
return resp.json()
start = time.time()
results = [fetch(url) for url in urls]
print(f"同步耗时: {time.time() - start:.2f}s")
这段代码的问题:每次 requests.get(url) 都阻塞当前线程,直到拿到响应才继续下一行。100 个请求串行执行,总时间 = 100 × 单次延迟 ≈ 80 秒。
步骤 2:用 async/await 声明异步函数
异步函数用 async def 定义,里面耗时 I/O 操作前加 await:
import asyncio
import httpx
async def fetch_async(url):
async with httpx.AsyncClient() as client:
resp = await client.get(url)
return resp.json()
关键变化:
注意:
await只能在async def内部使用,不能在全局作用域直接写await client.get()。
步骤 3:用 asyncio.gather 并发执行
单个 await fetch_async(url) 还是串行,得把多个协程"包在一起"并发跑:
async def main():
async with httpx.AsyncClient() as client:
tasks = [fetch_async(url) for url in urls]
results = await asyncio.gather(*tasks)
return results
start = time.time()
results = asyncio.run(main())
print(f"异步耗时: {time.time() - start:.2f}s")
asyncio.gather 是这里的核心:它接收多个协程对象,并发调度它们,返回所有结果的列表。100 个请求几乎同时发出,总时间 ≈ 最慢的那个请求的耗时 ≈ 0.8 秒。
asyncio.run(main()) 是入口,它创建事件循环、运行 main() 协程、最后关闭循环。
步骤 4:加信号量控制并发数
但是!如果真的一次发 1000 个请求,目标服务器可能直接把你 IP 封了——这叫"打满连接",和 DDOS 只有量的区别。
需要限制并发数,用 asyncio.Semaphore:
async def fetch_with_semaphore(url, client, sem):
async with sem: # 获取信号量,超过限制就等待
resp = await client.get(url)
return resp.json()
async def main():
sem = asyncio.Semaphore(5) # 最多同时发 5 个请求
async with httpx.AsyncClient() as client:
tasks = [fetch_with_semaphore(url, client, sem) for url in urls]
results = await asyncio.gather(*tasks)
return results
asyncio.run(main())
Semaphore(5) 像一个有 5 把钥匙的挂锁盒——每次请求取一把钥匙,用完归还。没钥匙就排队等着。这样既利用异步加速,又不会把目标服务器打崩。
步骤 5:超时和异常处理
异步代码出了异常不会自动崩,但会安静地挂在那里。必须加超时和 try/except:
import asyncio
import httpx
from asyncio import TimeoutError
async def safe_fetch(url, client, sem, timeout=10.0):
async with sem:
try:
resp = await asyncio.wait_for(client.get(url), timeout=timeout)
resp.raise_for_status()
return resp.json()
except TimeoutError:
print(f"[超时] {url}")
return None
except httpx.HTTPStatusError as e:
print(f"[HTTP错误] {url}: {e.response.status_code}")
return None
except Exception as e:
print(f"[未知错误] {url}: {e}")
return None
async def main():
sem = asyncio.Semaphore(5)
timeout = httpx.Timeout(10.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
tasks = [safe_fetch(url, client, sem) for url in urls]
results = await asyncio.gather(*tasks)
success = [r for r in results if r is not None]
print(f"成功: {len(success)}/{len(urls)}")
return success
asyncio.wait_for(fut, timeout) 给每个协程设了个闹钟——超时就扔 TimeoutError,不会让单个请求拖死整个程序。
结果
我的爬虫改成异步后,100 个请求从 80 秒降到 1.2 秒(加了并发限流 5),快了 60 多倍。完整的代码模板大概 40 行,之后所有 API 调用我都用这个框架。
踩过的几个坑:
延伸思考

评论已关闭!