如何基于RocketMQ的事务消息特性实现分布式系统的最终一致性?
前言
在这篇文章中我们将介绍RocketMQ的事务消息相关的内容,并通过一些实践和大家一起来探索下事务消息如何解决分布式系统中的分布式事务问题。
阜新ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18980820575(备注:SSL证书合作)期待与您的合作!
事务消息原理
事务消息特性可以看作是两阶段协议的消息实现方式,用以确保在以消息中间件解耦的分布式系统中本地事务的执行和消息的发送,可以以原子的方式进行。
举个例子,以某互联网公司的用户余额充值为例,因为有充返活动(充值100元赠送20元),优惠比较大,用户Joe禁不住诱惑用支付宝向自己的余额账户充值了100元,支付成功后Joe的余额账户有了120元钱。
而该公司的关于用户余额充值的系统设计是这样的:
在这个设计流程中,该公司通过自建支付系统完成用户Joe的支付宝扣款操作,成功后需要更新支付流水的状态,因为用户的余额账户系统与支付系统之间通过MQ解耦了,所以支付系统在完成支付流水状态更新后需要通过发送MQ消息到消息中间件服务,然后用户余额系统作为消费者通过消息消费的方式完成用户余额的增加操作。
这里有个问题:“支付系统如何确保这笔余额充值消息一定会成功发送到MQ,并且用户余额系统一定能处理成功呢”?如果支付系统在完成支付订单状态更新后,MQ消息发送失败或者用户余额系统消息处理失败的话,都会导致Joe支付扣款成功,而自己的余额账户却没到账的情况发生。
为了解决这个问题,按照目前的系统设计是需要“支付系统-MQ服务-用户余额系统”三者的处理满足数据的一致性要求。例如,如果支付系统感知到消息发送失败后还可以进行重新投递,从而确保支付系统与用户余额数据的最终一致性。
而上述问题就是事务消息要解决的问题,在具体了解RocketMQ提供的事务消息机制之前,我们先来看下在RocketMQ的早期版本不支持事务消息,或者因为历史原因选择的消息中间件本身就不支持事务消息的情况下,一些大公司是怎么解决这个问题的?
早期为了实现基于MQ异步调用的多个服务间,业务逻辑执行要么一起成功、要么一起失败,具备事务特点,通常会采用可靠消息最终一致性方案,来实现分布式事务。还是以Joe充值这件事来举例,可靠消息方案实现过程如下:
在可靠消息最终一致性方案中,为了实现分布式事务,需要确保上游服务本地事务的处理与MQ消息的投递具有原子性,也就是说上游服务本地事务处理成功后要确保消息一定要成功投递到MQ服务,否则消息就不应该被投递到MQ服务;同样,被成功投递到MQ服务的消息,也一定要被下游服务成功处理,否则就需要重新投递MQ消息。
为了实现双向的原子性,可靠消息服务需要对消息进行状态标记,与此同时还需要对消息进行状态检查,从而实现重新投递及消息状态的最终一致性。核心流程说明如下:
1、上游服务(支付系统)如何确保完成自身支付成功状态更新后消息100%的能够投递到下游服务(用户余额系统)指定的Topic中?
在这个流程中上游服务在进行本地数据库事务操作前,会先发送一个状态为“待确认”的消息至可靠消息服务,而不是直接将消息投递到MQ服务的指定Topic。可靠消息服务此时会将该消息记录到自身服务的消息数据库中(消息状态为->待确认),完成后可靠消息服务会回调上游服务表示收到了消息,你们可以进行本地事务的操作了。
之后上游服务就会开启本地数据库事务执行业务逻辑操作,这里支付系统就会将该笔支付订单状态更新为“已成功”。(注意,这里只是举个示例场景,在真正的实践中一般是不会把支付订单本身的状态与业务端回调放在一个事务流程中的,关于这部分的详细说明我们在下面的场景说明中再讨论)。
如果上游服务本地数据库事务执行成功,则继续向可靠消息服务发送消息确认消息,此时可靠消息服务就会正式将消息投递到MQ服务,并且同时更新消息数据库中的消息状态为“已发送”。(注意,这里可靠消息服务更新消息状态与投递消息至MQ也必须是在一个原子操作中,即消息投递成功则一定要将消息状态更新为“已发送”,所以在编程的细节中,可靠消息服务一般会先更新消息状态,然后再进行消息投递,这样即使消息投递失败,也可以对消息状态进行回滚->“待确认”,相反如果先进行消息投递再更新消息状态,可能就不好控制了)。
相反,如果上游本地数据库事务执行失败,则需要向可靠消息服务发送消息删除消息,可靠消息服务此时就会将消息删除,这样就意味着事务在上游消息投递过程中就被回滚了,而流程也就此结束了,此时上游服务可以需要通过业务逻辑的设计进行重发,这个就不再分布式事务的讨论范畴了。
说到这里,大家可能会有疑问了!因为在上述描述中,即使上游服务本地数据库事务执行成功了,但是在发送确认消息至可靠消息服务的过程中,以及可靠消息服务在投递消息至MQ服务的过程中,还是会存在失败的风险,这样的话还是会导致支付服务更新了状态,但是用户余额系统连消息都没有收到的情况发生?
实际上,实现数据一致性是一个复杂的活。在这个方案中可靠消息服务作为基础性的服务除了执行正常的逻辑外,还得处理复杂的异常场景。在实现过程中可靠消息服务需要启动相应的后台线程,不断轮训消息的状态,这里会轮训消息状态为“待确认”的消息,并判断该消息的状态的持续时间是否超过了规定的时间,如果超过规定时间的消息还处于“待确认”的状态,就会触发上游服务状态询问机制。
可靠消息服务就会调用上游服务提供的相关借口,询问这笔消息的处理情况,如果这笔消息在上游服务处理成功,则后台线程就会继续触发上图中的步骤5,更新消息状态为“已发送”并投递消息至MQ服务;反之如果这笔消息上游服务处理失败,可靠消息服务则会进行消息删除。通过这样以上机制就确保了“上游服务本地事务成功处理+消息成功投递”处于一个原子操作了。
2、下游服务(用户余额系统)如何确保对MQ服务Topic消息的消费100%都能处理成功?
在1的过程中,确保了上游服务逻辑处理与MQ消息的投递具备原子性,那么当消息被成功投递到了MQ服务的指定Topic后,下游服务如何才能确保消息的消费一定能被成功处理呢?
在正常的流程中,下游服务等待消费Topic的消息并进行自身本地数据库事务的处理,如果处理成功则会主动通知可靠消息服务,可靠消息服务此时就会将消息的状态更新为“已完成”;反之,处理失败下游服务就无法再主动向可靠消息服务发送通知消息了。
此时,与消息投递过程中的异常逻辑一样,可靠消息服务也会启动相应的后台线程,轮询一直处于“已发送”状态的消息,判断状态持续时间是否超过了规定时间,如果超时,可靠消息服务就会再次向MQ服务投递此消息,从而确保消息能被再次消费处理。(注意,也可能出现下游服务处理成功,但是通知消息发送失败的情况,所以为了确保幂等,下游服务也需要在业务逻辑上做好相应的防重处理)。
RocketMQ事务消息机制
在
文章名称:如何基于RocketMQ的事务消息特性实现分布式系统的最终一致性?
文章网址:http://myzitong.com/article/ggegeo.html