文章目录
  1. 1. 全文总结

之前的两篇文章中,我讲解了ShieldTXC的框架内核的代码实现及其原理。前前后后设计开发总共耗时一周,目前的版本基本可用,那么本文我们就使用这个版本进行一次实战演练,让读者直观的感受一下ShieldTXC实现的本地消息表分布式事务方案的魅力。

进入shieldTXC项目,可以看到项目整体的结构如下:

shieldTxc
    |-txc-core        ShieldTXC内核
    |-txc-demo-up     分布式事务上游应用
    |-txc-demo-down   分布式事务下游应用

首先对项目进行整体的编译,接着对应用txc-demo-up、txc-demo-down,修改配置文件application.properties

########################################################################
#
#     shield-txc
#
#########################################################################
# RocketMQ的nameserver地址:必填
shield.event.rocketmq.nameSrvAddr=127.0.0.1:9876
# 事务消息topic:非必填,默认为DEFAULT_TXC_XXX
shield.event.rocketmq.topicSource=DEFAULT_TXC_XXX
# 发送失败重试次数:非必填,默认为10次
shield.event.rocketmq.retryTimesWhenSendFailed=3次
# 消息发送调度初始化延时,单位:秒:非必填,默认为0
shield.event.rocketmq.tranMessageSendInitialDelay=0秒
# 消息发送调度间隔,单位:秒,非必填,默认为5秒
shield.event.rocketmq.tranMessageSendPeriod=5
# 消息发送调度核心线程数:非必填,默认为1
shield.event.rocketmq.tranMessageSendCorePoolSize=1

如果仅仅是对框架进行尝鲜,只需要配置NameServer地址即可。

接着需要初始化数据库,在两个应用的application.properties中配置数据源,并在各自的数据源中放置消息表,初始化脚本在项目根路径下的script目录下,直接导入数据库即可。

在正式测试demo前需要保证有可用的RocketMQ环境,关于如何搭建RocketMQ读者可以自行学习。

MQ环境就绪后,分别启动两个应用, 等待生产者端完成本地事务并将事务提交消息持久化。

上游应用打印日志如下:

事务上游本地事务开始,消息持久化.....
2019-08-04 12:35:27.117 |-DEBUG [pool-1-thread-1] com.shield.txc.schedule.SendTxcMessageScheduler [141] -| [SendTxcMessageScheduler] Send ShieldTxc Message result:[SUCCESS], Message Body:[{"id":"112","eventType":"INSERT","txType":"COMMIT","eventStatus":"PRODUCE_PROCESSING","content":"{\"name\":\"23abaae6a2\"}","appId":"23abaae6a2","bizKey":"2425b011-bdf8-4a64-977a-ffa73e8c23c6"}]

日志表明本地事务执行完成,消息持久化,我们看下数据库中的记录:

上游逻辑

可以看到消息已经入库,并且应被处理为 [PRODUCE_PROCESSED] (生产处理完成),也就是消息被投递到MQ。

下游应用打印如下:

测试消费ShieldTxcCommitListener开始......
2019-08-04 12:35:27.802 |-DEBUG [ConsumeMessageThread_1] com.shield.txc.listener.ShieldTxcCommitListener [165] -| [ShieldTxcCommitListener::doUpdateAfterConsumed] The Real ConsumeConcurrentlyStatus is : [RECONSUME_LATER]

下游开始对事务提交消息进行消费,执行本地业务。

下游事务提交逻辑返回的状态为 [RECONSUME_LATER],重试三次后事务回滚,持久化事务回滚消息,并投递到MQ。

下游数据库中消息记录如下:

下游逻辑

可以看到符合我们的预期,事务提交消息消费三次后不再消费,状态更改为 [CONSUME_MAX_RECONSUMETIMES],同时投递了事务回滚消息,消息状态为 [PRODUCE_PROCESSED]

我们回到上游应用,查看到日志如下:

2019-08-04 12:37:09.202 |-DEBUG [ConsumeMessageThread_1] com.shield.txc.listener.ShieldTxcRollbackListener [48] -| [ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId=C0A801653B9818B4AAC2122845CC0000,msgBody={"id":"113","eventType":"INSERT","txType":"ROLLBACK","eventStatus":"PRODUCE_PROCESSING","content":"{\"name\":\"23abaae6a2\"}","appId":"23abaae6a2","bizKey":"2425b011-bdf8-4a64-977a-ffa73e8c23c6"}
测试消费【回滚】ShieldTxcRollbackListener

表明上游应用消费了事务回滚消息,持久的回滚消息状态如图:

上游逻辑

从这个完整的业务执行链路可以看到,ShieldTXC能够很好的处理分布式事务,保证分布式系统上下游数据的一致性。

如果下游消费事务提交消息返回 [CONSUME_SUCCESS],则下游直接提交事务并更改消息状态为 [CONSUME_PROCESSED],整个事务流程就结束了,这也是符合我们预期的。

全文总结

分布式事务是分布式领域一个较为棘手的难题,在几年前,各种分布式事务解决方案作为大厂的技术壁垒是不会轻易对外公布的。

随着近年来开源社区的活跃,我们得以对各种解决方案一探究竟。

本文中,笔者从理论入手,辅助以代码讲解,对本地消息表方案做了一次较为深入的讲解及实现,希望本文的方案及实现思路能够对读者理解并实现分布式事务解决方案有所启发。

如果你有什么疑问和建议,也欢迎在评论区进行留言,在此先行谢过。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 全文总结
Fork me on GitHub