文章目录
  1. 1. 第一阶段:实现回滚逻辑
  2. 2. 第二阶段:回滚拦截
    1. 2.1. 【ShieldTxcRollbackListener】
  • 小结
  • 上一篇文章中,我们对事务提交部分的框架逻辑及代码实现做了较为详细的讲解;本文中,我们接着分析一下事务回滚阶段的机理及代码实现逻辑。

    shieldTXC.PNG

    这里主要看图的下半部分。

    当事务下游应用达到最大消费次数,事务回滚被消息持久化之后,ShieldTXC的消息发送线程sendTxcMessageScheduler会扫描到待发送的回滚消息并投递到 [事务回滚队列]

    第一阶段:实现回滚逻辑

    事务上游应用在启动过程中初始化了ShieldTxcConsumerListenerAdapter消费适配器,并通过ShieldTxcRollbackListener实现了回滚逻辑。

    @Service
    public class TxConsumeService implements InitializingBean {
    
        @Value("${shield.event.rocketmq.nameSrvAddr}")
        String nameSerAddr;
        @Value("${shield.event.rocketmq.topicSource}")
        String topic;
    
        @Override
        public void afterPropertiesSet() throws Exception {
            new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcRollbackListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.println("测试消费【回滚】ShieldTxcRollbackListener");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            }));
        }
    }
    

    这段代码在之前的事务提交阶段已经讲解过,事务发起端需要自行实现回滚逻辑,这样才能在异常发生时与事务下游保持数据一致性。更多的细节此处就不再赘述。

    第二阶段:回滚拦截

    同ShieldTxcCommitListener类似,ShieldTxcRollbackListener也实现了对回滚消费逻辑的拦截,它的声明如下:

    【ShieldTxcRollbackListener】
    public class ShieldTxcRollbackListener implements MessageListenerConcurrently {
    
        private MessageListenerConcurrently txRollbackListener;
    
        public ShieldTxcRollbackListener(MessageListenerConcurrently txRollbackListener) {
            this.txRollbackListener = txRollbackListener;
        }
    

    ShieldTxcRollbackListener同样实现了MessageListenerConcurrently接口。在构造过程中将外界传入的MessageListenerConcurrently实例的引用指向了内部的MessageListenerConcurrently引用。

    ShieldTxcRollbackListener同样是MessageListenerConcurrently实例,通过它的consumeMessage方法代理了外部传入的MessageListenerConcurrently实例,通过加入了切面逻辑对消费过程做了进一步的处理。

    我们重点对ShieldTxcCommitListener如何对真实的消费过程进行代理做深入的分析,核心思路同样是在真实的回滚消费逻辑之前加入前置处理,在真实消费逻辑之后加入后置处理。

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 测试打印消息体
        for (MessageExt msg : msgs) {
    
            String msgBody = new String(msg.getBody());
            String msgId = msg.getMsgId();
            LOGGER.debug("[ShieldTxcRollbackListener]Consuming [ROLLBACK] Message start... msgId={},msgBody={}", msgId, msgBody);
    

    获取参数进行日志打印。

    ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
    shieldTxcMessage.decode(msgBody);
    
    ShieldEvent rollbackEvent = new ShieldEvent();
    rollbackEvent.convert(shieldTxcMessage);
    
    BaseEventService baseEventService =
            (BaseEventService) SpringApplicationHolder.getBean("baseEventService");
    

    将消息协议进行反序列化,并转换为消息实体,从Spring上下文中获取数据库持久化服务BaseEventService,为后续数据库操作做准备。

    try {
        // 取参数
        String bizKey = shieldTxcMessage.getBizKey();
        String txType = shieldTxcMessage.getTxType();
        String eventStatua = shieldTxcMessage.getEventStatus();
        String appId = shieldTxcMessage.getAppId();
        String eventType = shieldTxcMessage.getEventType();
    
        // 回滚消息持久化
        rollbackEvent.setEventType(eventType)
                .setTxType(TXType.ROLLBACK.toString())
                .setEventStatus(EventStatus.CONSUME_INIT.toString())
                .setContent(shieldTxcMessage.getContent())
                .setAppId(shieldTxcMessage.getAppId())
                .setBizKey(bizKey)
                .setId(Integer.valueOf(shieldTxcMessage.getId()));
    
        // 入库失败回滚
        boolean insertResult = baseEventService.insertEventWithId(rollbackEvent);
        if (!insertResult) {
            LOGGER.warn("[ShieldTxcRollbackListener] insert RollbackShieldEvent Consume Message failed,msgId={}", msgId);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    

    上游执行回滚消息持久化,持久化成功后状态为 CONSUME_INIT (消费初始化),

    // 改消费处理中
    doUpdateMessageStatusProcessing(baseEventService, rollbackEvent);
    // 真实消费
    return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
    

    消息持久化后将消息状态改为消费处理中 [CONSUME_PROCESSING],进行真实消费过程,真实消费完成后对消费结果做后置处理。

            } catch (Exception e) {
                // 幂等处理:唯一约束触发则直接进行消费
                if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
                    LOGGER.debug("[ShieldTxcRollbackListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
                    return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
                }
                if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
                    LOGGER.debug("[ShieldTxcRollbackListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
                    return doUpdateAfterRollbackConsumed(baseEventService, this.txRollbackListener.consumeMessage(msgs, context), rollbackEvent);
                }
                // 其他异常重试
                LOGGER.warn("ShieldTxcRollbackListener Consume Message occurred Exception,msgId={}", msgId, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return null;
    }
    

    这里的逻辑与ShieldTxcCommitListener的类似,对消费做幂等处理,重复消息不再入库,直接进行消费,上游回滚消费逻辑需要满足幂等性。

    我们接着看一下doUpdateAfterRollbackConsumed方法如何进行消费后置处理。

    /**
     * 拦截真实消费结果,根据消费结果更新消息状态
     *
     * @param consumeConcurrentlyStatus
     * @param baseEventService
     * @param rollbackEvent
     * @return
     */
    private ConsumeConcurrentlyStatus doUpdateAfterRollbackConsumed(BaseEventService baseEventService,
                                                                    ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
                                                                    ShieldEvent rollbackEvent) {
        if (ConsumeConcurrentlyStatus.RECONSUME_LATER == consumeConcurrentlyStatus) {
            // 消费失败,消费状态仍旧处理中
            return consumeConcurrentlyStatus;
        }
        if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus) {
            // 消费成功,处理中改完成,更新前状态:消费处理中
            rollbackEvent.setBeforeUpdateEventStatus(rollbackEvent.getEventStatus());
            // 更新后状态:消费处理中
            rollbackEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
            boolean updateBefore = baseEventService.updateEventStatusById(rollbackEvent);
            if (!updateBefore) {
                // 更新失败,幂等重试.此时必定是系统依赖组件出问题了
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    

    拦截真实回滚消费逻辑获取消费结果,如果是消费成功 [CONSUME_SUCCESS] ,更改消息状态为消费完成 [CONSUME_PROCESSED]。如果是消费失败,则会进行重试。

    上游的回滚逻辑通过重试保证成功,如果达到MQ最大重试次数(RocketMQ是16次)会进死信,此时需要人工介入进行补偿操作。

    在上线之前只要进行了充分的测试,保证业务无严重bug,确保线上MQ集群、应用集群的高可用,一般通过重试的方式都能够达成最终一致。如果出现大量数据不一致的情况,那么大概率就是应用存在bug或者MQ集群不稳定,此时需要人工介入进行排错。

    小结

    到这里我们就完成了整个框架原理图与代码实现的分析。

    实现告一段落,我们接着看一个简单的应用demo,直观感受一下ShieldTXC分布式事务框架的魅力。



    版权声明:

    原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

    文章目录
    1. 1. 第一阶段:实现回滚逻辑
    2. 2. 第二阶段:回滚拦截
      1. 2.1. 【ShieldTxcRollbackListener】
  • 小结
  • Fork me on GitHub