文章目录
  1. 1. 框架核心原理
    1. 1.1. 框架分析之事务提交
      1. 1.1.1. 第一阶段:本地事务与消息持久化
      2. 1.1.2. 第二阶段:消息投递
        1. 1.1.2.1. 2.1定时任务初始化
        2. 1.1.2.2. 2.2定时任务核心逻辑分析
      3. 1.1.3. 第三阶段:消息消费,下游事务提交
        1. 1.1.3.1. 3.1消费适配器初始化
          1. 1.1.3.1.1. 【ShieldTxcConsumerListenerAdapter】
          2. 1.1.3.1.2. 【shieldTxcRocketMQConsumerClient】
          3. 1.1.3.1.3. 【ShieldTxcCommitListener】

从本文开始,我们正式进入自己写分布式事务框架的部分。

笔者将这个本地消息表分布式事务框架命名为 shieldTXC,意思是 神盾事务框架,框架内核及demo案例的代码已经打包上传至github上,

地址为:shieldTXC源码地址。如果觉得这个喜欢可以点个star支持下。

题外话不多说,接下来我们一边看框架的原理图,在宏观上对框架的机理做一个了解,一边对相应的原理进行代码实现讲解,这样理论与实战相结合,相信会加深读者朋友的理解。

框架核心原理

首先看下框架的核心原理图。

shieldTXC.PNG

看起来还是比较整洁的,这也是笔者写代码的一个宗旨:

好的框架可以复杂,但架构一定是优雅的、清晰的。

我们配合代码深入分析一下这张图,主要分为两个主要的阶段:

  1. 事务提交阶段
  2. 事务回滚阶段

首先来看下事务提交阶段的实现原理及相对应的代码实现。

框架分析之事务提交

图中左侧的红字标记该部分为上游应用(后续统称为上游)的事务提交阶段的运行逻辑,上游在这个阶段又分为两个子阶段。

第一阶段:本地事务与消息持久化

上游在执行本地事务成功在,在同一事务内对业务实体封装为消息体,调用ShieldTXC提供的消息持久化接口将消息持久化到业务库中,框架的消息持久化方法需要保证事务性。

代码如下:

@Transactional(rollbackFor = Exception.class)
public void testTran() {
    // 本地事务
    doLocalTransaction();
    // 消息持久化
    TestTxMessage testTxMessage = new TestTxMessage();
    testTxMessage.setName(UUID.randomUUID().toString().replace("-", "").substring(0, 10));
    shieldTxcRocketMQProducerClient
            .putMessage(testTxMessage, EventType.INSERT, TXType.COMMIT, testTxMessage.getName(),
                    UUID.randomUUID().toString());
}

这个阶段的重点在于消息持久化的实现。一起来看一下ShieldTXC是如何实现的:

/**
 * 消息持久化
 * @param shieldTxcMessage
 * @param eventType
 * @param txType
 * @param appId
 */
@Transactional(rollbackFor = Exception.class)
public void putMessage(AbstractShieldTxcMessage shieldTxcMessage,
                       EventType eventType,
                       TXType txType,
                       String appId,
                       String bizKey) {
    Preconditions.checkNotNull(eventType, "Please insert eventType, type is:[com.shield.txc.constant.EventType]");
    Preconditions.checkNotNull(bizKey, "Please insert unique bizKey!");

    ShieldEvent event = new ShieldEvent();
    event.setEventType(eventType.toString())
            .setTxType(txType.toString())
            .setEventStatus(EventStatus.PRODUCE_INIT.toString())
            .setContent(shieldTxcMessage.encode())
            .setBizKey(bizKey)
            .setAppId(appId);
    try {
        // 入库失败回滚
        boolean insertResult = this.getBaseEventService().insertEvent(event);
        if (!insertResult) {
            throw new BizException("insert ShieldEvent into DB occurred Exception!");
        }
    } catch (Exception e) {
        // 异常回滚
        throw new BizException("insert ShieldEvent into DB occurred Exception!", e);
    }
}

为了便于统一管理,ShieldTXC定义了一个抽象消息类,业务方需要继承该抽象类,通过实现其中的decode与encode方法为业务消息提供编解码能力。

public abstract class AbstractShieldTxcMessage implements Serializable {

    private static final long serialVersionUID = -2416427331208398607L;
    /**消息序列化*/
    public abstract String encode();
    /**消息反序列化*/
    public abstract void decode(String msg);
}

我们接着看上面的putMessage方法。

通过传参,组装了一个ShieldEvent持久化消息实体,并将其进行insert操作,通过判断入库是否成功决定是否抛出异常让事务进行回滚。

如果消息持久化成功,则本地事务提交;如果消息持久化失败,则本地事务回滚,业务结束。

第二阶段:消息投递

业务方完成上述第一阶段的消息持久化操作,就不需要进行其他的额外操作了。

此时框架开始执行第二阶段:消息投递阶段。

我们从图中可以清晰的看到,ShieldEvent会在内部维护一个定时任务,扫描状态为 初始化待发送 消息,组装成功消息体并调用MQ的发送消息接口,将投递到 事务提交队列。这个过程中要保证消息可达。

这个子阶段用文字描述起来比较简洁,我们接着看下框架代码是如何实现的,加深认知。

2.1定时任务初始化

整个ShieldTXC内核已经打包为一个Spring Boot的starter,因此可以通过@Enable注解简单的整合到Spring Boot应用中,框架本身提供了对核心Bean的初始化过程,这里我们主要看下扫描待发送消息的定时任务的初始化过程。

定时任务的初始化过程是在框架的ShieldEventTxcConfiguration.java类中实现的,类声明如下:

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
public class ShieldEventTxcConfiguration {

通过@Configuration标记为一个配置bean,通过@EnableConfigurationProperties(RocketMQProperties.class)开启可配置能力。ShieldTXC支持通过配置文件进行配置。

这里重点分析一下扫描待发送消息定时任务的初始化:

/**
 * 异步消息调度构造
 * @param rocketMQProperties
 * @return
 */
@Bean
@ConditionalOnMissingBean
@Order(value = 3)
public SendTxcMessageScheduler sendTxcMessageScheduler(RocketMQProperties rocketMQProperties) {
    SendTxcMessageScheduler sendTxcMessageScheduler = new SendTxcMessageScheduler();
    // 设置调度线程池参数
    sendTxcMessageScheduler.setInitialDelay(rocketMQProperties.getTranMessageSendInitialDelay());
    sendTxcMessageScheduler.setPeriod(rocketMQProperties.getTranMessageSendPeriod());
    sendTxcMessageScheduler.setCorePoolSize(rocketMQProperties.getTranMessageSendCorePoolSize());
    // 数据库操作
    sendTxcMessageScheduler.setBaseEventService(baseEventService(baseEventRepository()));
    // 消息发送
    sendTxcMessageScheduler.setShieldTxcRocketMQProducerClient(rocketMQEventProducerClient(rocketMQProperties));
    LOGGER.debug("Initializing [sendTxcMessageScheduler] instance success.");
    // 执行调度
    sendTxcMessageScheduler.schedule();
    return sendTxcMessageScheduler;
}

通过@Bean标记为Spring容器中的bean,Spring在初始化过程中会加载我们的bean,bean的name就是方法名,这个方式是Spring提供的通过JavaConfig方式进行bean定义的方式,这里不做展开。

我们首先初始化了一个SendTxcMessageScheduler实例,对其进行参数的配置,诸如线程池参数、数据库操作相关的bean依赖(通过相同方式进行初始化的)、消息发送的bean依赖(通过相同方式进行初始化的)。依赖设置完成后调用schedule()方法开启任务调度。当应用启动完成之后便会自动开始进行待发送消息的扫描操作。

关于数据库操作bean、RocketMQ操作的bean初始化过程,感兴趣的读者可以去源码的com.shield.txc.configuration.ShieldEventTxcConfiguration.java类中查看,请恕本文不再展开。

2.2定时任务核心逻辑分析

初始化完成SendTxcMessageScheduler之后,具体又是如何对待发送消息进行处理的呢?带着疑问,我们深入SendTxcMessageScheduler这个调度类中一探究竟。

SendTxcMessageScheduler方法的声明如下:

public class SendTxcMessageScheduler extends AbstractMessageScheduler implements Runnable {

可以看到SendTxcMessageScheduler实例本身也是一个Runnable实例
,这里暂且放一下,后续的分析中会用到,我们接着往下看。

上述的代码中在初始化完成SendTxcMessageScheduler实例后,调用了schedule()方法,那么我们重点看下这个方法。

public void schedule() {
    executorService.scheduleAtFixedRate(
            this,
            this.initialDelay,
            this.period,
            this.timeUnit);
}

代码逻辑很简单,通过内部的executorService开启了调度流程,executorService的初始化是在构造方法中完成的,默认构造方法如下:

public SendTxcMessageScheduler() {
    executorService = Executors.newScheduledThreadPool(corePoolSize);
}

在schedule()方法中,调用scheduleAtFixedRate方法,第一个参数传入了this引用,线程池会定时执行this引用(也是一个Runnable引用)的run方法,方法逻辑:

@Override
public void run() {
    // 查询并发送消息
    try {
        // 获取待调度的消息,初始态==初始化
        List<ShieldEvent> shieldEvents = baseEventService.queryEventListByStatus(EventStatus.PRODUCE_INIT.toString());
        if (CollectionUtils.isEmpty(shieldEvents)) {
            return;
        }
        for (ShieldEvent shieldEvent : shieldEvents) {
            // 发送前改状态
            processBeforeSendMessage(shieldEvent);
            // 发送消息核心逻辑
            sendMessage(shieldEvent);
            // 判断发送结果,成功则更新为已发送
            processAfterSendMessage(shieldEvent);
        }
    } catch (Exception e) {
        LOGGER.error("Sending rollback message occurred Exception!", e);
        return;
    }
}

这里主要采用了模板方法对业务逻辑进行了封装。

首先获取待调度的消息,状态为 PRODUCE_INIT (生产初始化)。默认获取50条。

如果未查询到消息,则结束本次调度,对于查询到的消息列表shieldEvents进行迭代:

1.首先进行发送前置操作,即方法processBeforeSendMessage。将消息的状态从 PRODUCE_INIT 改为 PRODUCE_PROCESSING (生产处理中)。这么做的目的在于通过状态机方式的乐观锁达到支持并发的目的。这个思路在日常业务开发中也经常用到。

2.接着进行发送核心操作,这里需要看一下代码是如何实现的

/**
* 发送事务消息
* @param shieldEvent
*/
@Override
public void sendMessage(ShieldEvent shieldEvent) {
    int eventId = shieldEvent.getId();
    // 组装Message
    ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
    shieldTxcMessage
            .setId(String.valueOf(eventId))
            .setAppId(shieldEvent.getAppId())
            .setContent(shieldEvent.getContent())
            .setEventType(shieldEvent.getEventType())
            .setEventStatus(shieldEvent.getEventStatus())
            .setTxType(shieldEvent.getTxType())
            .setBizKey(shieldEvent.getBizKey());

    String messgeBody = shieldTxcMessage.encode();
    String topic = null;
    BizResult bizResult = null;
    // 发送commit消息,判断消息类型
    if (TXType.COMMIT.toString().equals(shieldTxcMessage.getTxType())) {
        topic = MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_COMMMIT_STAGE,
                shieldTxcRocketMQProducerClient.getTopic());
        Message commitMessage = new Message(topic, messgeBody.getBytes());
        bizResult = shieldTxcRocketMQProducerClient.sendCommitMsg(commitMessage, eventId);
    }
    // 发送rollback消息
    if (TXType.ROLLBACK.toString().equals(shieldTxcMessage.getTxType())) {
        topic = MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_ROLLBACK_STAGE,
                shieldTxcRocketMQProducerClient.getTopic());
        Message rollbackMessage = new Message(topic, messgeBody.getBytes());
        bizResult = shieldTxcRocketMQProducerClient.sendRollbackMsg(rollbackMessage, eventId);
    }
    if (bizResult.getBizCode() != BizCode.SEND_MESSAGE_SUCC) {
        LOGGER.debug("[SendTxcMessageScheduler] Send ShieldTxc Message result:[FAIL], Message Body:[{}]", messgeBody);
        return;
    }
    LOGGER.debug("[SendTxcMessageScheduler] Send ShieldTxc Message result:[SUCCESS], Message Body:[{}]", messgeBody);
}

首先将消息实体转换为ShieldTxcMessage。ShieldTxcMessage是框架封装的消息类型,内部的encode、decode方法提供了序列化、反序列化能力。

接着判断消息类型,如果是 TXType.COMMIT 则将消息投递到事务提交队列;如果是 TXType.ROLLBACK 则将消息投递到事务回滚队列。

在上游业务中发送的消息均为事务提交消息,因此会被投递到事务提交队列中。

最后判断消息发送结果,如果发送失败会进行重试,该重试能力是MQ中间件客户端提供的。对于多次发送都失败的消息需要更改状态为初始化,继续进行投递,保证消息一定能发出去。长时间发布出去的消息进发送失败表,后续需要对该失败表进行扫描,触发本地业务回滚。这种情况因为极其少见,因此框架当前版本暂未实现,后续更新后在github的页面中注明。

3.消息投递成功后,调用processAfterSendMessage方法进行状态更新,将处理中状态 PRODUCE_PROCESSING 改为 PRODUCE_PROCESSED (生产处理成功)。

到此就完成了分布式事务上游的消息发布流程。

第三阶段:消息消费,下游事务提交

为了方便读者对照,这里再贴一下原理图。我们接着分析事务下游应用(后文称下游)是如何对事务提交队列中的消息进行消费从而完成本地事务的。

shieldTXC.PNG

3.1消费适配器初始化

下游应用启动过程中需要完成消费适配器的初始化,实现对事务提交队列消息的消费。该消费逻辑实现了下游与上游的最终一致性。

@Service
public class TxConsumeService implements InitializingBean {

    @Value("${shield.event.rocketmq.nameSrvAddr}")
    String nameSerAddr;

    @Value("${shield.event.rocketmq.topicSource}")
    String topic;

    @Override
    public void afterPropertiesSet() throws Exception {

        new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcCommitListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("测试消费ShieldTxcCommitListener开始......");

                Random ra =new Random();
                int randomInt = ra.nextInt(10) + 1;
                if (randomInt <= 5) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        }));
    }
}

上面这段代码为下游应用需要实现的,在应用启动阶段初始化了ShieldTxcConsumerListenerAdapter消费适配器。

【ShieldTxcConsumerListenerAdapter】

ShieldTxcConsumerListenerAdapter的主要构造方法有三个,分别为:

事务下游可选构造方法,即分布式事务的最终执行者

/**
 * 事务下游可选
 * @param nameSrvAddr
 * @param topic
 * @param txCommmtListener
 */
public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
                                        String topic,
                                        ShieldTxcCommitListener txCommmtListener) {
    this.nameSrvAddr = nameSrvAddr;
    this.topic = topic;
    this.txCommmtListener = txCommmtListener;
    init();
}

事务上游可选构造方法,即分布式事务的最初发起者

/**
 * 事务上游可选
 * @param nameSrvAddr
 * @param topic
 * @param txRollbackListener
 */
public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
                                        String topic,
                                        ShieldTxcRollbackListener txRollbackListener) {
    this.nameSrvAddr = nameSrvAddr;
    this.topic = topic;
    this.txRollbackListener = txRollbackListener;
    init();
}

事务上下游可选构造方法,即一个应用处于上游和下游之间,它既是该分布式事务的上游又是该分布式事务的下游

public ShieldTxcConsumerListenerAdapter(String nameSrvAddr,
                                        String topic,
                                        ShieldTxcCommitListener txCommmtListener,
                                        ShieldTxcRollbackListener txRollbackListener) {
    this.nameSrvAddr = nameSrvAddr;
    this.topic = topic;
    this.txCommmtListener = txCommmtListener;
    this.txRollbackListener = txRollbackListener;
    init();
}

上述的demo代码所在的应用为分布式事务的最终下游,因此只需要实现ShieldTxcCommitListener接口,完成consumeMessage回调方法。我们在其中模拟百分之五十提交,百分之五十重试的情况,测试正常与异常情况下框架的表现。

在ShieldTxcConsumerListenerAdapter构造方法中均调用了init()方法,该方法初始化了真正的消费者客户端,对MQ进行监听。

public ShieldTxcConsumerListenerAdapter init() {
    // 初始化shieldTxcRocketMQConsumerClient
    Preconditions.checkNotNull(this.nameSrvAddr, "please insert RocketMQ NameServer address");
    shieldTxcRocketMQConsumerClient =
            new ShieldTxcRocketMQConsumerClient(this.topic, this.nameSrvAddr, this.getTxCommmtListener(), this.getTxRollbackListener());
    LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient] instance init success.");
    return this;
}
【shieldTxcRocketMQConsumerClient】

我们进入shieldTxcRocketMQConsumerClient类中,观察一下它是如何进行初始化的。

public ShieldTxcRocketMQConsumerClient(String topic,
                                       String nameSrvAddr,
                                       ShieldTxcCommitListener txCommtListener,
                                       ShieldTxcRollbackListener txRollbackListener) {
    this.nameSrvAddr = nameSrvAddr;
    this.topic = topic;
    if (txCommtListener == null && txRollbackListener == null) {
        throw new BizException("Please define at least one MessageListenerConcurrently instance, such as [ShieldTxcCommitListener] or [ShieldTxcRollbackListener] or both.");
    }
    if (txCommtListener != null) {
        // 初始化事务提交消费者
        initCommitConsumer(this.topic, this.nameSrvAddr, txCommtListener);
        LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient.CommmitConsumer] instance init success.");
    }
    if (txRollbackListener != null) {
        // 初始化事务回滚消费者
        initRollbackConsumer(this.topic, this.nameSrvAddr, txRollbackListener);
        LOGGER.debug("Initializing [ShieldTxcRocketMQConsumerClient.RollbackListener] instance init success.");
    }
}

上述代码是ShieldTxcRocketMQConsumerClient初始化流程,可以看到主要是判断了ShieldTxcCommitListener、ShieldTxcRollbackListener实例是否为空,如果不为空则进行对应消费者的初始化。两个初始化流程基本相同,我们重点看下initCommitConsumer方法的逻辑。

/**
 * 初始化事务提交消费者
 * @param topic
 * @param nameSrvAddr
 */
private void initCommitConsumer(String topic, String nameSrvAddr, ShieldTxcCommitListener txCommtListener) {
    commitConsumer =
            new DefaultMQPushConsumer(
                    MessagePropertyBuilder.groupId(CommonProperty.TRANSACTION_COMMMIT_STAGE, topic));
    commitConsumer.setNamesrvAddr(nameSrvAddr);
    // 从头开始消费
    commitConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式:集群模式
    commitConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册监听器
    commitConsumer.registerMessageListener(txCommtListener);
    // 订阅所有消息
    try {
        commitConsumer.subscribe(
                MessagePropertyBuilder.topic(CommonProperty.TRANSACTION_COMMMIT_STAGE, topic), "*");
        // 启动消费者
        commitConsumer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("Loading [com.shield.txc.RocketMQEventConsumerClient.commmitConsumer] occurred exception", e);
    }
}

相信不需要我再做多余的解释了,这里其实就是启动了一个DefaultMQPushConsumer消费者客户端,并对对应的Topic进行订阅,从而实现对对应队列中消息的消费。如果读者对RocketMQ不是很了解,可以到RocketMQ开发者中心进行相关的学习。RocketMQ中文开发者中心

回到原理图中,可以看到下游应用的逻辑中有一个 ShieldTXC Commit拦截器,这个拦截器才是消费阶段的重点,框架通过该拦截器对消费过程进行了代理,加入了前置后置操作,从而保证了消费阶段的分布式事务的一致性。

拦截器代码在ShieldTxcCommitListener中。

【ShieldTxcCommitListener】

ShieldTxcCommitListener的类声明及构造方法如下:

public class ShieldTxcCommitListener implements MessageListenerConcurrently {

    public ShieldTxcCommitListener(MessageListenerConcurrently txCommmtListener) {
        this.txCommmtListener = txCommmtListener;
    }

可以看到ShieldTxcCommitListener实现了MessageListenerConcurrently接口。当ShieldTxcCommitListener在构造过程中将外界传入的MessageListenerConcurrently实例的引用指向了内部的MessageListenerConcurrently引用。

ShieldTxcCommitListener本身也是MessageListenerConcurrently实例,通过它的consumeMessage代理了外部传入的MessageListenerConcurrently实例,通过加入了切面逻辑对消费过程做了进一步的处理。

这里就对ShieldTxcCommitListener如何对真实的消费过程进行代理做深入的分析。

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

    for (MessageExt msg : msgs) {
        String msgBody = new String(msg.getBody());
        String msgId = msg.getMsgId();
        LOGGER.debug("[ShieldTxcCommitListener]Consuming [COMMIT] Message start... msgId={},msgBody={}", msgId, msgBody);

首先进行前置参数的获取,这里不需要过多解释。

ShieldTxcMessage shieldTxcMessage = new ShieldTxcMessage();
shieldTxcMessage.decode(msgBody);

ShieldEvent event = new ShieldEvent();
event.convert(shieldTxcMessage);

BaseEventService baseEventService =
        (BaseEventService) SpringApplicationHolder.getBean("baseEventService");

将消息体进行反序列化,并转换为ShieldEvent消息实体便于后续操作;通过Spring上下文获取到数据库操作bean,便于进行消息状态修改。

try {
    // 消费幂等,查询消息是否存在,入库带唯一索引
    // 消费次数大于等于阈值,回滚事务
    int currReconsumeTimes = msg.getReconsumeTimes();
    if (currReconsumeTimes >= CommonProperty.MAX_COMMIT_RECONSUME_TIMES) {
        // 事务回滚操作,消息复制为回滚生产者,持久化
        LOGGER.debug("[ShieldTxcCommitListener] START transaction rollback sequence! msgId={},currReconsumeTimes={}", msgId, currReconsumeTimes);
        if (doPutRollbackMsgAfterMaxConsumeTimes(baseEventService, event, msgId)) {
            LOGGER.debug("[ShieldTxcCommitListener] transaction rollback sequence executed SUCCESS! msgId={}", msgId);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } else {
            // 如果一直失败最后会进死信
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

这里获取了消息的当前消费次数,并与最大消费次数进行比对。

如果消费次数已经超过最大消费次数(默认为10次),则进行分布式事务的回滚操作。这么做的原因为:下游无论如何都没办法提交本地事务,如果不回滚则上下数据一致性就被破坏了,因此这里通过doPutRollbackMsgAfterMaxConsumeTimes进行了如下的业务处理:

  1. 将消息的状态由 CONSUME_PROCESSING (消费处理中,该状态在首次消费开始时就由CONSUME_INIT修改得到)改为 CONSUME_MAX_RECONSUMETIMES (达到最大消费次数)。
  2. 克隆消息,插入事务回滚消息,状态为 PRODUCE_INIT
  3. 完成后等待事务下游的SendTxcMessageScheduler对回滚消息进行投递。
String bizKey = shieldTxcMessage.getBizKey();
String txType = shieldTxcMessage.getTxType();
String eventStatua = shieldTxcMessage.getEventStatus();
String appId = shieldTxcMessage.getAppId();
String eventType = shieldTxcMessage.getEventType();

// 进行消息持久化
event.setEventType(eventType)
        .setTxType(txType)
        .setEventStatus(EventStatus.CONSUME_INIT.toString())
        .setContent(shieldTxcMessage.getContent())
        .setAppId(shieldTxcMessage.getAppId())
        .setBizKey(bizKey)
        .setId(Integer.valueOf(shieldTxcMessage.getId()));
// 入库失败回滚
boolean insertResult = baseEventService.insertEventWithId(event);
if (!insertResult) {
    LOGGER.warn("[ShieldTxcCommitListener] insert shieldEvent Consume Message failed,msgId={}", msgId);
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

如果当前没有达到最大消费次数则将消息进行持久化操作,这里在数据库中对消息的id与业务唯一键bizkey加了唯一索引,如果重复插入会报唯一约束异常,便不会重复插入,保证了消息的唯一性。

// 改消费处理中
doUpdateMessageStatusProcessing(baseEventService, event);
// 真实消费
return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);

消息入库后将消息状态改为 CONSUME_PROCESSING (消费处理中),接着进行真实的消费过程,通过拦截判断真实的消费结果对消息状态进行对应的修改。

        } catch (Exception e) {
            // 幂等处理:唯一约束触发则直接进行消费
            if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_HAS_EXISTED_INDEX) >= 0) {
                LOGGER.debug("[ShieldTxcCommitListener::UNIQUE INDEX], message has existed,msgId={}", msgId);
                return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
            }
            if (e.getMessage() != null && e.getMessage().indexOf(CommonProperty.MESSAGE_PRIMARY_KEY_DUPLICATE) >= 0) {
                LOGGER.debug("[ShieldTxcCommitListener::Duplicate entry for key 'PRIMARY'], message has existed,msgId={}", msgId);
                return doUpdateAfterConsumed(baseEventService, this.txCommmtListener.consumeMessage(msgs, context), event);
            }
            // 其他异常重试
            LOGGER.warn("ShieldTxcCommitListener Consume Message occurred Exception,msgId={}", msgId, e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return null;
}

这里为对消息重复插入的处理流程,如果消息重复入库,会抛出异常并被捕获,则直接进行真实消息消费流程,通过方法doUpdateAfterConsumed实现。

/**
 * 拦截真实消费结果,根据消费结果更新消息状态
 *
 * @param consumeConcurrentlyStatus
 * @param baseEventService
 * @param shieldEvent
 * @return
 */
private ConsumeConcurrentlyStatus doUpdateAfterConsumed(BaseEventService baseEventService,
                                                        ConsumeConcurrentlyStatus consumeConcurrentlyStatus,
                                                        ShieldEvent shieldEvent) {
    LOGGER.debug("[ShieldTxcCommitListener::doUpdateAfterConsumed] The Real ConsumeConcurrentlyStatus is : [{}]", consumeConcurrentlyStatus);
    if (ConsumeConcurrentlyStatus.RECONSUME_LATER.name().equals(consumeConcurrentlyStatus.name())) {
        // 消费失败,消费状态仍旧处理中
        return consumeConcurrentlyStatus;
    }
    if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name().equals(consumeConcurrentlyStatus.name())) {
        // 消费成功,处理中改完成,更新前状态:消费处理中
        shieldEvent.setBeforeUpdateEventStatus(shieldEvent.getEventStatus());
        // 更新后状态:消费完成
        shieldEvent.setEventStatus(EventStatus.CONSUME_PROCESSED.toString());
        boolean updateBefore = baseEventService.updateEventStatusById(shieldEvent);
        if (!updateBefore) {
            // 更新失败,幂等重试.此时必定是系统依赖组件出问题了
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

调用doUpdateAfterConsumed时传入的参数ConsumeConcurrentlyStatus代表业务层返回的消费状态。该状态只有两个值:CONSUME_SUCCESS/RECONSUME_LATER。

如果返回RECONSUME_LATER,表明事务提交消息消费失败,后续会继续进行重试。消息表中消息的状态仍旧为 CONSUME_PROCESSING,我们不做多余的处理。

如果返回CONSUME_SUCCESS,表明事务提交消息消费成功,则将数据库中的这条消息状态改为 CONSUME_PROCESSED。如果修改不成功,则进行重试即可,业务侧的消费逻辑要注意保证消费幂等。

整个消费过程如果达到重试次数上限仍旧不能成功,则会触发全局事务的回滚操作,这就保证了整个分布式事务的闭环。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 框架核心原理
    1. 1.1. 框架分析之事务提交
      1. 1.1.1. 第一阶段:本地事务与消息持久化
      2. 1.1.2. 第二阶段:消息投递
        1. 1.1.2.1. 2.1定时任务初始化
        2. 1.1.2.2. 2.2定时任务核心逻辑分析
      3. 1.1.3. 第三阶段:消息消费,下游事务提交
        1. 1.1.3.1. 3.1消费适配器初始化
          1. 1.1.3.1.1. 【ShieldTxcConsumerListenerAdapter】
          2. 1.1.3.1.2. 【shieldTxcRocketMQConsumerClient】
          3. 1.1.3.1.3. 【ShieldTxcCommitListener】
Fork me on GitHub