Rust 异步编程与 Tokio 运行时深度实战

2026-05-08 22:41 Rust 异步编程与 Tokio 运行时深度实战已关闭评论

Rust 异步编程与 Tokio 运行时深度实战

用了一年半 Tokio,踩了无数次坑之后,我的最终结论是:Rust 的异步不是「可选的性能优化」,而是一种全新的控制流范式——你必须在第一天就理解它的执行模型,否则后续每一个并发问题都会让你怀疑人生。

背景:为什么要碰 Rust 异步?

我手上的项目是一个实时数据处理管道,需要同时处理数千个 WebSocket 连接,每个连接做协议解析、数据聚合、缓存读写。最开始我用同步线程池加 std::sync,连接数一上 2000,内存直接飙到 2GB,上下文切换把 CPU 吃满。

换 Tokio 之后同样处理 5000 连接,内存稳定在 400MB 左右。

这不是量变,是质变。

理解 Tokio 的「协作式调度」

网上很多教程告诉你「async/await 就是语法糖」,这句话害人不浅。

// 错误示范:你以为在异步,实际在阻塞
async fn handle_request(stream: TcpStream) {
    let mut buf = [0u8; 1024];
    // 问题:std::fs::read 是同步阻塞操作
    let config = std::fs::read_to_string("config.toml").unwrap();
    // 读取期间,整个线程被阻塞,其他任务全部停下
    stream.read(&mut buf).await.unwrap();
}

这是 Tokio 初学者最容易犯的错误——在 async 函数里调了同步阻塞 API,自己还完全不知道。

Tokio 的线程模型是 M:N,默认工作线程数等于 CPU 核心数。每个线程上跑着大量任务,靠 async/await 的挂起点做协作切换。如果一个任务不主动 yield(.await),它会一直霸占线程。

标准库的 std::fsstd::time::Duration::sleepstd::sync::Mutex 全是阻塞的。在 async 上下文中必须用它们的 Tokio 替代品。

正确的做法:

use tokio::fs;
use tokio::time::{sleep, Duration};

async fn handle_request(stream: TcpStream) {
    // tokio::fs 是异步版本
    let config = fs::read_to_string("config.toml").await.unwrap();
    // 这里 .await 时,线程会去执行其他任务
}

一个真实的 WebSocket 服务器

这是我最初写的版本,只处理一件事:接收消息并广播给所有连接。

use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use futures_util::StreamExt;
use std::sync::Arc;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:9000").await.unwrap();
    let (tx, _) = broadcast::channel::<String>(100);

    loop {
        let (stream, addr) = listener.accept().await.unwrap();
        let tx = tx.clone();
        
        tokio::spawn(async move {
            let ws_stream = accept_async(stream).await.unwrap();
            let (mut write, mut read) = ws_stream.split();
            
            let mut rx = tx.subscribe();
            let recv_task = tokio::spawn(async move {
                while let Ok(msg) = rx.recv().await {
                    // ... 发送给客户端
                }
            });
            
            while let Some(Ok(msg)) = read.next().await {
                tx.send(msg.to_string()).unwrap();
            }
        });
    }
}

这个版本能跑,但跑 30 分钟后开始出问题:

  • 某些客户端连接后收不到消息
  • 内存缓慢增长
  • 极端情况下 panic

踩坑实录——我犯的 4 个致命错误

broadcast channel 的滞后丢弃

tokio::sync::broadcast 有个行为:当消费者速度跟不上发送者时,新消息会覆盖旧消息,落下的消费者会收到 RecvError::Lagged(n)

// 问题代码——悄无声息地丢消息
while let Ok(msg) = rx.recv().await {
    if let Err(e) = write.send(msg.into()).await {
        break;
    }
}

recv() 返回 Lagged 时我什么都没做,客户端直接跳过了中间消息。

修复方案:必须处理 Lagged,至少记录日志,或者让客户端做全量同步。

loop {
    match rx.recv().await {
        Ok(msg) => {
            if let Err(e) = write.send(msg.into()).await {
                break;
            }
        }
        Err(broadcast::error::RecvError::Lagged(n)) => {
            warn!("client {} lagged behind by {} messages", client_id, n);
        }
        Err(broadcast::error::RecvError::Closed) => break,
    }
}

unbounded channel 是内存炸弹

初期我用 futures::channel::mpsc::unbounded() 做任务间通信,觉得无界很方便。

结果生产环境一个上游抖动,消息积压 800 万条,服务 OOM 被杀。

教训:生产环境永远用 bounded channel。

let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(1024);

match tx.try_send(msg) {
    Ok(_) => {}
    Err(TrySendError::Full(_)) => {
        // 明确处理:丢弃旧消息、拒绝新消息或者背压到上游
    }
    Err(TrySendError::Closed(_)) => {}
}

tokio::spawn 后没有 JoinHandle

每个 tokio::spawn 返回一个 JoinHandle。不 join 的话,任务内部 panic 了你永远不会知道。

// panic 被静默吞掉
tokio::spawn(async move {
    do_something().await;
});

// 正确做法:至少做错误监控
let handle = tokio::spawn(async move {
    if let Err(e) = do_something().await {
        error!("task failed: {}", e);
    }
});

后来我加了一个全局的任务监控层,统一处理 panic 和重试:

pub fn spawn_monitored<F>(future: F, task_name: &'static str)
where
    F: Future<Output = Result<(), Box<dyn std::error::Error + Send>>> + Send + 'static,
{
    tokio::spawn(async move {
        if let Err(e) = future.await {
            error!("[{}] task failed: {:?}", task_name, e);
            metrics::counter!("task_failure_total", "task" => task_name).increment(1);
        }
    });
}

没有合理的超时机制

一个第三方 API 的 WebSocket 连接在极端网络下 hang 住 15 分钟,依赖这个连接的所有任务全被堵住,而线程池只有 8 个线程。

解决方案:所有 I/O 操作必须包一层超时。

use tokio::time::{timeout, Duration};

async fn read_with_timeout(stream: &mut TcpStream) -> Result<Vec<u8>, MyError> {
    match timeout(Duration::from_secs(30), read_frame(stream)).await {
        Ok(Ok(data)) => Ok(data),
        Ok(Err(e)) => Err(e),
        Err(_elapsed) => {
            warn!("read timeout, closing connection");
            Err(MyError::Timeout)
        }
    }
}

重构——一个可靠的生产架构

吃够了亏之后,我重新设计了架构。核心原则就一条:每个组件都假设邻居会挂。

                     ┌─────────────┐
                     │  Connection  │
                     │   Manager    │
                     └──────┬──────┘
                            │ spawn per connection
              ┌─────────────┼─────────────┐
              ▼             ▼             ▼
        ┌──────────┐ ┌──────────┐ ┌──────────┐
        │ Reader   │ │ Reader   │ │ Reader   │
        └────┬─────┘ └────┬─────┘ └────┬─────┘
             │             │             │
             └─────────────┼─────────────┘
                           │ mpsc channel (bounded)
                           ▼
                    ┌──────────────┐
                    │  Processor   │
                    │  (batch +    │
                    │   aggregate) │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │   Writer     │
                    └──────────────┘

Processor 的优雅关闭是关键:

pub async fn run_processor(
    mut rx: mpsc::Receiver<Message>,
    shutdown: tokio_util::sync::CancellationToken,
) -> Result<(), ProcessorError> {
    let mut batch = Vec::with_capacity(100);
    
    loop {
        tokio::select! {
            Some(msg) = rx.recv() => {
                batch.push(msg);
                if batch.len() >= 100 {
                    process_batch(&mut batch).await?;
                    batch.clear();
                }
            }
            _ = shutdown.cancelled() => {
                if !batch.is_empty() {
                    process_batch(&mut batch).await?;
                }
                info!("processor shut down gracefully");
                return Ok(());
            }
        }
    }
}

CancellationToken 是关键:所有长时间运行的任务持有同一个 token,服务关闭时统一触发,每个任务都能在安全点退出。

性能调优的几个关键数字

Tokio 工作线程数:默认等于 CPU 核心数,纯 I/O 场景下这是最优的。如果你有大量 CPU 计算(比如序列化/反序列化),需要调大 tokio::main(flavor = "multi_thread", worker_threads = N)

Mutex vs RwLock:在 Tokio 中永远用 tokio::sync::Mutex 而不是 std::sync::Mutex。后者阻塞线程,前者只阻塞任务。

// 错误
let data = std::sync::Mutex::new(vec![]);
// 正确
let data = tokio::sync::Mutex::new(vec![]);

但如果临界区非常短(几微秒),std::sync::Mutex 反而更快——tokio::sync::Mutex 有任务切换开销。调优的唯一标准是 profiling。

Channel size:我通过生产流量回放加压力测试得出,bounded(1024) 在丢消息率和延迟之间达到最优平衡。小于 256 丢消息太多,大于 4096 内存占用暴涨。

最后聊几句

Rust 的异步编程跟 Go 的 goroutine 有个本质区别:Go 帮你藏着调度器,你写同步代码就是写异步代码;Rust 把调度器暴露给你,你得自己管 PinSend、生命周期——代价更高,但换来零成本抽象和精细控制。

如果你刚开始用 Tokio,我的建议很简单:

  • 第一周把 Tokio 文档的八个示例全部手动敲一遍(不要复制)
  • 第一个月写一个小型 TCP 或 WebSocket 服务器,故意制造并发问题去解
  • 三个月后你会发现,你不想再写同步代码了

异步不是 Rust 的附加功能——它是 Rust 解决高并发问题的最终答案。我现在 5000 连接的场景下单机跑一个月不重启,Tokio 一次 panic 都没有。前提是:认真对待每个 .await,认真处理每个错误。

你可能感兴趣的文章

来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
转载请注明出处: https://teachcourse.cn/4122.html ,谢谢支持!

资源分享

分类:Android 标签:
Cursor、Claude Code、Copilot 三强对决:2026 AI 编程助手深度评测 Cursor、Claude Code、Copil
如何重用接口多个抽象方法中的一个或多个? 如何重用接口多个抽象方法中的一
一键Markdown转HTML工具 一键Markdown转HTML工具
Claude Code 尝试使用Agent Teams功能 Claude Code 尝试使用Agent T

评论已关闭!