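Contents
  1. Message commit/rollback [client side]: endTransaction
  2. Message commit/rollback [broker side]: EndTransactionProcessor.processRequest
    2.1. Committing the prepare message: commitMessage
    2.2. Rolling back the prepare message: rollbackMessage
  3. Transaction check-back logic
    3.1. Initializing the check-back service
    3.2. Check-back logic
      3.2.1. Check-back logic in TransactionalMessageService.check
      3.2.2. Check-back on the client
  4. Summary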

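This article is the last part of the transactional-message source walkthrough. It covers the second phase of RocketMQ's two-phase transaction: committing or rolling back the transaction, and the transaction check-back.

Message commit/rollback [client side]: endTransaction

At the end of the first article in this series we analyzed the logic of DefaultMQProducerImpl.endTransaction: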

[DefaultMQProducerImpl.endTransaction]
......
String transactionId = sendResult.getTransactionId();
// Look up the broker address
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
    case COMMIT_MESSAGE:
        requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
        break;
    case ROLLBACK_MESSAGE:
        requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
        break;
    case UNKNOW:
        requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
        break;
    default:
        break;
}
// Set the producer group
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// Set the queue offset
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
// Set the message id
requestHeader.setMsgId(sendResult.getMsgId());

String remark = localException != null ? 
        ("executeLocalTransactionBranch exception: " + localException.toString())
        : null;
// Send the end-transaction request
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());

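The main logic is:

  1. Look up the broker's IP and port from the message queue the message belongs to
  2. Build the end-transaction request header and set the transaction id and the commitLog offset
  3. Set the commit / rollback / no-op flag on the header according to the actual outcome of the local transaction
  4. Send the end-transaction request to the broker through the MQClientInstance held by this client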
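Step 3 can be illustrated with a minimal, self-contained sketch. The class and enum names below are hypothetical, and the constant values are an assumption that mirrors the bit layout of MessageSysFlag in RocketMQ 4.x (the transaction type occupies two bits of sysFlag); check them against your version before relying on them.

```java
// Hypothetical stand-in for the client-side mapping; not the RocketMQ classes themselves.
final class TxFlagSketch {
    enum LocalTxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Assumed to mirror MessageSysFlag: the transaction type lives in bits 2 and 3.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Maps the local transaction outcome to the flag carried in the request header.
    static int toCommitOrRollback(LocalTxState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                return TRANSACTION_NOT_TYPE;
        }
    }

    // Clears the two transaction bits of sysFlag, then sets the new type.
    static int resetTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }
}
```

For example, a prepared message whose sysFlag also has the lowest bit set (4 | 1) becomes 8 | 1 after resetTransactionValue with the commit type, so the other flag bits are preserved.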

Message commit/rollback [broker side]: EndTransactionProcessor.processRequest

The broker loads EndTransactionProcessor at startup to handle end-transaction requests from clients. The call chain is:

BrokerStartup.main(String[] args)
    |-BrokerStartup.start(BrokerController controller)
    |-BrokerStartup.createBrokerController(String[] args) returns BrokerController
        |-BrokerController.initialize()
            |-BrokerController.registerProcessor()
                |-RemotingServer.registerProcessor(
                        final int requestCode,                   (RequestCode.END_TRANSACTION,)
                        final NettyRequestProcessor processor,   (new EndTransactionProcessor(this))
                        final ExecutorService executor);         (this.endTransactionExecutor)

The core logic of EndTransactionProcessor is:

[EndTransactionProcessor.processRequest]
OperationResult result = new OperationResult();
// If the flag is the commit type
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // Commit the message
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // End the message transaction
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            // Delete the half message
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} 
// If the flag is the rollback type
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // Roll back the message
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Delete the half message
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
// Build the response
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;

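To summarize EndTransactionProcessor:

  1. If the flag in the request is the commit type, commit the transaction
    1. Call TransactionalMessageService.commitMessage to look up the half message to commit
    2. Call endMessageTransaction to restore the original topic and queue and rebuild the original message
    3. Call sendFinalMessage to persist the restored message to the commitLog again, which dispatches it to its real consume queue so that consumers can consume it
    4. Call deletePrepareMessage to delete the prepare message. Internally this writes an op message to RMQ_SYS_TRANS_OP_HALF_TOPIC, which marks the prepare message as processed and gives the check-back something to compare against
  2. If the flag is the rollback type, roll back the transaction
    1. Call TransactionalMessageService.rollbackMessage to look up the half message to roll back
    2. Call deletePrepareMessage to delete the prepare message in the same way as for commit: an op message is written to RMQ_SYS_TRANS_OP_HALF_TOPIC to mark the message as processed.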
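The flow above can be sketched with an in-memory model. This is only an illustration under simplifying assumptions: the class and its fields are hypothetical, a single offset identifies each half message (the real broker uses the commitLog offset for lookup and the queue offset in the op message), and plain collections stand in for the half topic, the op topic, and the real topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the broker-side commit/rollback handling.
final class EndTransactionSketch {
    static final int COMMIT = 8;    // assumed value of TRANSACTION_COMMIT_TYPE
    static final int ROLLBACK = 12; // assumed value of TRANSACTION_ROLLBACK_TYPE

    // Stand-in for RMQ_SYS_TRANS_HALF_TOPIC: half messages keyed by offset.
    final Map<Long, String> halfStore = new HashMap<>();
    // Stand-in for the real topic that consumers read.
    final List<String> realTopic = new ArrayList<>();
    // Stand-in for RMQ_SYS_TRANS_OP_HALF_TOPIC: offsets of processed half messages.
    final List<Long> opLog = new ArrayList<>();

    // Returns true when the request was applied, false when nothing was done.
    boolean process(int commitOrRollback, long offset) {
        if (commitOrRollback != COMMIT && commitOrRollback != ROLLBACK) {
            return false; // unknown state: leave the half message for the check-back
        }
        String half = halfStore.get(offset);
        if (half == null) {
            return false; // half message not found
        }
        if (commitOrRollback == COMMIT) {
            realTopic.add(half); // "sendFinalMessage": make it visible to consumers
        }
        opLog.add(offset); // "deletePrepareMessage": logical delete via an op record
        return true;
    }
}
```

Note that the half message is never removed from halfStore: in this model, as in the broker, deletion is logical and is expressed only by the op record.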

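Let's look at the sub-steps involved in this process:

Committing the prepare message: commitMessage

First comes commitMessage, which starts the commit by looking up the half message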

[TransactionalMessageServiceImpl.commitMessage]
@Override
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}

private OperationResult getHalfMessageByOffset(long commitLogOffset) {
    OperationResult response = new OperationResult();
    // Fetch the MessageExt by its physical offset commitLogOffset
    MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
    // Put the message into the OperationResult
    if (messageExt != null) {
        response.setPrepareMessage(messageExt);
        response.setResponseCode(ResponseCode.SUCCESS);
    } 
    ...handling of a null messageExt omitted...
    return response;
}

endMessageTransaction restores the original topic and queue and rebuilds the original message

[EndTransactionProcessor.endMessageTransaction()]
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
    // Create a new MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    // Restore the original topic from the properties
    msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    // Restore the original queue id from the properties
    msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    // Copy the message body and attributes
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
    msgInner.setWaitStoreMsgOK(false);
    msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    msgInner.setSysFlag(msgExt.getSysFlag());
    TopicFilterType topicFilterType =
        (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
            : TopicFilterType.SINGLE_TAG;
    long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
    msgInner.setTagsCode(tagsCodeValue);
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
    return msgInner;
}
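The essence of endMessageTransaction is that the real destination travels inside the half message's properties. A minimal sketch follows; the Msg class is hypothetical, and the property keys "REAL_TOPIC" and "REAL_QID" are an assumption about the values behind MessageConst.PROPERTY_REAL_TOPIC and MessageConst.PROPERTY_REAL_QUEUE_ID.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified message model; not the RocketMQ MessageExt.
final class RestoreTopicSketch {
    static final String REAL_TOPIC = "REAL_TOPIC"; // assumed property key
    static final String REAL_QID = "REAL_QID";     // assumed property key

    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();
    }

    // Builds a new message addressed to the original topic/queue and
    // drops the two routing properties that are no longer needed.
    static Msg restore(Msg half) {
        Msg out = new Msg();
        out.topic = half.properties.get(REAL_TOPIC);
        out.queueId = Integer.parseInt(half.properties.get(REAL_QID));
        out.properties.putAll(half.properties);
        out.properties.remove(REAL_TOPIC);
        out.properties.remove(REAL_QID);
        return out;
    }
}
```

The half message itself is left unchanged; only the copy is redirected, which matches the idea that the prepare message stays in the half topic until file cleanup.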

sendFinalMessage forwards the original message to its real consume queue so that consumers can consume it

[EndTransactionProcessor.sendFinalMessage]
final PutMessageResult putMessageResult =
        this.brokerController.getMessageStore().putMessage(msgInner);
...subsequent validation omitted...

DefaultMessageStore.putMessage finally persists the restored message to the commitLog again.

deletePrepareMessage deletes the prepare message (a logical, not a physical, delete)

[TransactionalMessageServiceImpl.deletePrepareMessage]
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
    if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
        log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", 
        msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
        return true;
    } else {
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", 
        msgExt.getMsgId(), msgExt.getQueueId());
        return false;
    }
}

The logical delete is implemented by transactionalMessageBridge.putOpMessage

[TransactionalMessageBridge.putOpMessage]
public boolean putOpMessage(MessageExt messageExt, String opType) {
    MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
        this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
    if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
        // Write an op message for this prepare message to RMQ_SYS_TRANS_OP_HALF_TOPIC
        return addRemoveTagInTransactionOp(messageExt, messageQueue);
    }
    return true;
}

addRemoveTagInTransactionOp writes an op message, whose body is the prepare message's queue offset, to RMQ_SYS_TRANS_OP_HALF_TOPIC

[TransactionalMessageBridge.addRemoveTagInTransactionOp]
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
    Message message = new Message(
        TransactionalMessageUtil.buildOpTopic(), 
        TransactionalMessageUtil.REMOVETAG,
        String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    writeOp(message, messageQueue);
    return true;
}

The op message is finally written by writeOp

private void writeOp(Message message, MessageQueue mq) {
    MessageQueue opQueue;
    ...opQueue lookup omitted...
    if (opQueue == null) {
        opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
    }
    putMessage(makeOpMessageInner(message, opQueue));
}

putMessage persists the op message to the commitLog under the topic RMQ_SYS_TRANS_OP_HALF_TOPIC. The topic name comes from TransactionalMessageUtil.buildOpTopic():

[TransactionalMessageUtil.buildOpTopic]
public static String buildOpTopic() {
    return MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC;
}

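This pattern is worth borrowing: a static method wraps the constant. That concludes the analysis of committing a prepare message.

The half message itself is eventually removed physically by the commitLog file deletion mechanism.

Rolling back the prepare message: rollbackMessage

Next, the rollback logic for the prepare message.

First, TransactionalMessageService.rollbackMessage is called to look up the half message to roll back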

[TransactionalMessageServiceImpl.rollbackMessage]
@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}

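The logic is the same as commitMessage: the MessageExt is fetched by its physical offset commitLogOffset. Once the message is found, deletePrepareMessage deletes the prepare message in the same way as for commit; that code was analyzed above and is not repeated here.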

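Transaction check-back logic

Finally, let's analyze an important flow in the second phase: the transaction check-back.

Initializing the check-back service

The check-back is implemented by the thread TransactionalMessageCheckService, which is initialized through the following call chain: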

BrokerStartup.main(String[] args)
    |-BrokerStartup.start(BrokerController controller)
    |-BrokerStartup.createBrokerController(String[] args) returns BrokerController
        |-BrokerController.initialize()
            |-BrokerController.initialTransaction();
                |-this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);

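Most of the broker's key asynchronous logic is set up through this call chain, including the registration of EndTransactionProcessor, the commit/rollback processor described above.

Once the broker has started, the check-back thread TransactionalMessageCheckService has been loaded as well.

Check-back logic

Let's look at the run method of TransactionalMessageCheckService, where the core check-back loop lives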

[TransactionalMessageCheckService.java]
@Override
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

checkInterval is the interval between check-back rounds; the default is 60 seconds:

[BrokerConfig.java]
@ImportantField
private long transactionCheckInterval = 60 * 1000;

The value can be changed by setting transactionCheckInterval in broker.conf; the unit is milliseconds.

Next, step into waitForRunning, which is defined in the parent class ServiceThread:

[ServiceThread.java]
protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }
    ......
}
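The wait-then-callback pattern can be sketched as follows. This is a minimal sketch, assuming the elided part of waitForRunning waits up to the given interval on a latch and then calls onWaitEnd; the class name is hypothetical, and a plain CountDownLatch is recreated on each wait, which is simpler (and less race-free) than a resettable latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified version of a periodic service thread's wait loop.
class WaitLoopSketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);
    int rounds = 0; // number of times onWaitEnd has run

    // Lets another thread trigger the next round without waiting for the interval.
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    // Runs onWaitEnd immediately if a wakeup is pending, otherwise after the interval.
    void waitForRunning(long intervalMillis) {
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd();
            return;
        }
        waitPoint = new CountDownLatch(1);
        try {
            waitPoint.await(intervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
            onWaitEnd();
        }
    }

    // Subclasses override this hook to do the periodic work.
    protected void onWaitEnd() {
        rounds++;
    }
}
```

Either way, onWaitEnd runs once per call to waitForRunning, which is why the loop in run only needs to call waitForRunning(checkInterval).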

waitForRunning calls onWaitEnd. TransactionalMessageCheckService overrides onWaitEnd from its parent class ServiceThread; let's look at the logic

[TransactionalMessageCheckService.java]
@Override
protected void onWaitEnd() {
    // transactionTimeOut: minimum age of a half message before it is first checked (6 seconds by default in BrokerConfig)
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
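    // Maximum number of checks: 15 by default.
    // If the transaction state still cannot be determined after that many checks, RocketMQ discards the message, which is equivalent to rolling back the transaction.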
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // Check-back logic
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

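From this we can conclude: a check-back round runs every 60 seconds by default; a half message becomes eligible for check-back once it is older than transactionTimeOut (6 seconds by default); and a message is checked at most 15 times, after which it is discarded, which is equivalent to rolling back the transaction.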

Check-back logic in TransactionalMessageService.check

[TransactionalMessageServiceImpl.check]
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
    log.warn("The queue of topic is empty :" + topic);
    return;
}

This first fetches all queues of the half-message topic RMQ_SYS_TRANS_HALF_TOPIC.

......
// Iterate over the queues
for (MessageQueue messageQueue : msgQueues) {
    long startTime = System.currentTimeMillis();
    MessageQueue opQueue = getOpQueue(messageQueue);
    // Consume offset of the half-message queue
    long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
    // Consume offset of the corresponding op queue (records of processed half messages)
    long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
    log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
    if (halfOffset < 0 || opOffset < 0) {
        log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
            halfOffset, opOffset);
        continue;
    }

    List<Long> doneOpOffset = new ArrayList<>();
    HashMap<Long, Long> removeMap = new HashMap<>();

    // Determine which half messages have already been removed
    PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
    if (null == pullResult) {
        log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
            messageQueue, halfOffset, opOffset);
        continue;
    }

fillOpRemoveMap determines whether half messages have already been removed; the logic is:

[TransactionalMessageServiceImpl.fillOpRemoveMap]
......
List<MessageExt> opMsg = pullResult.getMsgFoundList();
if (opMsg == null) {
    log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
    return pullResult;
}
for (MessageExt opMessageExt : opMsg) {
    Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
    if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
        if (queueOffset < miniOffset) {
            doneOpOffset.add(opMessageExt.getQueueOffset());
        } else {
            removeMap.put(queueOffset, opMessageExt.getQueueOffset());
        }
    } else {
        log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
    }
}
return pullResult;

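The half-message queue offset recorded in each op message (queueOffset) is compared with miniOffset, the current consume offset of the half-message queue.

If queueOffset < miniOffset, the half message was already passed in an earlier round, so the op message's own offset is added to doneOpOffset. If queueOffset >= miniOffset, the half message has already been committed or rolled back but the half-message queue's consume progress has not yet moved past it, so it is recorded in removeMap.

Back in the check method: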
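The classification done by fillOpRemoveMap can be sketched in isolation. The sketch below is a simplified illustration: the class name is hypothetical, and each op message is reduced to a pair of numbers (its own offset in the op queue, and the half-queue offset stored in its body).

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the op-message classification.
final class OpClassifySketch {
    // Each row of opMessages is {opQueueOffset, halfQueueOffsetFromBody}.
    static void classify(long[][] opMessages, long miniOffset,
                         Map<Long, Long> removeMap, List<Long> doneOpOffset) {
        for (long[] op : opMessages) {
            long opQueueOffset = op[0];
            long halfQueueOffset = op[1];
            if (halfQueueOffset < miniOffset) {
                // The half message is behind the half queue's progress: this op record is done.
                doneOpOffset.add(opQueueOffset);
            } else {
                // The half message is processed but not yet passed by the half queue's progress.
                removeMap.put(halfQueueOffset, opQueueOffset);
            }
        }
    }
}
```

With miniOffset = 5 and op records pointing at half offsets 3, 7 and 5, the first lands in doneOpOffset and the other two in removeMap, so the later loop can skip half offsets 5 and 7 without asking the client.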

while (true) {
        ......
    // If the half message has already been processed, advance the offset and continue
    if (removeMap.containsKey(i)) {
        log.info("Half offset {} has been committed/rolled back", i);
        removeMap.remove(i);
    } else {
        // Look up the half message
        GetResult getResult = getHalfMsg(messageQueue, i);
        MessageExt msgExt = getResult.getMsg();
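        ...handling of a missing half message omitted...
        // If the message exceeds the retention time (needSkip, 3 days by default) or the maximum check count (needDiscard, 15 by default),
        // discard it and move on to the next offset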
        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
            listener.resolveDiscardMsg(msgExt);
            newOffset = i + 1;
            i++;
            continue;
        }
        // Messages stored after this round started are not processed yet
        if (msgExt.getStoreTimestamp() >= startTime) {
            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                new Date(msgExt.getStoreTimestamp()));
            break;
        }

        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
        long checkImmunityTime = transactionTimeout;
        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
        if (null != checkImmunityTimeStr) {
            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
            // If the message is younger than its check immunity time, skip it and continue with the next one
            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                    newOffset = i + 1;
                    i++;
                    continue;
                }
            }
        } 

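The logic of this code is:

  1. If removeMap contains the offset, the message has already been processed and only the half-message queue's progress is stale; skip it
  2. If the message has been checked more than 15 times or has exceeded the retention time, hand it to resolveDiscardMsg and skip it
  3. If the message was stored after the current round started, leave it for a later round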
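Rule 2 can be sketched as two small predicates. This is a simplified illustration, not the broker's code: the class is hypothetical, the property key "TRANSACTION_CHECK_TIMES" is an assumption, and the counter handling is reduced to "count each check and discard once the maximum is reached".

```java
import java.util.Map;

// Hypothetical, simplified versions of the discard/skip rules.
final class SkipRulesSketch {
    static final String CHECK_TIMES = "TRANSACTION_CHECK_TIMES"; // assumed property key

    // Returns true once the message has been checked checkMax times;
    // otherwise records one more check in the message properties.
    static boolean needDiscard(Map<String, String> props, int checkMax) {
        int times = Integer.parseInt(props.getOrDefault(CHECK_TIMES, "0"));
        if (times >= checkMax) {
            return true;
        }
        props.put(CHECK_TIMES, String.valueOf(times + 1));
        return false;
    }

    // Returns true when the message is older than the file retention time.
    static boolean needSkip(long bornTimestamp, long now, long reservedHours) {
        return now - bornTimestamp > reservedHours * 3600L * 1000L;
    }
}
```

Storing the counter in the message properties matters because the half message is rewritten to the half topic on every check, so the count travels with the message rather than living in broker memory.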

            List<MessageExt> opMsg = pullResult.getMsgFoundList();
            boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                || (valueOfCurrentMinusBorn <= -1);
    
            if (isNeedCheck) {
                // Write the half message back to RMQ_SYS_TRANS_HALF_TOPIC
                if (!putBackHalfMsgQueue(msgExt, i)) {
                    continue;
                }
                // Send the check-back request to a client in the producerGroup
                listener.resolveHalfMsg(msgExt);
            } else {
                pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                    messageQueue, pullResult);
                continue;
            }
        }
        newOffset = i + 1;
        i++;
    }
    // Save the consume progress of the half-message queue
    if (newOffset != halfOffset) {
        transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
    }
    

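When isNeedCheck is true, the half message is first written back to RMQ_SYS_TRANS_HALF_TOPIC, so that it can be checked again in a later round if no result arrives, and the check-back request is then sent to the client. Otherwise, more op messages are pulled to see whether the half message has already been committed or rolled back. Finally, the consume progress of the half-message queue and of the processed (op) messages is saved.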
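The isNeedCheck condition is easier to read when pulled out into a pure function. The sketch below restates the three clauses from the listing; the class and parameter names are hypothetical, and lastOpBornTimestamp is null when no op messages were pulled.

```java
// Hypothetical restatement of the isNeedCheck condition as a pure function.
final class NeedCheckSketch {
    static boolean isNeedCheck(Long lastOpBornTimestamp, long startTime,
                               long valueOfCurrentMinusBorn, long checkImmunityTime,
                               long transactionTimeout) {
        // No op messages at all, and the half message is past its immunity time.
        boolean noOpAndExpired =
            lastOpBornTimestamp == null && valueOfCurrentMinusBorn > checkImmunityTime;
        // The newest pulled op message was born more than transactionTimeout after this round started.
        boolean opBeyondRound =
            lastOpBornTimestamp != null && (lastOpBornTimestamp - startTime > transactionTimeout);
        // Born timestamp lies in the future (clock skew between client and broker).
        boolean bornInFuture = valueOfCurrentMinusBorn <= -1;
        return noOpAndExpired || opBeyondRound || bornInFuture;
    }
}
```

For instance, with no op messages, an immunity time of 6000 ms and a half message that is 7000 ms old, the function returns true and the client would be asked for the transaction state.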

Check-back on the client

On the client, ClientRemotingProcessor answers the check-back request in its checkTransactionState method

[ClientRemotingProcessor.java]
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    // Decode the request header
    final CheckTransactionStateRequestHeader requestHeader =
        (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
    if (messageExt != null) {
        if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
            messageExt.setTopic(NamespaceUtil
                .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
        }
        String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (null != transactionId && !"".equals(transactionId)) {
            messageExt.setTransactionId(transactionId);
        }
        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (group != null) {
            // Select a transactional producer instance for the group
            MQProducerInner producer = this.mqClientFactory.selectProducer(group);
            if (producer != null) {
                final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                // Run the transaction check
                producer.checkTransactionState(addr, messageExt, requestHeader);
            } else {
                log.debug("checkTransactionState, pick producer by group[{}] failed", group);
            }
        ...exception logging omitted...
    return null;
}

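The client selects a producer instance to perform the actual check, through producer.checkTransactionState(addr, messageExt, requestHeader)

The check runs asynchronously on a client-side thread and calls back the listener registered by the application: checkLocalTransactionState on the legacy TransactionCheckListener, or checkLocalTransaction on TransactionListener.

// Build a check task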
Runnable request = new Runnable() {
    private final String brokerAddr = addr;
    private final MessageExt message = msg;
    private final CheckTransactionStateRequestHeader checkRequestHeader = header;
    private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

    @Override
    public void run() {

First, obtain the transaction listeners configured on the client

TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable exception = null;
    try {
        if (transactionCheckListener != null) {

The check itself is performed through transactionCheckListener.checkLocalTransactionState(message)

        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
    } else if (transactionListener != null) {
        log.debug("Used new check API in transaction message");
        localTransactionState = transactionListener.checkLocalTransaction(message);
    } else {
        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
    }
} catch (Throwable e) {
    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
    exception = e;
}

The local transaction result obtained from the check is then sent to the broker so that it can commit or roll back the half message

        this.processTransactionState(
            localTransactionState,
            group,
            exception);
    } else {
        log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
    }
}

processTransactionState submits the check result to the broker

private void processTransactionState(
    // Local transaction state
    final LocalTransactionState localTransactionState,
    final String producerGroup,
    final Throwable exception) {

    final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
    thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
    thisHeader.setProducerGroup(producerGroup);
    thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
    thisHeader.setFromTransactionCheck(true);

    String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    if (uniqueKey == null) {
        uniqueKey = message.getMsgId();
    }
    thisHeader.setMsgId(uniqueKey);
    thisHeader.setTransactionId(checkRequestHeader.getTransactionId());

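An EndTransactionRequestHeader is constructed, and its commit/rollback flag (a MessageSysFlag value) is set from the local transaction result returned by the check. This matches how the local transaction result is handled right after the half message is sent.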

switch (localTransactionState) {
    case COMMIT_MESSAGE:
        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
        break;
    case ROLLBACK_MESSAGE:
        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
        break;
    case UNKNOW:
        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
        break;
    default:
        break;
}

String remark = null;
if (exception != null) {
    remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}

try {

endTransactionOneway sends the check result to the broker; the logic is the same as at the beginning of this article.

            DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                3000);
        } catch (Exception e) {
            log.error("endTransactionOneway exception", e);
        }
    }
};

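The check request itself is submitted to the check-back thread pool configured on the client, as this line shows.

// Submit the check request to the transaction check thread pool
this.checkExecutor.submit(request);

As shown, the check task is submitted to the transaction check thread pool of TransactionMQProducer and eventually calls the checkLocalTransaction method of the TransactionListener implemented by the application, which returns the actual transaction state.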
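On the application side, a common way to answer a check-back is to look up the outcome that the local transaction recorded. The sketch below is self-contained and does not use the RocketMQ API: the class, the State enum, and the in-memory table are hypothetical stand-ins for a TransactionListener implementation, LocalTransactionState, and a durable transaction log.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical application-side check handler backed by an in-memory outcome table.
final class CheckListenerSketch {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Transaction id -> true if the local transaction committed, false if it rolled back.
    private final Map<String, Boolean> localTxLog = new ConcurrentHashMap<>();

    // Called by the business code when the local transaction finishes.
    void recordOutcome(String transactionId, boolean committed) {
        localTxLog.put(transactionId, committed);
    }

    // Called when the broker asks for the state of a half message.
    State checkLocalTransaction(String transactionId) {
        Boolean committed = localTxLog.get(transactionId);
        if (committed == null) {
            return State.UNKNOW; // outcome not known yet: the broker will ask again later
        }
        return committed ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE;
    }
}
```

In a real service the table would normally be a database record written in the same local transaction as the business data, so that the answer survives a producer restart.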

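Summary

To close, let's summarize the idea behind RocketMQ's transactional messages:

RocketMQ's transactional messages are based on two-phase commit, combined with a transaction state check-back mechanism.

Two-phase commit: the producer first sends a prepare (half) message and then, depending on whether the local transaction commits or rolls back, sends a commit or rollback command for the half message to the broker.

Check-back: the broker runs a scheduled task, every minute by default, that compares the prepare message queue (topic = RMQ_SYS_TRANS_HALF_TOPIC) with the op queue (topic = RMQ_SYS_TRANS_OP_HALF_TOPIC, which records messages that were already committed or rolled back). Prepare messages that still need a result are sent to the client for check-back, and the half message is finally committed or rolled back according to the result.

That concludes the analysis of commit/rollback and check-back, and with it the source walkthrough of transactional messages.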



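Copyright notice:

This is original work; please do not plagiarize it. Unless otherwise noted, all posts on this blog are original. When reposting, please link back to this article.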
