自己写分布式事务框架之实现事务提交
从本文开始,我们正式进入自己写分布式事务框架的部分。
笔者将这个本地消息表分布式事务框架命名为 shieldTXC,意思是 神盾事务框架,框架内核及demo案例的代码已经打包上传至github上,
地址为:shieldTXC源码地址。如果觉得这个喜欢可以点个star支持下。
题外话不多说,接下来我们一边看框架的原理图,在宏观上对框架的机理做一个了解,一边对相应的原理进行代码实现讲解,这样理论与实战相结合,相信会加深读者朋友的理解。
框架核心原理
首先看下框架的核心原理图。
看起来还是比较整洁的,这也是笔者写代码的一个宗旨:
好的框架可以复杂,但架构一定是优雅的、清晰的。
我们配合代码深入分析一下这张图,主要分为两个主要的阶段:
- 事务提交阶段
- 事务回滚阶段
首先来看下事务提交阶段的实现原理及相对应的代码实现。
框架分析之事务提交
图中左侧的红字标记该部分为上游应用(后续统称为上游)的事务提交阶段的运行逻辑,上游在这个阶段又分为两个子阶段。
第一阶段:本地事务与消息持久化
上游在执行本地事务成功在,在同一事务内对业务实体封装为消息体,调用ShieldTXC提供的消息持久化接口将消息持久化到业务库中,框架的消息持久化方法需要保证事务性。
代码如下:
@Transactional(rollbackFor = Exception.class)
public void testTran() {
// 本地事务
doLocalTransaction();
// 消息持久化
TestTxMessage testTxMessage = new TestTxMessage();
testTxMessage.setName(UUID.randomUUID().toString().replace("-", "").substring(0, 10));
shieldTxcRocketMQProducerClient
.putMessage(testTxMessage, EventType.INSERT, TXType.COMMIT, testTxMessage.getName(),
UUID.randomUUID().toString());
}
这个阶段的重点在于消息持久化的实现。一起来看一下ShieldTXC是如何实现的:
/**
* 消息持久化
* @param shieldTxcMessage
* @param eventType
* @param txType
* @param appId
*/
@Transactional(rollbackFor = Exception.class)
public void putMessage(AbstractShieldTxcMessage shieldTxcMessage,
EventType eventType,
TXType txType,
String appId,
String bizKey) {
Preconditions.checkNotNull(eventType, "Please insert eventType, type is:[com.shield.txc.constant.EventType]");
Preconditions.checkNotNull(bizKey, "Please insert unique bizKey!");
ShieldEvent event = new ShieldEvent();
event.setEventType(eventType.toString())
.setTxType(txType.toString())
.setEventStatus(EventStatus.PRODUCE_INIT.toString())
.setContent(shieldTxcMessage.encode())
.setBizKey(bizKey)
.setAppId(appId);
try {
// 入库失败回滚
boolean insertResult = this.getBaseEventService().insertEvent(event);
if (!insertResult) {
throw new BizException("insert ShieldEvent into DB occurred Exception!");
}
} catch (Exception e) {
// 异常回滚
throw new BizException("insert ShieldEvent into DB occurred Exception!", e);
}
}
为了便于统一管理,ShieldTXC定义了一个抽象消息类,业务方需要继承该抽象类,通过实现其中的decode与encode方法为业务消息提供编解码能力。
public abstract class AbstractShieldTxcMessage implements Serializable {
private static final long serialVersionUID = -2416427331208398607L;
/**消息序列化*/
public abstract String encode();
/**消息反序列化*/
public abstract void decode(String msg);
}
我们接着看上面的putMessage方法。
通过传参,组装了一个ShieldEvent持久化消息实体,并将其进行insert操作,通过判断入库是否成功决定是否抛出异常让事务进行回滚。
如果消息持久化成功,则本地事务提交;如果消息持久化失败,则本地事务回滚,业务结束。
第二阶段:消息投递
业务方完成上述第一阶段的消息持久化操作,就不需要进行其他的额外操作了。
此时框架开始执行第二阶段:消息投递阶段。
我们从图中可以清晰的看到,ShieldEvent会在内部维护一个定时任务,扫描状态为 初始化待发送 消息,组装成功消息体并调用MQ的发送消息接口,将投递到 事务提交队列。这个过程中要保证消息可达。
这个子阶段用文字描述起来比较简洁,我们接着看下框架代码是如何实现的,加深认知。
2.1定时任务初始化
整个ShieldTXC内核已经打包为一个Spring Boot的starter,因此可以通过@Enable注解简单的整合到Spring Boot应用中,框架本身提供了对核心Bean的初始化过程,这里我们主要看下扫描待发送消息的定时任务的初始化过程。
定时任务的初始化过程是在框架的ShieldEventTxcConfiguration.java类中实现的,类声明如下:
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
public class ShieldEventTxcConfiguration {
通过@Configuration标记为一个配置bean,通过@EnableConfigurationProperties(RocketMQProperties.class)开启可配置能力。ShieldTXC支持通过配置文件进行配置。
这里重点分析一下扫描待发送消息定时任务的初始化:
/**
* 异步消息调度构造
* @param rocketMQProperties
* @return
*/
@Bean
@ConditionalOnMissingBean
@Order(value = 3)
public SendTxcMessageScheduler sendTxcMessageScheduler(RocketMQProperties rocketMQProperties) {
SendTxcMessageScheduler sendTxcMessageScheduler = new SendTxcMessageScheduler();
// 设置调度线程池参数
sendTxcMessageScheduler.setInitialDelay(rocketMQProperties.getTranMessageSendInitialDelay());
sendTxcMessageScheduler.setPeriod(rocketMQProperties.getTranMessageSendPeriod());
sendTxcMessageScheduler.setCorePoolSize(rocketMQProperties.getTranMessageSendCorePoolSize());
// 数据库操作
sendTxcMessageScheduler.setBaseEventService(baseEventService(baseEventRepository()));
// 消息发送
sendTxcMessageScheduler.setShieldTxcRocketMQProducerClient(rocketMQEventProducerClient(rocketMQProperties));
LOGGER.debug("Initializing [sendTxcMessageScheduler] instance success.");
// 执行调度
sendTxcMessageScheduler.schedule();
return sendTxcMessageScheduler;
}
通过@Bean标记为Spring容器中的bean,Spring在初始化过程中会加载我们的bean,bean的name就是方法名,这个方式是Spring提供的通过JavaConfig方式进行bean定义的方式,这里不做展开。
我们首先初始化了一个SendTxcMessageScheduler实例,对其进行参数的配置,诸如线程池参数、数据库操作相关的bean依赖(通过相同方式进行初始化的)、消息发送的bean依赖(通过相同方式进行初始化的)。依赖设置完成后调用schedule()方法开启任务调度。当应用启动完成之后便会自动开始进行待发送消息的扫描操作。
关于数据库操作bean、RocketMQ操作的bean初始化过程,感兴趣的读者可以去源码的com.shield.txc.configuration.ShieldEventTxcConfiguration.java类中查看,请恕本文不再展开。
2.2定时任务核心逻辑分析
初始化完成SendTxcMessageScheduler之后,具体又是如何对待发送消息进行处理的呢?带着疑问,我们深入SendTxcMessageScheduler这个调度类中一探究竟。
SendTxcMessageScheduler方法的声明如下:
public class SendTxcMessageScheduler extends AbstractMessageScheduler implements Runnable {
可以看到SendTxcMessageScheduler实例本身也是一个Runnable实例
,这里暂且放一下,后续的分析中会用到,我们接着往下看。
上述的代码中在初始化完成SendTxcMessageScheduler实例后,调用了schedule()方法,那么我们重点看下这个方法。
public void schedule() {
executorService.scheduleAtFixedRate(
this,
this.initialDelay,
this.period,
this.timeUnit);
}
代码逻辑很简单,通过内部的executorService开启了调度流程,executorService的初始化是在构造方法中完成的,默认构造方法如下:
public SendTxcMessageScheduler() {
executorService = Executors.newScheduledThreadPool(corePoolSize);
}
在schedule()方法中,调用scheduleAtFixedRate方法,第一个参数传入了this引用,线程池会定时执行this引用(也是一个Runnable引用)的run方法,方法逻辑:
@Override
public void run() {
// 查询并发送消息
try {
// 获取待调度的消息,初始态==初始化
List<ShieldEvent> shieldEvents = baseEventService.queryEventListByStatus(EventStatus.PRODUCE_INIT.toString());
if (CollectionUtils.isEmpty(shieldEvents)) {
return;
}
for (ShieldEvent shieldEvent : shieldEvents) {
// 发送前改状态
processBeforeSendMessage(shieldEvent);
// 发送消息核心逻辑
sendMessage(shieldEvent);
// 判断发送结果,成功则更新为已发送
processAfterSendMessage(shieldEvent);
}
} catch (Exception e) {
LOGGER.error("Sending rollback message occurred Exception!", e);
return;
}
}
这里主要采用了模板方法对业务逻辑进行了封装。
首先获取待调度的消息,状态为 PRODUCE_INIT (生产初始化)。默认获取50条。
如果未查询到消息,则结束本次调度,对于查询到的消息列表shieldEvents进行迭代:
1.首先进行发送前置操作,即方法processBeforeSendMessage。将消息的状态从 PRODUCE_INIT 改为 PRODUCE_PROCESSING (生产处理中)。这么做的目的在于通过状态机方式的乐观锁达到支持并发的目的。这个思路在日常业务开发中也经常用到。
2.接着进行发送核心操作,这里需要看一下代码是如何实现的
/**
* 发送事务消息
* @param shieldEvent
*/
@Override
public void sendMessage(ShieldEvent shieldEvent) {
int eventId = shieldEvent.getId();
// 组装Message
ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage
.setId(String.valueOf(eventId))
.setAppId(shieldEvent.getAppId())
.setContent(shieldEvent.getContent())
.setEventType(shieldEvent.getEventType())
.setEventStatus(shieldEvent.getEventStatus())
.setTxType(shieldEvent.getTxType())
.setBizKey(shieldEvent.getBizKey());
String messgeBody = shieldTxcMessage.encode();
String topic = null;
BizResult bizResult = null;
// 发送commit消息,判断消息类型
if (TXType.COMMIT.toString().equals(shieldTxcMessage.getTxType())) {
topic = MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_COMMMIT_STAGE,
shieldTxcRocketMQProducerClient.getTopic());
Message commitMessage = new Message(topic, messgeBody.getBytes());
bizResult = shieldTxcRocketMQProducerClient.sendCommitMsg(commitMessage, eventId);
}
// 发送rollback消息
if (TXType.ROLLBACK.toString().equals(shieldTxcMessage.getTxType())) {
topic = MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_ROLLBACK_STAGE,
shieldTxcRocketMQProducerClient.getTopic());
Message rollbackMessage = new Message(topic, messgeBody.getBytes());
bizResult = shieldTxcRocketMQProducerClient.sendRollbackMsg(rollbackMessage, eventId);
}
if (bizResult.getBizCode() != BizCode.SEND_MESSAGE_SUCC) {
LOGGER.debug("[SendTxcMessageScheduler] Send ShieldTxc Message result:[FAIL], Message Body:[{}]", messgeBody);
return;
}
LOGGER.debug("[SendTxcMessageScheduler] Send ShieldTxc Message result:[SUCCESS], Message Body:[{}]", messgeBody);
}
首先将消息实体转换为ShieldTxcMessage。ShieldTxcMessage是框架封装的消息类型,内部的encode、decode方法提供了序列化、反序列化能力。
接着判断消息类型,如果是 TXType.COMMIT 则将消息投递到事务提交队列;如果是 TXType.ROLLBACK 则将消息投递到事务回滚队列。
在上游业务中发送的消息均为事务提交消息,因此会被投递到事务提交队列中。
最后判断消息发送结果,如果发送失败会进行重试,该重试能力是MQ中间件客户端提供的。对于多次发送都失败的消息需要更改状态为初始化,继续进行投递,保证消息一定能发出去。长时间发布出去的消息进发送失败表,后续需要对该失败表进行扫描,触发本地业务回滚。这种情况因为极其少见,因此框架当前版本暂未实现,后续更新后在github的页面中注明。
3.消息投递成功后,调用processAfterSendMessage方法进行状态更新,将处理中状态 PRODUCE_PROCESSING 改为 PRODUCE_PROCESSED (生产处理成功)。
到此就完成了分布式事务上游的消息发布流程。
第三阶段:消息消费,下游事务提交
为了方便读者对照,这里再贴一下原理图。我们接着分析事务下游应用(后文称下游)是如何对事务提交队列中的消息进行消费从而完成本地事务的。
3.1消费适配器初始化
下游应用启动过程中需要完成消费适配器的初始化,实现对事务提交队列消息的消费。该消费逻辑实现了下游与上游的最终一致性。
@Service
public class TxConsumeService implements InitializingBean {
@Value("${shield.event.rocketmq.nameSrvAddr}")
String nameSerAddr;
@Value("${shield.event.rocketmq.topicSource}")
String topic;
@Override
public void afterPropertiesSet() throws Exception {
new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcCommitListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("测试消费ShieldTxcCommitListener开始......");
Random ra =new Random();
int randomInt = ra.nextInt(10) + 1;
if (randomInt <= 5) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}));
}
}
上面这段代码为下游应用需要实现的,在应用启动阶段初始化了ShieldTxcConsumerListenerAdapter消费适配器。
【ShieldTxcConsumerListenerAdapter】
ShieldTxcConsumerListenerAdapter的主要构造方法有三个,分别为:
事务下游可选构造方法,即分布式事务的最终执行者
/**
* 事务下游可选
* @param nameSrvAddr
* @param topic
* @param txCommmtListener
*/
public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
String topic,
ShieldTxcCommitListener txCommmtListener) {
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
this.txCommmtListener = txCommmtListener;
init();
}
事务上游可选构造方法,即分布式事务的最初发起者
/**
* 事务上游可选
* @param nameSrvAddr
* @param topic
* @param txRollbackListener
*/
public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
String topic,
ShieldTxcRollbackListener txRollbackListener) {
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
this.txRollbackListener = txRollbackListener;
init();
}
事务上下游可选构造方法,即一个应用处于上游和下游之间,它既是该分布式事务的上游又是该分布式事务的下游
public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
String topic,
ShieldTxcCommitListener txCommmtListener,
ShieldTxcRollbackListener txRollbackListener) {
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
this.txCommmtListener = txCommmtListener;
this.txRollbackListener = txRollbackListener;
init();
}
上述的demo代码所在的应用为分布式事务的最终下游,因此只需要实现ShieldTxcCommitListener接口,完成consumeMessage回调方法。我们在其中模拟百分之五十提交,百分之五十重试的情况,测试正常与异常情况下框架的表现。
在ShieldTxcConsumerListenerAdapter构造方法中均调用了init()方法,该方法初始化了真正的消费者客户端,对MQ进行监听。
public ShieldTxcConsumerListenerAdapter init() {
// 初始化shieldTxcRocketMQConsumerClient
Preconditions.checkNotNull(this.nameSrvAddr, "please insert RocketMQ NameServer address");
shieldTxcRocketMQConsumerClient =
new ShieldTxcRocketMQConsumerClient(this.topic, this.nameSrvAddr, this.getTxCommmtListener(), this.getTxRollbackListener());
LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient] instance init success.");
return this;
}
【shieldTxcRocketMQConsumerClient】
我们进入shieldTxcRocketMQConsumerClient类中,观察一下它是如何进行初始化的。
public ShieldTxcRocketMQConsumerClient(String topic,
String nameSrvAddr,
ShieldTxcCommitListener txCommtListener,
ShieldTxcRollbackListener txRollbackListener) {
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
if (txCommtListener == null && txRollbackListener == null) {
throw new BizException("Please define at least one MessageListenerConcurrently instance, such as [ShieldTxcCommitListener] or [ShieldTxcRollbackListener] or both.");
}
if (txCommtListener != null) {
// 初始化事务提交消费者
initCommitConsumer(this.topic, this.nameSrvAddr, txCommtListener);
LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient.CommmitConsumer] instance init success.");
}
if (txRollbackListener != null) {
// 初始化事务回滚消费者
initRollbackConsumer(this.topic, this.nameSrvAddr, txRollbackListener);
LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient.RollbackListener] instance init success.");
}
}
上述代码是ShieldTxcRocketMQConsumerClient初始化流程,可以看到主要是判断了ShieldTxcCommitListener、ShieldTxcRollbackListener实例是否为空,如果不为空则进行对应消费者的初始化。两个初始化流程基本相同,我们重点看下initCommitConsumer方法的逻辑。
/**
* 初始化事务提交消费者
* @param topic
* @param nameSrvAddr
*/
private void initCommitConsumer(String topic, String nameSrvAddr, ShieldTxcCommitListener txCommtListener) {
commitConsumer =
new DefaultMQPushConsumer(
MessagePropertyBuilder.groupId(CommonProperty.TRANSACTION_COMMMIT_STAGE, topic));
commitConsumer.setNamesrvAddr(nameSrvAddr);
// 从头开始消费
commitConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式:集群模式
commitConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
commitConsumer.registerMessageListener(txCommtListener);
// 订阅所有消息
try {
commitConsumer.subscribe(
MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_COMMMIT_STAGE, topic), "*");
// 启动消费者
commitConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException("Loading [com.shield.txc.RocketMQEventConsumerClient.commmitConsumer] occurred exception", e);
}
}
相信不需要我再做多余的解释了,这里其实就是启动了一个DefaultMQPushConsumer消费者客户端,并对对应的Topic进行订阅,从而实现对对应队列中消息的消费。如果读者对RocketMQ不是很了解,可以到RocketMQ开发者中心进行相关的学习。RocketMQ中文开发者中心
回到原理图中,可以看到下游应用的逻辑中有一个 ShieldTXC Commit拦截器,这个拦截器才是消费阶段的重点,框架通过该拦截器对消费过程进行了代理,加入了前置后置操作,从而保证了消费阶段的分布式事务的一致性。
拦截器代码在ShieldTxcCommitListener中。
【ShieldTxcCommitListener】
ShieldTxcCommitListener的类声明及构造方法如下:
public class ShieldTxcCommitListener implements MessageListenerConcurrently {
public ShieldTxcCommitListener(MessageListenerConcurrently txCommmtListener) {
this.txCommmtListener = txCommmtListener;
}
可以看到ShieldTxcCommitListener实现了MessageListenerConcurrently接口。当ShieldTxcCommitListener在构造过程中将外界传入的MessageListenerConcurrently实例的引用指向了内部的MessageListenerConcurrently引用。
ShieldTxcCommitListener本身也是MessageListenerConcurrently实例,通过它的consumeMessage代理了外部传入的MessageListenerConcurrently实例,通过加入了切面逻辑对消费过程做了进一步的处理。
这里就对ShieldTxcCommitListener如何对真实的消费过程进行代理做深入的分析。
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgBody = new String(msg.getBody());
String msgId = msg.getMsgId();
LOGGER.debug("[ShieldTxcCommitListener]Consuming [COMMIT] Message start... msgId={},msgBody={}", msgId, msgBody);
首先进行前置参数的获取,这里不需要过多解释。
ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage.decode(msgBody);
ShieldEvent event = new ShieldEvent();
event.convert(shieldTxcMessage);
BaseEventService baseEventService =
(BaseEventService) SpringApplicationHolder.getBean("baseEventService");
将消息体进行反序列化,并转换为ShieldEvent消息实体便于后续操作;通过Spring上下文获取到数据库操作bean,便于进行消息状态修改。
try {
// 消费幂等,查询消息是否存在,入库带唯一索引
// 消费次数大于等于阈值,回滚事务
int currReconsumeTimes = msg.getReconsumeTimes();
if (currReconsumeTimes >= CommonProperty.MAX_COMMIT_RECONSUME_TIMES) {
// 事务回滚操作,消息复制为回滚生产者,持久化
LOGGER.debug("[ShieldTxcCommitListener] START transaction rollback sequence! msgId={},currReconsumeTimes={}", msgId, currReconsumeTimes);
if (doPutRollbackMsgAfterMaxConsumeTimes(baseEventService, event, msgId)) {
LOGGER.debug("[ShieldTxcCommitListener] transaction rollback sequence executed SUCCESS! msgId={}", msgId);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
// 如果一直失败最后会进死信
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
这里获取了消息的当前消费次数,并与最大消费次数进行比对。
如果消费次数已经超过最大消费次数(默认为10次),则进行分布式事务的回滚操作。这么做的原因为:下游无论如何都没办法提交本地事务,如果不回滚则上下数据一致性就被破坏了,因此这里通过doPutRollbackMsgAfterMaxConsumeTimes进行了如下的业务处理:
- 将消息的状态由 CONSUME_PROCESSING (消费处理中,该状态在首次消费开始时就由CONSUME_INIT修改得到)改为 CONSUME_MAX_RECONSUMETIMES (达到最大消费次数)。
- 克隆消息,插入事务回滚消息,状态为 PRODUCE_INIT
- 完成后等待事务下游的SendTxcMessageScheduler对回滚消息进行投递。
String bizKey = shieldTxcMessage.getBizKey();
String txType = shieldTxcMessage.getTxType();
String eventStatua = shieldTxcMessage.getEventStatus();
String appId = shieldTxcMessage.getAppId();
String eventType = shieldTxcMessage.getEventType();
// 进行消息持久化
event.setEventType(eventType)
.setTxType(txType)
.setEventStatus(EventStatus.CONSUME_INIT.toString())
.setContent(shieldTxcMessage.getContent())
.setAppId(shieldTxcMessage.getAppId())
.setBizKey(bizKey)
.setId(Integer.valueOf(shieldTxcMessage.getId()));
// 入库失败回滚
boolean insertResult = baseEventService.insertEventWithId(event);
if (!insertResult) {
LOGGER.warn("[ShieldTxcCommitListener] insert shieldEvent Consume Message failed,msgId={}", msgId);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
如果当前没有达到最大消费次数则将消息进行持久化操作,这里在数据库中对消息的id与业务唯一键bizkey加了唯一索引,如果重复插入会报唯一约束异常,便不会重复插入,保证了消息的唯一性。
// 改消费处理中
doUpdateMessageStatusProcessing(baseEventService, event);
// 真实消费
return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
消息入库后将消息状态改为 CONSUME_PROCESSING (消费处理中),接着进行真实的消费过程,通过拦截判断真实的消费结果对消息状态进行对应的修改。
} catch (Exception e) {
// 幂等处理:唯一约束触发则直接进行消费
if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
LOGGER.debug("[ShieldTxcCommitListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
}
if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
LOGGER.debug("[ShieldTxcCommitListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
}
// 其他异常重试
LOGGER.warn("ShieldTxcCommitListener Consume Message occurred Exception,msgId={}", msgId, e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return null;
}
这里为对消息重复插入的处理流程,如果消息重复入库,会抛出异常并被捕获,则直接进行真实消息消费流程,通过方法doUpdateAfterConsumed实现。
/**
* 拦截真实消费结果,根据消费结果更新消息状态
*
* @param consumeConcurrentlyStatus
* @param baseEventService
* @param shieldEvent
* @return
*/
private ConsumeConcurrentlyStatus doUpdateAfterConsumed(BaseEventService baseEventService,
ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
ShieldEvent shieldEvent) {
LOGGER.debug("[ShieldTxcCommitListener::doUpdateAfterConsumed] The Real ConsumeConcurrentlyStatus is : [{}]", consumeConcurrentlyStatus);
if (ConsumeConcurrentlyStatus.RECONSUME_LATER.name().equals(consumeConcurrentlyStatus.name())) {
// 消费失败,消费状态仍旧处理中
return consumeConcurrentlyStatus;
}
if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name().equals(consumeConcurrentlyStatus.name())) {
// 消费成功,处理中改完成,更新前状态:消费处理中
shieldEvent.setBeforeUpdateEventStatus(shieldEvent.getEventStatus());
// 更新后状态:消费完成
shieldEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
boolean updateBefore = baseEventService.updateEventStatusById(shieldEvent);
if (!updateBefore) {
// 更新失败,幂等重试.此时必定是系统依赖组件出问题了
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
调用doUpdateAfterConsumed时传入的参数ConsumeConcurrentlyStatus代表业务层返回的消费状态。该状态只有两个值:CONSUME_SUCCESS/RECONSUME_LATER。
如果返回RECONSUME_LATER,表明事务提交消息消费失败,后续会继续进行重试。消息表中消息的状态仍旧为 CONSUME_PROCESSING,我们不做多余的处理。
如果返回CONSUME_SUCCESS,表明事务提交消息消费成功,则将数据库中的这条消息状态改为 CONSUME_PROCESSED。如果修改不成功,则进行重试即可,业务侧的消费逻辑要注意保证消费幂等。
整个消费过程如果达到重试次数上限仍旧不能成功,则会触发全局事务的回滚操作,这就保证了整个分布式事务的闭环。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。