跟我学RocketMQ之事务消息落地案例
本文是对RocketMQ事务消息的综合讲解,提供一个较为典型的落地案例供读者进行参考。
什么是RocketMQ事务消息
以下内容引用自RocketMQ开发者中心,http://rocketmq.cloud
Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。
事务消息流程
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
- 发送事务半消息(half消息)。
- 服务端响应消息写入结果。
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
- Producer收到回查消息,检查回查消息对应的本地事务的状态
- 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息核心机制
在具体实现上,事务消息作为普通消息的一个应用场景,在实现过程中进行了分层抽象.
RocketMQ的具体实现策略是:写入的如果是事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,这里和延时消息的实现机制很类似。
如何使用
关于如何使用,可以参考 我说分布式事务之消息一致性事务2-rocketmq的实现
这里有一个综合案例,该案例是以客户端模拟下单成功,下单成功后服务端回调客户端修改订单状态场景进行讲解。 即:模拟下单并处理异步通知
案例图示
模块描述
模块 | 说明 |
---|---|
order-charge-gateway-merchant | 商户收单网关,2C业务,对用户提供下单接口;提供对下单平台的充值回调接口 |
order-charge-gateway-server | 核心收单平台,2B业务,进行业务正式下单,下单完成后投递通知消息到MQ |
order-charge-gateway-notify | 核心通知平台,消费通知消息发送通知到order-charge-gateway-merchant的回调接口 |
order-charge-message-protocol | 消息协议封装 |
order-charge-sdk | HTTP下单接口sdk及通知sdk |
script | 数据库初始化脚本 |
业务描述
用户访问第三方售票网关,进行购票操作。用户发起下单操作,执行支付转账操作
扣减用户账户,增加商户账户金额
转账完成之后,商户发起下单操作,售票平台进行下单操作及扣款操作,(此处为讲解重点– 事务消息,分布式事务场景)返回订单提交成功
- 售票平台下单扣款完成后,认为售票成功,则返回结果通知给第三方售票网关(此处为讲解重点– 普通消息,异步通知)
- 第三方售票网关自行给用户通知即可,未实现
注意:商品信息直接在初始化的时候记载到第三方售票网关和售票平台的缓存中即可。
说明
下单与扣款通过事务消息保证一致性,保证成功率
通知过程通过MQ进行异步解耦,使用普通消息即可,因为通知过程本身是为了最大努力送达,属于最终一致性的范畴,不要求数据的强一致性。
如果通知达到上限阈值,则停止通知,等待商户侧发起主动查询即可。通过通知回调+主动查询,能够在跨网络的交易场景下,实现端与端之间的订单状态的最终一致。
在平台内部,跨服务之间的分布式事务,通过RMQ的事务消息得到保证,事务消息原理可简单介绍。
案例地址
如何参考
请主要阅读以下模块的代码
【事务消息生产者】初始化(即事务发起方初始化):
order-charge-notify
|-order-charge-gateway-server
[事务消息生产者]
|-com.snowalker.order.mq.payment.producer.ChargeOrderPaymentTranProducer
【事务回查及本地事务执行】
order-charge-notify
|-order-charge-gateway-server
[事务回查及本地事务]
|-com.snowalker.order.mq.payment.producer.listener.[ChargeOrderTranListenerImpl]
【事务消息发送】
order-charge-notify
|-order-charge-gateway-server
[事务消息发送]
|-com.snowalker.order.common.service.impl.OrderChargeServiceImpl.[sendPaymentTransactionMsg]
【事务消息消费】初始化,事务消息消费者和普通消息消费者没区别
order-charge-notify
|-order-charge-gateway-notify
[事务消息消费]
|-com.snowalker.notify.mq.payment.consumer.WalletPaymentConsumer
注意
如果业务流程涉及三个及以上节点需要协调完成分布式事务流程,我这里以三个应用节点举例
- 第一个应用是事务的发起方,理论上只有事务消息生产
- 第二个应用是事务的中转者,它在事务消息消费逻辑中还要发起事务消息的发送流程,也就是说,事务消息的中转者是先消费它的上游的事务消息,处理完本地的逻辑之后,再该事务消息继续传递下去。(中转者发送的消息也是事务消息)
- 第三个应用是事务的终结者,理论上它只有事务消息的消费逻辑
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。