文章目录
  1. 1. 事务消息正式发送阶段
    1. 1.1. broker存储事务消息
      1. 1.1.1. prepareMessage
  2. 2. 小结

我们接着对RocketMQ的事务消息的存储阶段源码进行解析。

事务消息正式发送阶段

首先接着上文,介绍一下事务消息正式发送阶段。

在DefaultMQProducerlmpl.sendKernelImpl方法中设置消息类型为事务消息:

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

如果消息类型的确是事务消息,则设置sysFlag为事务消息标识== 0x1 << 2。方便broker对消息进行识别。

broker存储事务消息

broker端收到消息后,对消息进行处理,如果消息类型为事务半消息 (prepare消息)则执行半消息存储方法prepareMessage,否则按照普通消息进行处理(普通消息存储执行putMessage方法)。

具体逻辑如下:

[SendMessagePocessor.sendMessage]
// 解码消息发送请求头中属性为Map
Map<String, String> oriProps = MessageDecoder
    .string2messageProperties(requestHeader.getProperties());
// 获取属性PROPERTY_TRANSACTION_PREPARED(TRAN_MSG)的值,这个属性在客户端进行事务消息发送时设置
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 如果traFlag不为空且true,说明是事务消息

if (traFlag != null && Boolean.parseBoolean(traFlag)) {
    // 如果broker不允许接受事务消息则响应“broker拒绝接受事务消息”,默认为允许接受
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    // 执行事务消息存储
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    // 执行普通消息存储
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}   

总结一下:

  1. 对请求头requestHeader的属性值解码为Map,读取其中的事务消息属性,判断是否为true
  2. 如果是事务消息则判断broker是否能够接受事务消息。

broker可以通过配置属性rejectTransactionMessage为true/false来决定是否能够接受事务消息请求,默认为false即允许接受事务消息。

prepareMessage

我们进入prepareMessage方法查询具体的事务消息存储逻辑。

[TransactionalMessageServiceImpl.prepareMessage]
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.putHalfMessage(messageInner);
}

继续查看transactionalMessageBridge.putHalfMessage(messageInner);

MessageExtBrokerInner对象为将请求RemotingCommand转换后的broker端对消息的封装实体。

[TransactionalMessageBridge.putHalfMessage]
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 备份消息原主题
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    // 备份消息原队列id
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // 重置sysFlag值为
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // 设置topic为RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // 设置队列id为0
    msgInner.setQueueId(0);
    // 转存消息属性Map为字符串形式
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

这里是RocektMQ对事务消息处理过程的一个巧妙之处。

RocketMQ对事务消息进行了主题更换操作,备份了原先的topic、队列id之后,将事务消息的topic统一更换为 RMQ_SYS_TRANS_HALF_TOPIC,队列id统一更换为0。

通过store.putMessage(parseHalfMessageInner(messageInner)); 对消息进行了存储,这里可以看到,对事务消息进行真正的存储的时候是按照普通消息进行的。但此时topic及队列id已经更换为事务消息的topic及队列id。

通过这个操作,使得事务消息在提交之前不会被消费者消费到。

RocektMQ会通过定时任务起线程去消费该事务topic下的消息,当消息满足提交条件,则将该消息的主题和队列id进行恢复(之前已经备份过),最终会被消息的消费者消费到,这个思路在定时消息的实现上也用到了。

我们能够发现,事务消息最终落盘其实还是按照普通消息的方式落盘,区别只是对topic和队列id进行了变换,以便该事务消息在提交之前不会被消费者消费到,借此保证消息的提交与回滚与本地事务的提交与回滚是同时成功、同时失败的。

关于消息的持久化(消息落盘)的具体过程我们在后续的消息存储源码分析中会专门说到,此处就简单的看一下,不进行展开了。

消息持久化是RocketMQ的store模块实现,具体的代码段如下:

[DefaultMessageStore.java]
PutMessageResult result = this.commitLog.putMessage(msg);

最终是通过commitLog.putMessage(msg)实现了消息的最终持久化,我们后续会详细分析。

小结

本文我们对事务消息如何发送到broker,及broker如何对事务消息进行预处理并落盘的主要过程进行了分析。

事务消息发送的第一阶段就分析完毕了;事务消息系列的下一篇文章,我们将对事务消息的第二阶段:事务消息提交/回滚以及事务消息回查过程进行解析,我们下文见。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 事务消息正式发送阶段
    1. 1.1. broker存储事务消息
      1. 1.1.1. prepareMessage
  2. 2. 小结
Fork me on GitHub