Rust 异步编程与 Tokio 运行时深度实战
用了一年半 Tokio,踩了无数次坑之后,我的最终结论是:Rust 的异步不是「可选的性能优化」,而是一种全新的控制流范式——你必须在第一天就理解它的执行模型,否则后续每一个并发问题都会让你怀疑人生。
背景:为什么要碰 Rust 异步?
我手上的项目是一个实时数据处理管道,需要同时处理数千个 WebSocket 连接,每个连接做协议解析、数据聚合、缓存读写。最开始我用同步线程池加 std::sync,连接数一上 2000,内存直接飙到 2GB,上下文切换把 CPU 吃满。
换 Tokio 之后同样处理 5000 连接,内存稳定在 400MB 左右。
这不是量变,是质变。
理解 Tokio 的「协作式调度」
网上很多教程告诉你「async/await 就是语法糖」,这句话害人不浅。
// 错误示范:你以为在异步,实际在阻塞
async fn handle_request(stream: TcpStream) {
let mut buf = [0u8; 1024];
// 问题:std::fs::read 是同步阻塞操作
let config = std::fs::read_to_string("config.toml").unwrap();
// 读取期间,整个线程被阻塞,其他任务全部停下
stream.read(&mut buf).await.unwrap();
}
这是 Tokio 初学者最容易犯的错误——在 async 函数里调了同步阻塞 API,自己还完全不知道。
Tokio 的线程模型是 M:N,默认工作线程数等于 CPU 核心数。每个线程上跑着大量任务,靠 async/await 的挂起点做协作切换。如果一个任务不主动 yield(.await),它会一直霸占线程。
标准库的 std::fs、std::time::Duration::sleep、std::sync::Mutex 全是阻塞的。在 async 上下文中必须用它们的 Tokio 替代品。
正确的做法:
use tokio::fs;
use tokio::time::{sleep, Duration};
async fn handle_request(stream: TcpStream) {
// tokio::fs 是异步版本
let config = fs::read_to_string("config.toml").await.unwrap();
// 这里 .await 时,线程会去执行其他任务
}
一个真实的 WebSocket 服务器
这是我最初写的版本,只处理一件事:接收消息并广播给所有连接。
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use futures_util::StreamExt;
use std::sync::Arc;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("0.0.0.0:9000").await.unwrap();
let (tx, _) = broadcast::channel::<String>(100);
loop {
let (stream, addr) = listener.accept().await.unwrap();
let tx = tx.clone();
tokio::spawn(async move {
let ws_stream = accept_async(stream).await.unwrap();
let (mut write, mut read) = ws_stream.split();
let mut rx = tx.subscribe();
let recv_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
// ... 发送给客户端
}
});
while let Some(Ok(msg)) = read.next().await {
tx.send(msg.to_string()).unwrap();
}
});
}
}
这个版本能跑,但跑 30 分钟后开始出问题:
- 某些客户端连接后收不到消息
- 内存缓慢增长
- 极端情况下 panic
踩坑实录——我犯的 4 个致命错误
broadcast channel 的滞后丢弃
tokio::sync::broadcast 有个行为:当消费者速度跟不上发送者时,新消息会覆盖旧消息,落下的消费者会收到 RecvError::Lagged(n)。
// 问题代码——悄无声息地丢消息
while let Ok(msg) = rx.recv().await {
if let Err(e) = write.send(msg.into()).await {
break;
}
}
recv() 返回 Lagged 时我什么都没做,客户端直接跳过了中间消息。
修复方案:必须处理 Lagged,至少记录日志,或者让客户端做全量同步。
loop {
match rx.recv().await {
Ok(msg) => {
if let Err(e) = write.send(msg.into()).await {
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("client {} lagged behind by {} messages", client_id, n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
unbounded channel 是内存炸弹
初期我用 futures::channel::mpsc::unbounded() 做任务间通信,觉得无界很方便。
结果生产环境一个上游抖动,消息积压 800 万条,服务 OOM 被杀。
教训:生产环境永远用 bounded channel。
let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(1024);
match tx.try_send(msg) {
Ok(_) => {}
Err(TrySendError::Full(_)) => {
// 明确处理:丢弃旧消息、拒绝新消息或者背压到上游
}
Err(TrySendError::Closed(_)) => {}
}
tokio::spawn 后没有 JoinHandle
每个 tokio::spawn 返回一个 JoinHandle。不 join 的话,任务内部 panic 了你永远不会知道。
// panic 被静默吞掉
tokio::spawn(async move {
do_something().await;
});
// 正确做法:至少做错误监控
let handle = tokio::spawn(async move {
if let Err(e) = do_something().await {
error!("task failed: {}", e);
}
});
后来我加了一个全局的任务监控层,统一处理 panic 和重试:
pub fn spawn_monitored<F>(future: F, task_name: &'static str)
where
F: Future<Output = Result<(), Box<dyn std::error::Error + Send>>> + Send + 'static,
{
tokio::spawn(async move {
if let Err(e) = future.await {
error!("[{}] task failed: {:?}", task_name, e);
metrics::counter!("task_failure_total", "task" => task_name).increment(1);
}
});
}
没有合理的超时机制
一个第三方 API 的 WebSocket 连接在极端网络下 hang 住 15 分钟,依赖这个连接的所有任务全被堵住,而线程池只有 8 个线程。
解决方案:所有 I/O 操作必须包一层超时。
use tokio::time::{timeout, Duration};
async fn read_with_timeout(stream: &mut TcpStream) -> Result<Vec<u8>, MyError> {
match timeout(Duration::from_secs(30), read_frame(stream)).await {
Ok(Ok(data)) => Ok(data),
Ok(Err(e)) => Err(e),
Err(_elapsed) => {
warn!("read timeout, closing connection");
Err(MyError::Timeout)
}
}
}
重构——一个可靠的生产架构
吃够了亏之后,我重新设计了架构。核心原则就一条:每个组件都假设邻居会挂。
┌─────────────┐
│ Connection │
│ Manager │
└──────┬──────┘
│ spawn per connection
┌─────────────┼─────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Reader │ │ Reader │ │ Reader │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└─────────────┼─────────────┘
│ mpsc channel (bounded)
▼
┌──────────────┐
│ Processor │
│ (batch + │
│ aggregate) │
└──────┬───────┘
│
┌──────▼───────┐
│ Writer │
└──────────────┘
Processor 的优雅关闭是关键:
pub async fn run_processor(
mut rx: mpsc::Receiver<Message>,
shutdown: tokio_util::sync::CancellationToken,
) -> Result<(), ProcessorError> {
let mut batch = Vec::with_capacity(100);
loop {
tokio::select! {
Some(msg) = rx.recv() => {
batch.push(msg);
if batch.len() >= 100 {
process_batch(&mut batch).await?;
batch.clear();
}
}
_ = shutdown.cancelled() => {
if !batch.is_empty() {
process_batch(&mut batch).await?;
}
info!("processor shut down gracefully");
return Ok(());
}
}
}
}
CancellationToken 是关键:所有长时间运行的任务持有同一个 token,服务关闭时统一触发,每个任务都能在安全点退出。
性能调优的几个关键数字
Tokio 工作线程数:默认等于 CPU 核心数,纯 I/O 场景下这是最优的。如果你有大量 CPU 计算(比如序列化/反序列化),需要调大 tokio::main(flavor = "multi_thread", worker_threads = N)。
Mutex vs RwLock:在 Tokio 中永远用 tokio::sync::Mutex 而不是 std::sync::Mutex。后者阻塞线程,前者只阻塞任务。
// 错误
let data = std::sync::Mutex::new(vec![]);
// 正确
let data = tokio::sync::Mutex::new(vec![]);
但如果临界区非常短(几微秒),std::sync::Mutex 反而更快——tokio::sync::Mutex 有任务切换开销。调优的唯一标准是 profiling。
Channel size:我通过生产流量回放加压力测试得出,bounded(1024) 在丢消息率和延迟之间达到最优平衡。小于 256 丢消息太多,大于 4096 内存占用暴涨。
最后聊几句
Rust 的异步编程跟 Go 的 goroutine 有个本质区别:Go 帮你藏着调度器,你写同步代码就是写异步代码;Rust 把调度器暴露给你,你得自己管 Pin、Send、生命周期——代价更高,但换来零成本抽象和精细控制。
如果你刚开始用 Tokio,我的建议很简单:
- 第一周把 Tokio 文档的八个示例全部手动敲一遍(不要复制)
- 第一个月写一个小型 TCP 或 WebSocket 服务器,故意制造并发问题去解
- 三个月后你会发现,你不想再写同步代码了
异步不是 Rust 的附加功能——它是 Rust 解决高并发问题的最终答案。我现在 5000 连接的场景下单机跑一个月不重启,Tokio 一次 panic 都没有。前提是:认真对待每个 .await,认真处理每个错误。

评论已关闭!