消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。
简单的消息去重解决方案
insert
into t_order
values .....
update t_inv
set
count =
count
-1
where good_id =
'good123';
select * from t_order where order_no = 'order123'
if(order != null) {
return ;//消息重复,直接返回
}
并发重复消息
select * from t_order where order_no = 'order123'
if(order != null) {
return ;//消息重复,直接返回
}
select *
from t_order
where order_no =
'THIS_ORDER_NO'
for update
//开启事务
if(order.status !=
null) {
return ;
//消息重复,直接返回
}
Exactly Once
Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
基于关系数据库事务插入消息表
update t_order
set
status =
'SUCCESS'
where order_no=
'order123';
1.开启事务
2.插入消息表(处理好主键冲突的问题)
3.更新订单表(原消费逻辑)
4.提交事务
https://help.aliyun.com/document_detail/102777.html
更复杂的业务场景
-
检查库存(RPC)
-
锁库存(RPC)
-
开启事务,插入订单表(MySQL)
-
调用某些其他下游服务(RPC)
-
更新订单状态
-
commit 事务(MySQL)
-
库存系统消费A:检查库存并做锁库存,发送消息B给订单服务
-
订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费
-
下游系统消费消息C:处理部分逻辑,发送消息D给订单系统
-
订单系统消费消息D:更新订单状态
更通用的解决方案
-
问题一:消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
-
问题二:并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
-
问题三:支持上游业务生产者重发的业务重复的消息幂等问题。
-
1.性能上损耗更低
-
2.上面我们讲到的超时时间可以直接利用Redis本身的ttl实现
show me code
https://github.com/Jaskey/RocketMQDedupListener ,
//利用Redis做幂等表
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer(
"TEST-APP1");
consumer.subscribe(
"TEST-TOPIC",
"*");
String appName = consumer.getConsumerGroup();
// 大部分情况下可直接使用consumer group名
StringRedisTemplate stringRedisTemplate =
null;
// 这里省略获取StringRedisTemplate的过程
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener =
new SampleListener(dedupConfig);
consumer.registerMessageListener(messageListener);
consumer.start();
这种实现是否一劳永逸?
-
步骤1:检查库存(RPC)
-
步骤2:锁库存(RPC)
-
步骤3:开启事务,插入订单表(MySQL)
-
步骤4:调用某些其他下游服务(RPC)
-
步骤5:更新订单状态
-
步骤6:commit 事务(MySQL)
本实现方式的价值?
-
1.各种由于Broker、负载均衡等原因导致的消息重投递的重复问题
-
2.各种上游生产者导致的业务级别消息重复问题
-
3.重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑
一些其他的消息去重的建议
-
#1.消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。
-
#2.消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。
-
#3.一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)
投 稿 通 道
#
让你的博客被更多人看到
如果你在 CSDN、博客园、掘金等平台有写技术博客的习惯,想让自己的原创博客被更多人看到,可以来 Java后端 投稿。
???? 稿件基本要求:
• 文章确系个人原创作品,如果在其他非公众号渠道有过发表也可以,只要是个人原创即可。
• 稿件建议以 markdown 格式撰写,文中配图以附件形式发送,要求图片清晰、语句通顺。
• 如果被采纳的原创稿件,我们将提供稿费以及个人影响力曝光,具体依据文章阅读量和质量结算稿费。
???? 投稿通道:
• 投稿请联系下方微信,备注:原创投稿
△长按添加 Java后端 小编
本文分享自微信公众号 - Java后端(web_resource)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
相关文章
暂无评论...