跟我学RocketMQ之事务消息存储源码解析
我们接着对RocketMQ的事务消息的存储阶段源码进行解析。
事务消息正式发送阶段
首先接着上文,介绍一下事务消息正式发送阶段。
在DefaultMQProducerlmpl.sendKernelImpl方法中设置消息类型为事务消息:
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
如果消息类型的确是事务消息,则设置sysFlag为事务消息标识== 0x1 << 2。方便broker对消息进行识别。
broker存储事务消息
broker端收到消息后,对消息进行处理,如果消息类型为事务半消息 (prepare消息)则执行半消息存储方法prepareMessage,否则按照普通消息进行处理(普通消息存储执行putMessage方法)。
具体逻辑如下:
[SendMessagePocessor.sendMessage]
// 解码消息发送请求头中属性为Map
Map<String, String> oriProps = MessageDecoder
.string2messageProperties(requestHeader.getProperties());
// 获取属性PROPERTY_TRANSACTION_PREPARED(TRAN_MSG)的值,这个属性在客户端进行事务消息发送时设置
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 如果traFlag不为空且true,说明是事务消息
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
// 如果broker不允许接受事务消息则响应“broker拒绝接受事务消息”,默认为允许接受
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
// 执行事务消息存储
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
// 执行普通消息存储
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
总结一下:
- 对请求头requestHeader的属性值解码为Map,读取其中的事务消息属性,判断是否为true
- 如果是事务消息则判断broker是否能够接受事务消息。
broker可以通过配置属性rejectTransactionMessage为true/false来决定是否能够接受事务消息请求,默认为false即允许接受事务消息。
prepareMessage
我们进入prepareMessage方法查询具体的事务消息存储逻辑。
[TransactionalMessageServiceImpl.prepareMessage]
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}
继续查看transactionalMessageBridge.putHalfMessage(messageInner);
MessageExtBrokerInner对象为将请求RemotingCommand转换后的broker端对消息的封装实体。
[TransactionalMessageBridge.putHalfMessage]
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 备份消息原主题
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
// 备份消息原队列id
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 重置sysFlag值为
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 设置topic为RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// 设置队列id为0
msgInner.setQueueId(0);
// 转存消息属性Map为字符串形式
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
这里是RocektMQ对事务消息处理过程的一个巧妙之处。
RocketMQ对事务消息进行了主题更换操作,备份了原先的topic、队列id之后,将事务消息的topic统一更换为 RMQ_SYS_TRANS_HALF_TOPIC,队列id统一更换为0。
通过store.putMessage(parseHalfMessageInner(messageInner)); 对消息进行了存储,这里可以看到,对事务消息进行真正的存储的时候是按照普通消息进行的。但此时topic及队列id已经更换为事务消息的topic及队列id。
通过这个操作,使得事务消息在提交之前不会被消费者消费到。
RocektMQ会通过定时任务起线程去消费该事务topic下的消息,当消息满足提交条件,则将该消息的主题和队列id进行恢复(之前已经备份过),最终会被消息的消费者消费到,这个思路在定时消息的实现上也用到了。
我们能够发现,事务消息最终落盘其实还是按照普通消息的方式落盘,区别只是对topic和队列id进行了变换,以便该事务消息在提交之前不会被消费者消费到,借此保证消息的提交与回滚与本地事务的提交与回滚是同时成功、同时失败的。
关于消息的持久化(消息落盘)的具体过程我们在后续的消息存储源码分析中会专门说到,此处就简单的看一下,不进行展开了。
消息持久化是RocketMQ的store模块实现,具体的代码段如下:
[DefaultMessageStore.java]
PutMessageResult result = this.commitLog.putMessage(msg);
最终是通过commitLog.putMessage(msg)实现了消息的最终持久化,我们后续会详细分析。
小结
本文我们对事务消息如何发送到broker,及broker如何对事务消息进行预处理并落盘的主要过程进行了分析。
事务消息发送的第一阶段就分析完毕了;事务消息系列的下一篇文章,我们将对事务消息的第二阶段:事务消息提交/回滚以及事务消息回查过程进行解析,我们下文见。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。