-
阻塞式重试;
-
2PC、3PC 传统事务;
-
使用队列,后台异步处理;
-
TCC 补偿事务;
-
本地消息表(异步确保);
-
MQ 事务。
阻塞式重试
m := db.Insert(sql)
err := request(B-Service,m)
func request(url string,body interface{}){
for i:=0; i<3; i ++ {
result, err = request.POST(url,body)
if err == nil {
break
}else {
log.Print()
}
}
}
-
调用 B 服务成功,但由于网络超时原因,当前服务认为其失败了,继续重试,这样 B 服务会产生 2 条一样的数据。
-
调用 B 服务失败,由于 B 服务不可用,重试 3 次依然失败,当前服务在前面代码中插入到 DB 的一条记录,就变成了脏数据。
-
重试会增加上游对本次调用的延迟,如果下游负载较大,重试会放大下游服务的压力。
异步队列
m := db.Insert(sql)
err := mq.Publish("B-Service-topic",m)
TCC 补偿事务
-
阶段一、Try 操作:对业务资源做检测、资源预留,比如对库存的检查、预扣。
-
阶段二、Confirm 操作:提交确认 Try 操作的资源预留。比如把库存预扣更新为扣除。
-
阶段二、Cancel 操作:Try 操作失败后,释放其预扣的资源。比如把库存预扣的加回去。
m := db.Insert(sql)
aResult, aErr := A.Try(m)
bResult, bErr := B.Try(m)
cResult, cErr := C.Try(m)
if cErr != nil {
A.Cancel()
B.Cancel()
C.Cancel()
} else {
A.Confirm()
B.Confirm()
C.Confirm()
}
空释放
时序
调用失败
-
阻塞式重试。但有同样的问题,比如宕机、一直失败的情况。
-
写入日志、队列,然后有单独的异步服务自动或人工介入处理。但一样会有问题,写日志或队列时,会存在失败的情况。
本地消息表
配合MQ
messageTx := tc.NewTransaction("order")
messageTxSql := tx.TryPlan("content")
m,err := db.InsertTx(sql,messageTxSql)
if err!=nil {
return err
}
aErr := mq.Publish("B-Service-topic",m)
if aErr!=nil { // 推送到 MQ 失败
messageTx.Confirm() // 更新消息的状态为 confirm
}else {
messageTx.Cancel() // 删除消息
}
// 异步处理 confirm 的消息,继续推送
func OnMessage(task *Task){
err := mq.Publish("B-Service-topic", task.Value())
if err==nil {
messageTx.Cancel()
}
}
上面代码中其 messageTxSql 是插入本地消息表的一段 SQL :
insert into `tcc_async_task` (`uid`,`name`,`value`,`status`) values ('?','?','?','?')
confirm
。本地消息表中
status
有 2 种状态
try
、
confirm
, 无论哪种状态在
OnMessage
都可以监听到,从而发起重试。
配合服务调用
messageTx := tc.NewTransaction("order")
messageTxSql := tx.TryPlan("content")
body,err := db.InsertTx(sql,messageTxSql)
if err!=nil {
return err
}
aErr := request.POST("B-Service",body)
if aErr!=nil { // 调用 B-Service 失败
messageTx.Confirm() // 更新消息的状态为 confirm
}else {
messageTx.Cancel() // 删除消息
}
// 异步处理 confirm 或 try 的消息,继续调用 B-Service
func OnMessage(task *Task){
// request.POST("B-Service",body)
}
OnMessage
中处理:
messageTx := tc.NewTransaction("order")
messageTx := tx.Try("content")
aErr := request.POST("B-Service",body)
// ....
消息过期
Try
和
Confirm
消息的处理器:
TCC.SetTryHandler(OnTryMessage())
TCC.SetConfirmHandler(OnConfirmMessage())
func OnConfirmMessage(task *tcc.Task) {
if time.Now().Sub(task.CreatedAt) > time.Hour {
err := task.Cancel() // 删除该消息,停止重试。
// doSomeThing() 告警,人工介入
return
}
}
Try
处理函数中,还要单独判断当前消息任务是否存在过短,因为
Try
状态的消息,可能才刚刚创建,还没被确认提交或删除。这会和正常业务逻辑的执行重复,意味着成功的调用,也会被重试;为尽量避免这种情况,可以检测消息的创建时间是否很短,短的话可以跳过。
独立消息服务
err := request.POST("Message-Service",body)
if err!=nil {
return err
}
aErr := request.POST("B-Service",body)
if aErr!=nil {
return aErr
}
try
和
confirm
,消息服务在前面多了一种状态
prepare
。
MQ 事务
Confirm
确认提交消息,失败则
Cancel
删除消息。MQ 事务也会存在
prepare
状态,需要 MQ 的消费处理逻辑来确认业务是否成功。
总结
推荐阅读
1. GitHub 上有什么好玩的项目?
2. Linux 运维必备 150 个命令汇总
3. SpringSecurity + JWT 实现单点登录
本文分享自微信公众号 - Java后端(web_resource)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
相关文章
暂无评论...