自己写分布式事务框架之实现事务回滚
上一篇文章中,我们对事务提交部分的框架逻辑及代码实现做了较为详细的讲解;本文中,我们接着分析一下事务回滚阶段的机理及代码实现逻辑。
这里主要看图的下半部分。
当事务下游应用达到最大消费次数,事务回滚被消息持久化之后,ShieldTXC的消息发送线程sendTxcMessageScheduler会扫描到待发送的回滚消息并投递到 [事务回滚队列]。
第一阶段:实现回滚逻辑
事务上游应用在启动过程中初始化了ShieldTxcConsumerListenerAdapter消费适配器,并通过ShieldTxcRollbackListener实现了回滚逻辑。
@Service
public class TxConsumeService implements InitializingBean {
@Value("${shield.event.rocketmq.nameSrvAddr}")
String nameSerAddr;
@Value("${shield.event.rocketmq.topicSource}")
String topic;
@Override
public void afterPropertiesSet() throws Exception {
new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcRollbackListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("测试消费【回滚】ShieldTxcRollbackListener");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}));
}
}
这段代码在之前的事务提交阶段已经讲解过,事务发起端需要自行实现回滚逻辑,这样才能在异常发生时与事务下游保持数据一致性。更多的细节此处就不再赘述。
第二阶段:回滚拦截
同ShieldTxcCommitListener类似,ShieldTxcRollbackListener也实现了对回滚消费逻辑的拦截,它的声明如下:
【ShieldTxcRollbackListener】
public class ShieldTxcRollbackListener implements MessageListenerConcurrently {
private MessageListenerConcurrently txRollbackListener;
public ShieldTxcRollbackListener(MessageListenerConcurrently txRollbackListener) {
this.txRollbackListener = txRollbackListener;
}
ShieldTxcRollbackListener同样实现了MessageListenerConcurrently接口。在构造过程中将外界传入的MessageListenerConcurrently实例的引用指向了内部的MessageListenerConcurrently引用。
ShieldTxcRollbackListener同样是MessageListenerConcurrently实例,通过它的consumeMessage方法代理了外部传入的MessageListenerConcurrently实例,通过加入了切面逻辑对消费过程做了进一步的处理。
我们重点对ShieldTxcCommitListener如何对真实的消费过程进行代理做深入的分析,核心思路同样是在真实的回滚消费逻辑之前加入前置处理,在真实消费逻辑之后加入后置处理。
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 测试打印消息体
for (MessageExt msg : msgs) {
String msgBody = new String(msg.getBody());
String msgId = msg.getMsgId();
LOGGER.debug("[ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId={},msgBody={}", msgId, msgBody);
获取参数进行日志打印。
ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage.decode(msgBody);
ShieldEvent rollbackEvent = new ShieldEvent();
rollbackEvent.convert(shieldTxcMessage);
BaseEventService baseEventService =
(BaseEventService) SpringApplicationHolder.getBean("baseEventService");
将消息协议进行反序列化,并转换为消息实体,从Spring上下文中获取数据库持久化服务BaseEventService,为后续数据库操作做准备。
try {
// 取参数
String bizKey = shieldTxcMessage.getBizKey();
String txType = shieldTxcMessage.getTxType();
String eventStatua = shieldTxcMessage.getEventStatus();
String appId = shieldTxcMessage.getAppId();
String eventType = shieldTxcMessage.getEventType();
// 回滚消息持久化
rollbackEvent.setEventType(eventType)
.setTxType(TXType.ROLLBACK.toString())
.setEventStatus(EventStatus.CONSUME_INIT.toString())
.setContent(shieldTxcMessage.getContent())
.setAppId(shieldTxcMessage.getAppId())
.setBizKey(bizKey)
.setId(Integer.valueOf(shieldTxcMessage.getId()));
// 入库失败回滚
boolean insertResult = baseEventService.insertEventWithId(rollbackEvent);
if (!insertResult) {
LOGGER.warn("[ShieldTxcRollbackListener] insert RollbackShieldEvent Consume Message failed,msgId={}", msgId);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
上游执行回滚消息持久化,持久化成功后状态为 CONSUME_INIT (消费初始化),
// 改消费处理中
doUpdateMessageStatusProcessing(baseEventService, rollbackEvent);
// 真实消费
return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
消息持久化后将消息状态改为消费处理中 [CONSUME_PROCESSING],进行真实消费过程,真实消费完成后对消费结果做后置处理。
} catch (Exception e) {
// 幂等处理:唯一约束触发则直接进行消费
if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
LOGGER.debug("[ShieldTxcRollbackListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
}
if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
LOGGER.debug("[ShieldTxcRollbackListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
}
// 其他异常重试
LOGGER.warn("ShieldTxcRollbackListener Consume Message occurred Exception,msgId={}", msgId, e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return null;
}
这里的逻辑与ShieldTxcCommitListener的类似,对消费做幂等处理,重复消息不再入库,直接进行消费,上游回滚消费逻辑需要满足幂等性。
我们接着看一下doUpdateAfterRollbackConsumed方法如何进行消费后置处理。
/**
* 拦截真实消费结果,根据消费结果更新消息状态
*
* @param consumeConcurrentlyStatus
* @param baseEventService
* @param rollbackEvent
* @return
*/
private ConsumeConcurrentlyStatus doUpdateAfterRollbackConsumed(BaseEventService baseEventService,
ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
ShieldEvent rollbackEvent) {
if (ConsumeConcurrentlyStatus.RECONSUME_LATER == consumeConcurrentlyStatus) {
// 消费失败,消费状态仍旧处理中
return consumeConcurrentlyStatus;
}
if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus) {
// 消费成功,处理中改完成,更新前状态:消费处理中
rollbackEvent.setBeforeUpdateEventStatus(rollbackEvent.getEventStatus());
// 更新后状态:消费处理中
rollbackEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
boolean updateBefore = baseEventService.updateEventStatusById(rollbackEvent);
if (!updateBefore) {
// 更新失败,幂等重试.此时必定是系统依赖组件出问题了
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
拦截真实回滚消费逻辑获取消费结果,如果是消费成功 [CONSUME_SUCCESS] ,更改消息状态为消费完成 [CONSUME_PROCESSED]。如果是消费失败,则会进行重试。
上游的回滚逻辑通过重试保证成功,如果达到MQ最大重试次数(RocketMQ是16次)会进死信,此时需要人工介入进行补偿操作。
在上线之前只要进行了充分的测试,保证业务无严重bug,确保线上MQ集群、应用集群的高可用,一般通过重试的方式都能够达成最终一致。如果出现大量数据不一致的情况,那么大概率就是应用存在bug或者MQ集群不稳定,此时需要人工介入进行排错。
小结
到这里我们就完成了整个框架原理图与代码实现的分析。
实现告一段落,我们接着看一个简单的应用demo,直观感受一下ShieldTXC分布式事务框架的魅力。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。