跟我学RocektMQ之事务消息提交及回查源码解析
本文进入了事务消息源码解析的最后部分,该部分是RocketMQ二阶段事务的第二阶段,即:提交/回滚事务以及事务回查。
消息提交/回滚[客户端逻辑]:endTransaction
在事务消息源码解析的第一篇末尾,我们分析了DefaultMQProducerImpl.endTransaction的逻辑:
[DefaultMQProducerImpl.endTransaction]
......
String transactionId = sendResult.getTransactionId();
// 获取broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
// 设置生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// 设置队列offset
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
// 设置消息id
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ?
("executeLocalTransactionBranch exception: " + localException.toString())
: null;
// 发送事务结束请求
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
代码的主要逻辑概括如下:
- 根据消息丛书的消息队列获取broker的ip及端口信息
- 构造事务结束请求头,设置事务id以及commit的offset
- 根据本地事务实际的执行状况,为事务结束请求头设置 提交/回滚/什么都不做 标记
- 通过当前客户端持有的MQClientInstance发送事务结束请求到broker
消息提交/回滚[服务端逻辑]:EndTransactionProcessor.processRequest
broker在启动时会加载EndTransactionProcessor,处理客户端发送的事务结束请求;调用链如下:
BrokerStartup.main(String[] args)
|-BrokerStartup.start(BrokerController controller)
|-BrokerStartup.createBrokerController(String[] args) 返回BrokerController
|-BrokerController.initialize()
|-BrokerController.registerProcessor()
|-RemotingServer.registerProcessor(
final int requestCode, (RequestCode.END_TRANSACTION,)
final NettyRequestProcessor processor, (new EndTransactionProcessor(this))
final ExecutorService executor); (this.endTransactionExecutor)
EndTransactionProcessor的核心逻辑如下:
[EndTransactionProcessor.processRequest]
OperationResult result = new OperationResult();
// 如果MessageSysFlag为事务提交类型
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 执行消息提交
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 结束消息事务
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
RemotingCommand sendResult = sendFinalMessage(msgInner);
// 删除半消息
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
}
// 如果MessageSysFlag为事务回滚类型
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 执行消息回滚
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 删除半消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
// 组装返回体
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
总结一下EndTransactionProcessor逻辑:
- 判断MessageSysFlag,如果MessageSysFlag为事务提交类型,则执行事务提交操作
- 执行TransactionalMessageService.commitMessage进行半消息提交操作
- 执行endMessageTransaction,恢复消息原主题,原队列,恢复原消息
- 重新对消息进行持久化,存储到commitLog中,执行sendFinalMessage将原消息转发至实际的消息消费队列中,以便消费者进行消费
- 执行deletePrepareMessage方法删除prepare消息,内部实现为将prepare消息转储到RMQ_SYS_TRANS_OP_HALF TOPIC 主题中;标识该消息已被处理,为事务回查提供依据
- 如果MessageSysFlag为事务回滚类型,则执行事务回滚操作
- 执行TransactionalMessageService.rollbackMessage进行半消息回滚操作
- 通过deletePrepareMessage将prepare半消息进行删除,实现方式与事务提交相同,将prepare消息转储到RMQ_SYS_TRANS_OP_HALF TOPIC 主题中,标识该消息已被处理。
我们具体分析一下这个过程中涉及到的子流程:
prepare消息提交:commitMessage
首先是commitMessage,执行事务提交操作
[TransactionalMessageServiceImpl.commitMessage]
@Override
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
private OperationResult getHalfMessageByOffset(long commitLogOffset) {
OperationResult response = new OperationResult();
// 根据消息的物理偏移commitLogOffset获取消息MessageExt
MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
// 将消息设置到OperationResult返回体中
if (messageExt != null) {
response.setPrepareMessage(messageExt);
response.setResponseCode(ResponseCode.SUCCESS);
}
...省略messageExt为空的逻辑...
return response;
}
endMessageTransaction,恢复消息原主题,原队列,恢复原消息
[EndTransactionProcessor.endMessageTransaction()]
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
// 初始化新的消息实体MessageExtBrokerInner
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
// 从属性中恢复消息的原topic
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
// 从属性中恢复消息的原队列id
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
// 复制消息体,消息属性
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
msgInner.setWaitStoreMsgOK(false);
msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
msgInner.setSysFlag(msgExt.getSysFlag());
TopicFilterType topicFilterType =
(msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
: TopicFilterType.SINGLE_TAG;
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
return msgInner;
}
sendFinalMessage,将原消息转发至实际的消息消费队列中,以便消费者进行消费
[EndTransactionProcessor.sendFinalMessage]
final PutMessageResult putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
...省略后续校验逻辑...
最终通过DefaultMessageStore.putMessage将恢复后的原消息再次持久化到commitLog中。
deletePrepareMessage,删除prepare消息,非物理删除
[TransactionalMessageServiceImpl.deletePrepareMessage]
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}",
msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
return true;
} else {
log.error("Transaction op message write failed. messageId is {}, queueId is {}",
msgExt.getMsgId(), msgExt.getQueueId());
return false;
}
}
可以看到是通过transactionalMessageBridge.putOpMessage实现的逻辑删除
[TransactionalMessageBridge.putOpMessage]
public boolean putOpMessage(MessageExt messageExt, String opType) {
MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
// 将该prepare消息存储到RMQ_SYS_TRANS_OP_HALF TOPIC 主题中
return addRemoveTagInTransactionOp(messageExt, messageQueue);
}
return true;
}
通过addRemoveTagInTransactionOp将prepare消息存储到RMQ_SYS_TRANS_OP_HALF TOPIC 主题
[TransactionalMessageBridge.addRemoveTagInTransactionOp]
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
Message message = new Message(
TransactionalMessageUtil.buildOpTopic(),
TransactionalMessageUtil.REMOVETAG,
String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
writeOp(message, messageQueue);
return true;
}
可以看到最终是通过writeOp实现的消息转储
private void writeOp(Message message, MessageQueue mq) {
MessageQueue opQueue;
...省略opQueue校验过程...
if (opQueue == null) {
opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
}
putMessage(makeOpMessageInner(message, opQueue));
}
通过putMessage将prepare消息持久化到commiLog。topic为RMQ_SYS_TRANS_OP_HALF_TOPIC;我们看一下topic的创建方法TransactionalMessageUtil.buildOpTopic()
[TransactionalMessageUtil.buildOpTopic]
public static String buildOpTopic() {
return MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC;
}
这里的写法可以参考,即通过一个静态方法将常量进行封装。到此我们对prepare消息提交的分析就告一段落。
最终半消息的删除已依靠文件删除机制实现的。
prepare消息回滚:rollbackMessage
接着看一下prepare消息的回滚逻辑。
首先执行TransactionalMessageService.rollbackMessage进行半消息回滚操作
[TransactionalMessageServiceImpl.rollbackMessage]
@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
这里的逻辑同commitMessage相同,同样是根据消息的物理偏移commitLogOffset获取消息MessageExt;获取到消息之后执行deletePrepareMessage将prepare消息删除。实现方式与事务提交相同,这部分代码上文已经分析过就不再重复。
事务消息回查逻辑
我们最后分析一下二阶段中重要的一个流程,事务回查的实现。
事务回查service初始化
事务回查实现是通过线程TransactionalMessageCheckService实现的,它的初始化调用链如下:
BrokerStartup.main(String[] args)
|-BrokerStartup.start(BrokerController controller)
|-BrokerStartup.createBrokerController(String[] args) 返回BrokerController
|-BrokerController.initialize()
|-BrokerController.initialTransaction();
|-this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
RocketMQ的broker关键的异步逻辑基本上都是通过该调用链实现的,包括但不限于上文中提到的事务提交/回滚处理器EndTransactionProcessor的注册过程。
在broker启动完成之后事务回查线程TransactionalMessageCheckService也随之加载完毕。
事务回查逻辑
我们查看一下回查线程TransactionalMessageCheckService的run方法,核心的回查逻辑就在该方法中
[TransactionalMessageCheckService.java]
@Override
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
checkInterval为回查任务的间隔时间,默认为60秒,
[BrokerConfig.java]
@ImportantField
private long transactionCheckInterval = 60 * 1000;
checkInterval的值可通过在broker.conf文件中配置transactionChecklnterval来改变,单位为毫秒。
我们继续进入 waitForRunning方法中
[TransactionalMessageCheckService.java]
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
......
}
进入onWaitEnd方法中;TransactionalMessageCheckService重写了父类ServiceThread的onWaitEnd方法,我们分析一下具体逻辑
[TransactionalMessageCheckService.java]
@Override
protected void onWaitEnd() {
// TransactionTimeOut默认值为5秒
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// 回查最大次数为15次;
// 如果超过最大检测次数还是无法获得事务状态,RocketMQ将直接丢弃该消息即相当于回滚事务。
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
// 回查逻辑
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
这里我们可以得出,事务回查操作周期默认为60s一次,每次执行的超时时间为5秒;最大回查次数为15次,超过最大回查次数则丢弃消息,相当有对事务进行了回滚。
回查逻辑TransactionalMessageService.check
[TransactionalMessageServiceImpl.check]
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
上述逻辑首先获取了RMQ_SYS_TRANS_HALF_TOPIC半消息中的所有队列。
......
// 迭代队列
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
// 获取半消息队列的消费偏移量
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// 获取op队列已经删除消费队列的偏移量
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
// 确认消息是否删除
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
根据方法fillOpRemoveMap确认半消息是否已经被删除,具体逻辑如下:
[TransactionalMessageServiceImpl.fillOpRemoveMap]
......
List<MessageExt> opMsg = pullResult.getMsgFoundList();
if (opMsg == null) {
log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
return pullResult;
}
for (MessageExt opMessageExt : opMsg) {
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
if (queueOffset < miniOffset) {
doneOpOffset.add(opMessageExt.getQueueOffset());
} else {
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
}
} else {
log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
}
}
return pullResult;
比较半消息消费队列中的最大偏移量miniOffset 与 删除消费队列的消息偏移量queueOffset;
如果queueOffset >= miniOffset,说明半消息已经删除过了,但是半消息还没有更新,将半消息存放在removeMap中。
我们继续回到check方法中
while (true) {
......
// 如果半消息已经被处理过,偏移量继续递增,往后处理
if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
removeMap.remove(i);
} else {
// 查找半消息
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
...省略半消息不存在处理逻辑...
// 如果超过存储时间needSkip(默认3天)或者 超过回查次数needDiscard(默认15次)
// 继续往后执行
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
// 消息存储时间大于开始时间的不处理
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
// 如果存储时间小于需要进行回查的时间,跳过继续下一个
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
}
这段代码的逻辑总结如下:
- 判断removeMap中包含该消息,表明消息已经被处理过,只是半消息队列未更新;跳过这个消息不进行处理
- 如果消息回查次数大于15次或者消息已经超过了存储时间则不对消息进行处理
消息存储时间大于目前的回查程序开始时间的暂时不处理,等待后续进行回查
List<MessageExt> opMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1); if (isNeedCheck) { // 将半消息重新存储在topic--RMQ_SYS_TRANS_HALF_TOPIC中 if (!putBackHalfMsgQueue(msgExt, i)) { continue; } // 发给客户端选择一个producerGroup进行回查 listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } } newOffset = i + 1; i++; } // 保存op消费进度 if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); }
这段逻辑对本次回查未能获取结果的消息重新存储到RMQ_SYS_TRANS_HALF_TOPIC中,等待下次回查的执行;否则发给客户端进行回查,最终保存已处理消息与半消息的消费进度。
客户端进行事务回查
客户端ClientRemotingProcessor通过checkTransactionState方法响应事务回查请求
[ClientRemotingProcessor.java]
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
// 请求头解码
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
messageExt.setTopic(NamespaceUtil
.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
}
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
// 选择一个事务消息生产者实例
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 执行事务回查
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
...省略异常日志打印...
return null;
}
客户单选择一个生产者实例发起真正的事务回查操作,通过producer.checkTransactionState(addr, messageExt, requestHeader)执行回查
回查是在客户端中起线程异步执行的,通过异步回调客户端TransactionListener的checkLocalTransactionState方法实现。
// 构造一个回查任务
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
首先获取客户端设置的事务监听器
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
通过transactionCheckListener.checkLocalTransactionState(message);执行回查操作
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
将回查得到的本地事务执行结果发送给broker,以便broker对半消息进行回滚/提交等操作
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
通过processTransactionState方法将事务回查的结果提交给broker
private void processTransactionState(
// 本地事务执行状态
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
构造事务结束请求实体EndTransactionRequestHeader,根据回查得到的本地事务执行结果,设置具体的消息执行状态MessageSysFlag。这里与半消息发送阶段对本地事务的处理是一致的。
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
try {
通过endTransactionOneway将事务回查状态发送给broker,具体的逻辑与本文开头一致。
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
实际上回查的请求就是通过客户端设置的回查线程池提交的,这句代码可见端倪。
// 提交回查请求到事务回查线程池
this.checkExecutor.submit(request);
可以看到,回查任务最终提交到了TransactionMQProducer的事务回查线程池中执行,最终调用了应用程序实现的
TransactionListener 的checkLoca!Transaction 方法,根据执行结果返回真实的事务状态。
小结
我们最后再总结一下RocketMQ事务消息的实现思想:
RocketMQ的事务消息是基于两阶段提交思想,并配合事务状态回查机制实现的。
两阶段提交部分:首先发送prepare半消息,根据本地事务执行的提交或者回滚发送半消息commit/rollback命令给broker;
broker端通过定时任务,默认以1分钟为回查频率,对Prepare消息存储队列(topic=RMQ__SYS_TRANS _HALF_TOPIC)及半消息处理队列(topic=RMQ_SYS_TRANS_OP_HALF_TOPIC存储已经提交或者回滚的消息)中的消息进行比较,对需要进行回查的prepare消息发送给客户端进行回查;根据回查结果最终决定对半消息进行commit/rollback操作。
到此事务消息的提交/回滚及回查的解析就告一段落,事务消息部分的源码解析就到此结束。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。