Apache Kafka 实时流处理实战:从入门到事件驱动架构
如果你以为 Kafka 只是个"消息队列",那你可能浪费了它 80% 的能力。过去两年我用 Kafka 从日志采集一路搭到了事件驱动架构,踩了无数坑。这篇文章带你从零搭一个能用的实时流处理系统,最终落地到完整的事件驱动架构上。
背景:为什么是 Kafka?
当时我接手了一个电商后端的改造项目:订单、库存、通知三个系统之间靠 HTTP 硬耦合,每次大促都因为某个服务抖动导致雪崩。最夸张的一次,订单服务挂了 30 秒,库存和通知跟着全挂,恢复花了 40 分钟。
我们需要一个"缓冲区"——能解耦服务、能扛流量尖峰、还能把数据流给多个消费者复用。Kafka 就是为这个场景设计的。
第一步:本地搭建 Kafka 集群
先别急着上生产,本地跑一遍比看十篇文档都管用。
下载和启动
# 下载 Kafka(内置了 Zookeeper)
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
# 启动 Zookeeper(Kafka 依赖它管理集群元数据)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 Kafka broker(新开终端)
bin/kafka-server-start.sh config/server.properties
注意: Kafka 3.x 开始支持 KRaft 模式(不依赖 Zookeeper),但生产环境 Zookeeper 模式仍然最稳定。新手先用 Zookeeper 模式,别追新。
创建 Topic 并验证
# 创建一个名为 orders 的 topic,3 个分区,1 个副本
bin/kafka-topics.sh --create --topic orders \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1
# 查看 topic 详情
bin/kafka-topics.sh --describe --topic orders \
--bootstrap-server localhost:9092
输出类似这样:
Topic: orders PartitionCount: 3 ReplicationFactor: 1
Topic: orders Partition: 0 Leader: 0 ...
Topic: orders Partition: 1 Leader: 0 ...
Topic: orders Partition: 2 Leader: 0 ...
分区数决定了并行度,这是后面所有性能调优的起点。
第二步:生产者和消费者——第一个坑
理论看再多,不如跑一次 Hello World。我选 Python,上手最快。
安装依赖
pip install kafka-python
生产者代码
from kafka import KafkaProducer
import json, time
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for i in range(10):
msg = {'order_id': f'ORD-{i}', 'amount': 100 + i, 'ts': time.time()}
future = producer.send('orders', value=msg)
result = future.get(timeout=10)
print(f'Sent: {msg}, offset: {result.offset}')
producer.flush()
有个细节:future.get(timeout=10) 会阻塞等 ack。生产环境别每个消息都调它,否则退化成同步发送。应该批量 send 后统一 flush。
消费者代码
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'orders',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='order-processor',
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
for msg in consumer:
print(f"Partition: {msg.partition}, Offset: {msg.offset}, Value: {msg.value}")
跑起来之后,你会看到消息被分配到不同分区。这就是 Kafka 并行消费的基础。
踩坑记录:消费者组 vs 独立消费者
刚开始我有个误解:以为同一个 group.id 的多个消费者会重复消费消息。实际上恰恰相反——同一个 group 内的消费者分摊分区,每条消息只被消费一次。
如果要多个服务独立消费同一条消息,必须用不同的 group.id。比如订单服务一个组,数据分析另一个组,它们各自独立消费,互不干扰。
第三步:从消息队列到流处理
消息能发能收之后,下一步是"边读边处理"。这就是 Kafka Streams 或 ksqlDB 的舞台。
我当时没直接上 Kafka Streams(Java 太重了),用的 Python 的 faust 库做轻量流处理。
import faust
app = faust.App('payment-flow', broker='kafka://localhost:9092')
class Order(faust.Record):
order_id: str
amount: float
ts: float
orders_topic = app.topic('orders', value_type=Order)
@app.agent(orders_topic)
async def process(orders):
async for order in orders:
if order.amount > 1000:
print(f"High-value order: {order.order_id}")
# 发送到高风险订单处理队列
await send_to_risk_check(order)
else:
print(f"Normal order: {order.order_id}")
if __name__ == '__main__':
app.main()
faust 的写法很像 FastAPI——声明式、异步、类型提示。它帮你处理了 offset 提交、重试、分区分配,你只需要关注业务逻辑。
第四步:关键配置——生产环境调优
这是我最想早知道的配置清单。开发环境怎么跑都行,生产环境一个参数不对就是事故。
Broker 侧
# server.properties
# 数据保留 7 天
log.retention.hours=168
# 单分区日志文件上限,超过就滚动新文件
log.segment.bytes=1073741824
# 不允许自动创建 topic(避免误写导致一堆脏 topic)
auto.create.topics.enable=false
# 集群至少需要 2 个 ISR 副本才接受写入
min.insync.replicas=2
生产者侧
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
acks='all', # 等待所有 ISR 确认
retries=3,
batch_size=16384, # 16KB 凑批
linger_ms=10, # 延迟 10ms 等更多消息
compression_type='gzip', # 压缩减少网络开销
)
acks='all' 是数据不丢的底线。linger_ms 和 batch_size 配合能大幅提升吞吐——代价是增加少量延迟。
消费者侧
consumer = KafkaConsumer(
'orders',
bootstrap_servers='localhost:9092',
enable_auto_commit=False, # 手动提交,别信自动
max_poll_records=500,
max_poll_interval_ms=300000
)
为什么不 auto commit? 我亲身经历过:消费者拉了一批消息处理到一半挂掉,自动提交已经把 offset 更新了,重启后这批消息永远丢失。推荐做法是处理完一批再提交,幂等处理保证重试安全。
第五步:落地的关键——幂等和 Exactly-Once
"Kafka 保证消息不丢"是江湖传说,实际上你得自己配。
# 生产者启用幂等
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
enable_idempotence=True, # 自动设置 acks=all + retries=MAX
)
幂等生产者解决了"重试导致重复消息"的问题,但它只保证单分区内不重复。跨分区的事务需要:
# 事务性生产者
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
transactional_id='tx-order-processor'
)
producer.init_transactions()
producer.begin_transaction()
try:
producer.send('orders', value=msg1)
producer.send('payments', value=msg2)
producer.commit_transaction()
except:
producer.abort_transaction()
注意: 事务有性能开销,只在"绝对不能丢/重复"的场景用——比如支付、订单状态变更。普通的日志采集不需要。
第六步:落地事件驱动架构
前面都是铺垫。跑通了生产者和消费者,理解了分区和 offset,就该把 Kafka 放在整个系统架构中考量了。
事件驱动架构的核心思想:服务不直接调用服务,而是通过"事件"通信。订单服务产生"订单已创建"事件,库存服务、通知服务、数据分析服务各自订阅这个事件。
┌─────────────┐
│ 订单 API │
└──────┬──────┘
│ 产生事件
▼
┌─────────────┐
│ Kafka │
│ (事件总线) │
└──┬───┬───┬──┘
│ │ │
┌───────┘ │ └───────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌──────────┐
│ 库存服务 │ │ 通知服务 │ │ 数据分析 │
└────────┘ └────────┘ └──────────┘
这种架构的好处:
- 服务解耦:库存服务挂了不影响下单,消息积压在 Kafka 里,恢复后自动追上
- 流量削峰:秒杀流量先涌入 Kafka,消费者按能力慢慢消费
- 事件溯源:所有业务变更以事件形式保留,谁在什么时候做了什么一目了然
一个真实的例子
我们订单系统重构后,创建订单的流程变成了:
- 订单服务接收请求,写入数据库
- 产生
OrderCreated事件到 Kafka - 库存服务消费事件,扣减库存(异步)
- 风控服务消费事件,做风险评分(异步)
- 通知服务消费事件,发送确认短信(异步)
踩坑: 一开始我们把所有逻辑放在一个 consumer 里顺序执行,结果库存扣减慢了,通知也发不出去。后来拆成 3 个独立的消费者组,各自独立消费,问题解决。
第七步:监控——没有监控的 Kafka 是盲人开车
Kafka 不像 MySQL 那样"配好就不用管"。你必须盯紧这些指标:
# 消费者组堆积情况
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --describe
# 输出中的 LAG 列代表堆积量
# TOPIC CURRENT-OFFSET LOG-END-OFFSET LAG
# orders 1500 3500 2000
LAG 是核心指标。我设了 Prometheus 告警:LAG > 10000 持续 5 分钟就发钉钉报警。
# prometheus 告警规则
groups:
- name: kafka
rules:
- alert: KafkaHighLag
expr: kafka_consumer_lag > 10000
for: 5m
labels:
severity: warning
还有一个容易被忽略的点:消费者处理耗时。如果单个消息处理超过 5 分钟,Kafka 会认为消费者死了,触发 rebalance。这时候你最好把 max_poll_interval_ms 设大一点,或者缩短单条消息处理时间。
总结:Kafka 的上手路径
- 从本地单机开始,跑通生产消费
- 理解分区和消费者组——这是 Kafka 并行和容错的根基
- 配置调优重点在 acks、retries、幂等性
- 从消息队列平滑过渡到流处理,用 faust/ksqlDB 逐步迭代
- 最终目标是事件驱动架构,让 Kafka 成为系统的"数据主动脉"
延伸思考
- Kafka vs Pulsar:Pulsar 的存算分离架构在云原生场景更有优势,但社区成熟度不如 Kafka。小团队选 Kafka 不会错。
- Schema Registry:消息多了之后,生产者和消费者对消息格式的理解会不一致。用 Avro + Schema Registry 做契约测试,团队规模 5 人以上时的必选项。
- Kafka Connect:如果要从 MySQL 同步数据到 Kafka,别自己写 JDBC 代码,Kafka Connect 的 Debezium 插件开箱即用,还能捕获 binlog 变更。
\n

评论已关闭!