Dapr 状态管理与发布订阅实战:从踩坑到真香
结论先行:Dapr 的状态管理和 Pub/Sub 能帮你省掉 80% 的样板代码,但如果你不懂它的原子性约束和 Sidecar 生命周期,上线第一个小时就会翻车。下面是我用 Go + Redis 实战后总结的完整步骤和踩坑记录。
一、准备工作
我用的是 Dapr v1.12,应用基于 Go 1.21,状态存储用 Redis,Pub/Sub 用 RabbitMQ。先保证本地环境:
dapr init --runtime-version 1.12
# 确认 sidecar 在运行
dapr --version
项目结构:
order-service/
├── components/ # Dapr 组件配置
│ ├── statestore.yaml
│ └── pubsub.yaml
├── cmd/
│ └── main.go
└── go.mod
二、状态管理实战
2.1 组件配置(踩坑起点)
我的第一个坑:组件配置文件必须放在默认路径(~/.dapr/components)或者通过 --resources-path 指定。否则 sidecar 启动后根本找不到状态存储。我当时少写了一个资源路径参数,折腾了半小时才反应过来。
最终 statestore.yaml 内容:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: orderstore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: keyPrefix
value: "order:"
注意:
keyPrefix加前缀是为了多环境隔离,但会直接拼接在 Redis key 前面,比如"order:order-123"。
2.2 保存和读取状态(Go 代码)
我用 Dapr Go SDK 调用状态 API。核心逻辑是:订单创建时保存,查询时读取。这里用 Gin 框架作为 HTTP 路由,结合 Dapr 客户端,代码很简洁。
package main
import (
"context"
"encoding/json"
dapr "github.com/dapr/go-sdk/client"
"net/http"
"github.com/gin-gonic/gin"
)
type Order struct {
ID string `json:"id"`
Amount float64 `json:"amount"`
Status string `json:"status"`
}
func main() {

client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
r := gin.Default()
r.POST("/order", func(c *gin.Context) {
var order Order
c.BindJSON(&order)
data, _ := json.Marshal(order)
// 使用 Dapr 状态 API 保存
if err := client.SaveState(context.Background(), "orderstore", order.ID, data); err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
c.JSON(200, order)
})
r.GET("/order/:id", func(c *gin.Context) {
id := c.Param("id")
item, err := client.GetState(context.Background(), "orderstore", id)
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
if item.Value == nil {
c.JSON(404, gin.H{"error": "not found"})
return
}
var order Order
json.Unmarshal(item.Value, &order)
c.JSON(200, order)
})
r.Run(":8080")
}
启动命令(重点):
# 先启动 Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine
# 启动带 Dapr sidecar 的应用
dapr run --app-id order --app-port 8080 --dapr-http-port 3500 go run main.go
2.3 踩坑:并发更新丢失
当两个请求同时更新同一个订单状态时,Dapr 默认使用 Last-Write-Wins,后写入的会覆盖先写入的。我写了个压力测试,发现数据不一致——订单状态明明已经从“待支付”变成了“已支付”,又被另一个请求覆盖回了“待支付”。丢了数据心里慌。
解决方案:使用 ETag(乐观锁)。先读取订单获取当前版本号,更新时带上这个版本号,如果版本号不匹配就说明有并发冲突,需要重试或报错。
item, _ := client.GetState(context.Background(), "orderstore", "order-1")
etag := item.Etag // 取到当前版本号
order.Status = "paid"
data, _ := json.Marshal(order)
err := client.SaveState(context.Background(), "orderstore", "order-1", data, dapr.WithStateETag(etag))
if err != nil {
// 如果 ETag 已变,err 是 ErrStateNotFound 或 ErrStateConflict
// 需要重试或报错
}
注意:Redis 版 Dapr 状态存储的 ETag 是 Redis key 的 version(整数),更新时如果 ETag 不匹配会返回错误。
三、发布订阅实战
3.1 组件配置(另一个坑)
我选了 RabbitMQ 作为消息队列。配置文件 pubsub.yaml:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: orderpubsub
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: connectionString
value: "amqp://guest:guest@localhost:5672"
- name: exchangeName
value: "dapr-exchange"
- name: durable
value: "true"
这里我踩的坑是:exchangeName 必须提前声明。Dapr 默认不会自动创建 exchange,如果 RabbitMQ 里没有对应 exchange,pub/sub 会静默失败。我开始没手动建 exchange,结果事件发布出去,消费者收不到,查了半天日志才发现。我在 RabbitMQ 管理页面手动创建了 dapr-exchange 类型为 topic 后才正常。
3.2 发布事件(生产者)
在订单创建成功后,发布 "ORDER_CREATED" 事件:
r.POST("/order", func(c *gin.Context) {
// ... 保存状态
// 发布事件
event := map[string]interface{}{
"orderId": order.ID,
"amount": order.Amount,
}
if err := client.PublishEvent(context.Background(), "orderpubsub", "ORDER_CREATED", event); err != nil {
// 记录日志,但不要影响主流程(最终一致性)
log.Printf("publish event failed: %v", err)
}
})
3.3 订阅事件(消费者)
我用了 Dapr 的 声明式订阅:在组件目录添加 subscription.yaml,避免硬编码路由。
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: order-sub
spec:
pubsubname: orderpubsub
topic: ORDER_CREATED
route: /order-event
然后在应用中处理这个路由:
r.POST("/order-event", func(c *gin.Context) {
var event map[string]interface{}
c.BindJSON(&event)
// 模拟处理:发送邮件等
orderID := event["orderId"].(string)
log.Printf("Received event for order: %s", orderID)
c.JSON(200, gin.H{"status": "ok"})
})
启动两个服务:一个是主服务(order),一个是订阅者服务(notification)。分别用 dapr run 启动。
3.4 踩坑:订阅者必须返回 2xx
Dapr sidecar 会在收到订阅者返回非 2xx 状态码时自动重试(指数退避)。如果我的处理函数中有业务错误,乱返回 500,会导致消息积压和重复消费。正确做法:业务错误也返回 200,内部记录日志,然后依靠补偿机制。比如发货失败,可以记录异常并触发人工介入,而不是让 sidecar 反复重试。
四、实战总结
4.1 状态管理关键点
- 默认原子性只针对单条记录,跨记录事务需要支持事务的状态存储(如 PostgreSQL、MongoDB)
- ETag 乐观锁是保证一致性的唯一简单方式,但要注意重试逻辑
- 组件配置中
keyPrefix会影响 Redis 哈希槽分布,尽量避免超长前缀
4.2 发布订阅关键点
- 声明式订阅比编程式更灵活,且支持通过 Dapr Dashboard 查看
- RabbitMQ exchange 需要手动创建,Dapr 不会自动生成
- 消费端幂等设计是必须的,因为 sidecar 可能重试
4.3 延伸思考
- 如何把状态管理和 Pub/Sub 结合实现 Saga 模式?比如订单创建后发事件,库存服务扣减后更新状态。
- 尝试用 Dapr Workflow 替代手工编排状态,它内置了重试和补偿。
- 如果要上 Kubernetes,改用 Dapr Operator 管理组件,避免本地路径问题。
最后,Dapr 不是一个黑盒,但你没必要重造轮子。学会“先信任,再验证”的思路,用单元测试和混沌工程来保证 Sidecar 的行为符合预期——这才是微服务成熟的标志。

评论已关闭!