消息队列选型与高可用架构实战
做 MQ 选型,别纠结,先看业务场景再定技术:业务量日均百万级选 RabbitMQ,千万级选 RocketMQ,亿级且需要流式处理选 Kafka,要云原生且多租户选 Pulsar。
过去两年我主导了三个项目的 MQ 重构,每个踩过的坑都够写一篇检讨。这篇文章不聊空洞的概念,只讲实战。
一、先想清楚一个问题:你真的需要消息队列吗?
很多团队上 MQ 是因为"别人都在用"。我见过最离谱的项目,日均几百条消息,硬上了 Kafka,运维成本比业务代码还高。
什么场景必须上 MQ?
- 异步解耦:用户注册后发邮件,不需要等邮件发送成功再返回
- 流量削峰:秒杀系统瞬间流量上万 QPS,数据库扛不住
- 数据同步:订单系统 → 仓储系统 → 财务系统,多系统靠消息驱动
- 日志采集:业务日志需要统一采集、投递到大数据平台
日均 QPS 不到 1000 的团队,直接用 Redis List 或者数据库轮询就行,别给运维添负担。
二、主流 MQ 横向对比
先给一张我自用的选型对照表:
| 维度 | RabbitMQ | Apache Kafka | RocketMQ | Apache Pulsar |
|---|---|---|---|---|
| 语言 | Erlang | Scala/Java | Java | Java |
| 吞吐量 | 万级/s | 百万级/s | 十万级/s | 百万级/s |
| 延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒级 |
| 消息顺序 | 单队列有序 | 分区内有序 | 分区内有序 | 分区内有序 |
| 消息堆积 | 较弱 | 强 | 强 | 最强 |
| 运维复杂度 | 中 | 高 | 中 | 中高 |
| 社区活跃度 | 高 | 最高 | 中(国内活跃) | 中 |
直接给结论:
- 中小团队/项目早期 → RabbitMQ。文档全,社区大,遇到问题 Stack Overflow 一搜就有答案
- 阿里系/Java 技术栈/需要事务消息 → RocketMQ。RocketMQ 的事务消息方案比 Kafka 的 exactly-once 成熟得多
- 大数据管道/日志采集/流式计算 → Kafka。生态无敌,Flink、Spark 都是亲儿子
- 多租户/云原生/存算分离 → Pulsar。架构最先进,但团队需要有一定技术储备
三、RabbitMQ 高可用部署实战
我第一次在生产用 RabbitMQ 就吃亏了——单节点部署,半夜挂了,全公司订单处理停了 40 分钟。
3.1 镜像队列模式(必配)
# 创建镜像队列策略,自动同步到所有节点
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
注意:
ha-mode: all会把队列复制到所有节点,节点多时建议用ha-mode: exactly+ha-params: 2,只复制 2 份,节省资源。
3.2 生产配置模板
这套配置我线上用了两年没出过问题:
# /etc/rabbitmq/rabbitmq.conf
listeners.tcp.default = 5672
# 集群节点
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_consul
cluster_formation.consul.host = consul.service.consul
cluster_formation.consul.svc = rabbitmq
# 内存阈值——实际踩坑:默认 0.4 太低,大流量时频繁触发流控
vm_memory_high_watermark.relative = 0.7
# 磁盘阈值
disk_free_limit.absolute = 2GB
# 心跳超时,避免网络抖动导致频繁重连
heartbeat = 60
3.3 客户端连接必须做的事
import pika
import time
from retry import retry
class RabbitMQClient:
def __init__(self):
self.hosts = ['node1:5672', 'node2:5672', 'node3:5672']
self.connection = None
@retry(tries=3, delay=1, backoff=2)
def connect(self):
# 关键:使用心跳和阻塞连接超时
params = pika.ConnectionParameters(
host=self.hosts[0],
heartbeat=60,
blocked_connection_timeout=300,
connection_attempts=3,
retry_delay=1
)
self.connection = pika.BlockingConnection(params)
def publish_with_confirm(self, exchange, routing_key, message):
# 启用发布者确认——这是防丢消息的最后防线
channel = self.connection.channel()
channel.confirm_delivery()
try:
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 持久化
),
mandatory=True
)
return True
except pika.exceptions.UnroutableError:
# 消息没有路由到任何队列
self._handle_failed_message(message)
return False
这个 publish_with_confirm 是我踩坑后加上去的。早期代码直接 fire-and-forget,结果 RabbitMQ 节点挂了,消息丢了完全不知道。
四、Kafka 高可用架构踩坑记
公司之前用 Kafka 做埋点数据管道,日均 1.2 亿条消息。我接手时遇到三个致命问题。
4.1 坑一:分区数不是越多越好
前任把每个 Topic 设了 64 个分区,理由是"分区越多吞吐越大"。实际结果:消费者重平衡一次要 3 分钟,期间消费完全停摆。
真相:分区数 = 消费者的最大并行度。你只有 6 个消费者实例,设 64 个分区纯属浪费。
经验公式:
分区数 = max(消费者实例数 × 2, 预期的峰值吞吐量 / 单个分区吞吐量)
4.2 坑二:acks=1 丢数据不奇怪
# 生产配置——重要程度从高到低
acks=all # 等所有 ISR 副本都确认
min.insync.replicas=2 # 最小 ISR 副本数
replication.factor=3 # 副本因子
unclean.leader.election.enable=false # 不允许非 ISR 副本成为 leader
retries=Integer.MAX_VALUE # 无限重试
max.in.flight.requests.per.connection=1 # 保证顺序
acks=all 配合 min.insync.replicas=2 是关键组合。只配 acks=all 但 ISR 只有一个副本时,跟 acks=1 没区别。
4.3 坑三:消费端不处理异常导致 OOM
// 正确的消费姿势
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
process(record);
} catch (Exception e) {
// 记录到死信队列,而不是一直重试
sendToDLQ(record, e);
// 关键:继续消费下一条,不要阻塞
}
}
// 处理完一批再提交 offset,不要每条都 commit
consumer.commitSync();
}
我见过最离谱的代码:catch 块里什么都不做,直接 continue,消息消费失败就静默丢弃。排查业务数据对不上找了三天才发现是这个原因。
五、RocketMQ 事务消息实战
RocketMQ 的事务消息是我用过最靠谱的分布式事务方案,没有之一。
5.1 核心流程
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 扣减库存 + 创建订单(同一个本地事务)
orderService.createOrder((Long) arg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 事务回查——RocketMQ 核心优势
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务是否成功
String orderId = msg.getKeys();
if (orderService.checkOrderExists(orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
});
// 发送半消息
SendResult result = producer.sendMessageInTransaction(msg, orderId);
核心思想:先发半消息(Broker 存储但不可见),执行本地事务成功后再 Commit。如果 Commit 失败,Broker 会轮询回查本地事务状态。
Kafka 也有事务消息,但我实战下来 RocketMQ 的回查机制远比他成熟。
5.2 事务消息使用限制
注意: RocketMQ 事务消息不支持定时消息和批量消息。Broker 配置
transactionTimeOut默认 6 秒,可以根据业务调整。回查次数默认 15 次,超过会标记为未知状态,需要人工介入。
六、消息防丢的四重保障
在三个项目里验证过的方案,从轻到重:
第一层:生产者确认
- RabbitMQ:Publisher Confirm
- Kafka:acks=all
- RocketMQ:同步发送 + SendResult.getSendStatus()
第二层:消息持久化
- 所有 MQ 都支持持久化,但默认配置不一定开
- 注意:持久化不是写磁盘就完事,要确认刷盘策略
第三层:消费端手动 ACK
def callback(ch, method, properties, body):
try:
process(body)
# 业务处理成功才 ACK
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
# 处理失败:nack 并重新入队
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
第四层:本地消息表(终极方案)
对核心业务(订单、支付),不要依赖 MQ 本身的可靠性。
业务操作 → 本地事务写业务表 + 消息表(同库)
→ 独立定时任务扫描消息表发送到 MQ
→ MQ 消费成功后回调更新消息状态
→ 消费失败不停重试,直到人工介入
这套方案看起来笨重,但能扛得住任何 MQ 故障。我们支付链路就是靠它撑过了两次 Kafka 集群崩溃。
七、监控与运维实战
别等出了事再查日志,日常监控要到位。
7.1 必须盯的核心指标
生产者:
- 发送耗时 P99 > 500ms 报警
- 发送失败率 > 0.1% 报警
消费者:
- 消费延迟(lag)> 阈值报警
- 消费失败率 > 1% 报警
集群:
- 磁盘使用率 > 80% 报警(Kafka 对磁盘空间极其敏感)
- CPU load > 核心数 × 0.7 报警
- 网络带宽使用率 > 70% 报警
7.2 Kafka 监控一条命令搞定
# 查看消费组延迟
kafka-consumer-groups --bootstrap-server localhost:9092 \
--group my-consumer-group --describe
重点关注 LAG 列。如果某个分区的 LAG 持续增长,说明消费者处理不过来。
7.3 磁盘故障实战一例
有次 Kafka 节点磁盘满了,整个分区不可用。当时监控只报了磁盘使用率,但没有预警增长速度——结果凌晨流量高峰时磁盘写爆,影响了核心业务 20 分钟。
后来加了个更早触发的预警:
# 每小时检查磁盘增长速率
# 如果过去1小时增长超过100GB,提前预警
watch -n 3600 'df -h /data/kafka | tail -1 | awk "{print \$5}"'
八、选型决策树
最后给个决策辅助,一条路走到底:
- 日均消息量 < 50 万 → 直接 RabbitMQ,别犹豫
- 日均消息量 50 万 - 1000 万,Java 技术栈 → RocketMQ
- 需要事务消息 → RocketMQ(唯一靠谱选择)
- 日均消息量 > 1000 万,且需要流式计算 → Kafka
- 团队有较强的运维能力,需要多租户隔离 → Pulsar
- 公司已经是阿里云用户 → 直接买阿里云 RocketMQ(省心省力)
最后一句经验: MQ 选型没有银弹。选错可以换,但不做高可用一定出事故。先把镜像队列/分区 + ACK + 监控三板斧做到位,至少能覆盖 90% 的故障场景。

评论已关闭!