消息服务中如何确保消息至少被消费一次
对消息服务需要了解的朋友,可以移步:
创新互联公司是一家业务范围包括IDC托管业务,雅安服务器托管、主机租用、主机托管,四川、重庆、广东电信服务器租用,绵阳服务器托管,成都网通服务器托管,成都服务器租用,业务范围遍及中国大陆、港澳台以及欧美等多个国家及地区的互联网数据服务公司。
- 聊聊mq的使用场景
- 聊聊业务系统中投递消息到mq的几种方式
- 谈谈mq消息消费的几种方式
本章讨论主题
- 如何确保消息至少消费一次,确保消费者最大程度消费成功
消费者消费消息有2中方式:
1. push方式
消息服务接收到消息之后,主动将消息推送给消费者消费
2. pull方式
消费者定时从消息服务中拉取消息进行消费
下面我们将讨论2中方式中如何确保消息至少被消费一次。
push模式
消费的过程:
- 消息服务查询待消费的消息列表
- 轮询待消息列表
- 调用消费者
- 消费者收到消费请求,执行业务处理,将处理结果返回给消息服务
- 消息服务接收到消费成功的信息,将消息状态置为消费成功状态
- 继续消费下一条消息
探讨一下上面需要考虑的问题:
若消息一直消费失败如何处理?
先说一下影响:
消息被阻塞
消息如果一直消费失败,消息服务会不断调用消费者进行消费,会阻塞其他消息的消费,直接影响到业务的正常进行.
消费失败的原因:
代码问题
这种情况不管尝试多少次,消息都会消费失败,需要人工介入修复bug,这个可以依靠监控系统发现bug,同时开发进行修复。
系统运行异常
如调用超时、网络问题等一些不可控的因素。产生这种错误,继续重试,最终会处理成功。
此处咱们只用讨论消息服务中重试机制如何设计?
系统异常情况下,可能过一段时间,系统恢复了,此时去重试,消费也就成功了。
所以我们对于消费失败的消息采用延迟处理的方式,可以这么实现:
消息中增加几个字段用于重试:next_dispose_time【下次处理时间】、max_failure【最大允许失败次数】、failure【当前失败次数】,消息入库时:next_dispose_time=需消费的时间,max_failure = 运行最大失败次数, failure=0;
当消费失败时,处理过程:
计算下次处理时间(next_dispose_time),可以在当前时间上面做指数递增,比如根据失败次数依次在当前时间上递增2的failure次方秒,如:
第1次失败:当前时间 + 2秒
第2次失败:当前时间 + 4秒
第3次失败:当前时间 + 8秒
第4次失败:当前时间 + 16秒
.......
第n次失败:当前时间 + 2的n次方秒
- failure++
消息服务查询待消费的消息也需要做调整:
select * from 消息表where next_dispose_time<=当前时间 and failure 此时能够最大程度保证消息最少消费成功一次。 这种会复杂一些,为何会复杂一些,咱们先看一下常规的流程: 如果本地一直处理失败,那么后面拉取到的都是同一条消息,这条消息直接阻塞后续消息的消费,这种情况如何解? 咱们先分析一下出现这种问题的后果及原因: 遇到这种问题还是挺严重了,业务方都是无法接受的,一条消息消费失败,会影响到其他所有消息的消费,这个我们还是得想办法解决,可以这样: 消息先落地,然后异步处理,本地需要有个补偿的job,去处理本地消费失败的消息,这个可以参考push方式消费的过程。pull方式
网页题目:消息服务中如何确保消息至少被消费一次
分享链接:http://myzitong.com/article/igipih.html