消息队列选型与高可用架构实战

2026-05-18 22:23 消息队列选型与高可用架构实战已关闭评论

消息队列选型与高可用架构实战

做 MQ 选型,别纠结,先看业务场景再定技术:业务量日均百万级选 RabbitMQ,千万级选 RocketMQ,亿级且需要流式处理选 Kafka,要云原生且多租户选 Pulsar。

过去两年我主导了三个项目的 MQ 重构,每个踩过的坑都够写一篇检讨。这篇文章不聊空洞的概念,只讲实战。

一、先想清楚一个问题:你真的需要消息队列吗?

很多团队上 MQ 是因为"别人都在用"。我见过最离谱的项目,日均几百条消息,硬上了 Kafka,运维成本比业务代码还高。

什么场景必须上 MQ?

  • 异步解耦:用户注册后发邮件,不需要等邮件发送成功再返回
  • 流量削峰:秒杀系统瞬间流量上万 QPS,数据库扛不住
  • 数据同步:订单系统 → 仓储系统 → 财务系统,多系统靠消息驱动
  • 日志采集:业务日志需要统一采集、投递到大数据平台

日均 QPS 不到 1000 的团队,直接用 Redis List 或者数据库轮询就行,别给运维添负担。

二、主流 MQ 横向对比

先给一张我自用的选型对照表:

维度 RabbitMQ Apache Kafka RocketMQ Apache Pulsar
语言 Erlang Scala/Java Java Java
吞吐量 万级/s 百万级/s 十万级/s 百万级/s
延迟 微秒级 毫秒级 毫秒级 毫秒级
消息顺序 单队列有序 分区内有序 分区内有序 分区内有序
消息堆积 较弱 最强
运维复杂度 中高
社区活跃度 最高 中(国内活跃)

直接给结论:

  • 中小团队/项目早期 → RabbitMQ。文档全,社区大,遇到问题 Stack Overflow 一搜就有答案
  • 阿里系/Java 技术栈/需要事务消息 → RocketMQ。RocketMQ 的事务消息方案比 Kafka 的 exactly-once 成熟得多
  • 大数据管道/日志采集/流式计算 → Kafka。生态无敌,Flink、Spark 都是亲儿子
  • 多租户/云原生/存算分离 → Pulsar。架构最先进,但团队需要有一定技术储备

三、RabbitMQ 高可用部署实战

我第一次在生产用 RabbitMQ 就吃亏了——单节点部署,半夜挂了,全公司订单处理停了 40 分钟。

3.1 镜像队列模式(必配)

# 创建镜像队列策略,自动同步到所有节点
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

注意: ha-mode: all 会把队列复制到所有节点,节点多时建议用 ha-mode: exactly + ha-params: 2,只复制 2 份,节省资源。

3.2 生产配置模板

这套配置我线上用了两年没出过问题:

# /etc/rabbitmq/rabbitmq.conf
listeners.tcp.default = 5672

# 集群节点
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_consul
cluster_formation.consul.host = consul.service.consul
cluster_formation.consul.svc = rabbitmq

# 内存阈值——实际踩坑:默认 0.4 太低,大流量时频繁触发流控
vm_memory_high_watermark.relative = 0.7

# 磁盘阈值
disk_free_limit.absolute = 2GB

# 心跳超时,避免网络抖动导致频繁重连
heartbeat = 60

3.3 客户端连接必须做的事

import pika
import time
from retry import retry

class RabbitMQClient:
    def __init__(self):
        self.hosts = ['node1:5672', 'node2:5672', 'node3:5672']
        self.connection = None
    
    @retry(tries=3, delay=1, backoff=2)
    def connect(self):
        # 关键:使用心跳和阻塞连接超时
        params = pika.ConnectionParameters(
            host=self.hosts[0],
            heartbeat=60,
            blocked_connection_timeout=300,
            connection_attempts=3,
            retry_delay=1
        )
        self.connection = pika.BlockingConnection(params)
    
    def publish_with_confirm(self, exchange, routing_key, message):
        # 启用发布者确认——这是防丢消息的最后防线
        channel = self.connection.channel()
        channel.confirm_delivery()
        
        try:
            channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=message,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # 持久化
                ),
                mandatory=True
            )
            return True
        except pika.exceptions.UnroutableError:
            # 消息没有路由到任何队列
            self._handle_failed_message(message)
            return False

这个 publish_with_confirm 是我踩坑后加上去的。早期代码直接 fire-and-forget,结果 RabbitMQ 节点挂了,消息丢了完全不知道。

四、Kafka 高可用架构踩坑记

公司之前用 Kafka 做埋点数据管道,日均 1.2 亿条消息。我接手时遇到三个致命问题。

4.1 坑一:分区数不是越多越好

前任把每个 Topic 设了 64 个分区,理由是"分区越多吞吐越大"。实际结果:消费者重平衡一次要 3 分钟,期间消费完全停摆。

真相:分区数 = 消费者的最大并行度。你只有 6 个消费者实例,设 64 个分区纯属浪费。

经验公式:

分区数 = max(消费者实例数 × 2, 预期的峰值吞吐量 / 单个分区吞吐量)

4.2 坑二:acks=1 丢数据不奇怪

# 生产配置——重要程度从高到低
acks=all                    # 等所有 ISR 副本都确认
min.insync.replicas=2       # 最小 ISR 副本数
replication.factor=3        # 副本因子
unclean.leader.election.enable=false  # 不允许非 ISR 副本成为 leader
retries=Integer.MAX_VALUE   # 无限重试
max.in.flight.requests.per.connection=1  # 保证顺序

acks=all 配合 min.insync.replicas=2 是关键组合。只配 acks=all 但 ISR 只有一个副本时,跟 acks=1 没区别。

4.3 坑三:消费端不处理异常导致 OOM

// 正确的消费姿势
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        try {
            process(record);
        } catch (Exception e) {
            // 记录到死信队列,而不是一直重试
            sendToDLQ(record, e);
            // 关键:继续消费下一条,不要阻塞
        }
    }
    // 处理完一批再提交 offset,不要每条都 commit
    consumer.commitSync();
}

我见过最离谱的代码:catch 块里什么都不做,直接 continue,消息消费失败就静默丢弃。排查业务数据对不上找了三天才发现是这个原因。

五、RocketMQ 事务消息实战

RocketMQ 的事务消息是我用过最靠谱的分布式事务方案,没有之一。

5.1 核心流程

// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(new TransactionListener() {
    
    // 执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 扣减库存 + 创建订单(同一个本地事务)
            orderService.createOrder((Long) arg);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    // 事务回查——RocketMQ 核心优势
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务是否成功
        String orderId = msg.getKeys();
        if (orderService.checkOrderExists(orderId)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }
});

// 发送半消息
SendResult result = producer.sendMessageInTransaction(msg, orderId);

核心思想:先发半消息(Broker 存储但不可见),执行本地事务成功后再 Commit。如果 Commit 失败,Broker 会轮询回查本地事务状态。

Kafka 也有事务消息,但我实战下来 RocketMQ 的回查机制远比他成熟。

5.2 事务消息使用限制

注意: RocketMQ 事务消息不支持定时消息和批量消息。Broker 配置 transactionTimeOut 默认 6 秒,可以根据业务调整。回查次数默认 15 次,超过会标记为未知状态,需要人工介入。

六、消息防丢的四重保障

在三个项目里验证过的方案,从轻到重:

第一层:生产者确认

  • RabbitMQ:Publisher Confirm
  • Kafka:acks=all
  • RocketMQ:同步发送 + SendResult.getSendStatus()

第二层:消息持久化

  • 所有 MQ 都支持持久化,但默认配置不一定开
  • 注意:持久化不是写磁盘就完事,要确认刷盘策略

第三层:消费端手动 ACK

def callback(ch, method, properties, body):
    try:
        process(body)
        # 业务处理成功才 ACK
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # 处理失败:nack 并重新入队
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

第四层:本地消息表(终极方案)

对核心业务(订单、支付),不要依赖 MQ 本身的可靠性。

业务操作 → 本地事务写业务表 + 消息表(同库)
→ 独立定时任务扫描消息表发送到 MQ
→ MQ 消费成功后回调更新消息状态
→ 消费失败不停重试,直到人工介入

这套方案看起来笨重,但能扛得住任何 MQ 故障。我们支付链路就是靠它撑过了两次 Kafka 集群崩溃。

七、监控与运维实战

别等出了事再查日志,日常监控要到位。

7.1 必须盯的核心指标

生产者:
- 发送耗时 P99 > 500ms 报警
- 发送失败率 > 0.1% 报警

消费者:
- 消费延迟(lag)> 阈值报警
- 消费失败率 > 1% 报警

集群:
- 磁盘使用率 > 80% 报警(Kafka 对磁盘空间极其敏感)
- CPU load > 核心数 × 0.7 报警
- 网络带宽使用率 > 70% 报警

7.2 Kafka 监控一条命令搞定

# 查看消费组延迟
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group my-consumer-group --describe

重点关注 LAG 列。如果某个分区的 LAG 持续增长,说明消费者处理不过来。

7.3 磁盘故障实战一例

有次 Kafka 节点磁盘满了,整个分区不可用。当时监控只报了磁盘使用率,但没有预警增长速度——结果凌晨流量高峰时磁盘写爆,影响了核心业务 20 分钟。

后来加了个更早触发的预警:

# 每小时检查磁盘增长速率
# 如果过去1小时增长超过100GB,提前预警
watch -n 3600 'df -h /data/kafka | tail -1 | awk "{print \$5}"'

八、选型决策树

最后给个决策辅助,一条路走到底:

  1. 日均消息量 < 50 万 → 直接 RabbitMQ,别犹豫
  2. 日均消息量 50 万 - 1000 万,Java 技术栈 → RocketMQ
  3. 需要事务消息 → RocketMQ(唯一靠谱选择)
  4. 日均消息量 > 1000 万,且需要流式计算 → Kafka
  5. 团队有较强的运维能力,需要多租户隔离 → Pulsar
  6. 公司已经是阿里云用户 → 直接买阿里云 RocketMQ(省心省力)

最后一句经验: MQ 选型没有银弹。选错可以换,但不做高可用一定出事故。先把镜像队列/分区 + ACK + 监控三板斧做到位,至少能覆盖 90% 的故障场景。

你可能感兴趣的文章

来源:每日教程每日一例,深入学习实用技术教程,关注公众号TeachCourse
转载请注明出处: https://teachcourse.cn/4144.html ,谢谢支持!

资源分享

分类:Android 标签:
性能优化实践一 性能优化实践一
ai工具自动发送markdown文章到wordpress站点 ai工具自动发送markdown文章到wo
盘点2018年,那些做过的事情 盘点2018年,那些做过的事情
Junior-Designer工作流 Junior-Designer工作流

评论已关闭!