跟我学RocketMQ之事务消息发送源码解析
接下来我将通过一个小系列对RocketMQ事务消息进行一次较为全面的源码解析,本文主要对事务消息发送进行重点分析。
RocketMQ从4.3.0版本重新开源了事务消息,通过基于两阶段提交方式+定时回查机制,为分布式事务问题提供了新的解决方案。
事务消息流程概述
为了便于读者朋友理解,我们先对事务消息的流程进行概述。
- 应用程序事务发起者发送事务半消息到MQ的broker端,发送成功后,服务端会同步回调事务监听器的本地事务执行方法执行本地事务
- RocketMQ的broker收到半消息后,先对消息的topic与消费队列id进行备份,然后存储到主题为 RMQ_SYS_TRANS_HALF_TOPIC 的队列中
- broker端开启一个定时任务,取出RMQ_SYS_TRANS_HALF_TOPIC中的消息向消息的发送者发起回查。发送端根据本地事务的具体执行状态返回 提交/回滚/事务未知 状态
- 如果返回 提交/回滚 则broker对事务消息进行提交或者回滚,如果返回了未知,则等待下次继续进行回查。消息的回查间隔与回查次数是可以进行配置的
- 对于达到回查次数依旧无法获取事务状态的消息,broker会对该事务消息做回滚操作
通过上述的步骤,能够保证事务消息发起者的本地事务与消息发送同时成功,同时失败。即:
当事务发起者本地事务提交,消息才会提交并能够被事务消息消费者消费到。如果事务发起者本地事务回滚,则事务消息也会回滚。保证了事务消息的发送与事务消息发送方本地事务是原子的。
而回查机制保证了对于异常/未知情况,能够最大努力得保证消息发送与事务消息发送方本地事务的原子性。
对于事务消息的消费者,只需要通过重试机制就能够与事务消息发送方达到分布式事务的最终一致性。因此事务消息本质上也属于 柔性事务 的一种具体实现方式。
事务消息发送
首先看一段事务消息发送的代码样例,取自官方demo。
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService =
new ThreadPoolExecutor(2, 5,
100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
可以看到涉及到的主要类有:
- TransactionMQProducer – 事务消息生产者,主要实现事务消息发送
- TransactionListener – 事务监听器,主要实现本地事务执行及事务状态回查
TransactionMQProducer初始化
TransactionMQProducer类声明如下:
public class TransactionMQProducer extends DefaultMQProducer {
可以看到TransactionMQProducer继承自DefaultMQProducer,我们之前解析的DefaultMQProducer的特性,TransactionMQProducer都有。
上面的demo中,通过
producer.setExecutorService(executorService);
设置了线程池executorService,它的作用在于提供异步执行事务状态回查能力。
通过下面的代码将本地事务执行逻辑、回查逻辑的实现设置给了TransactionMQProducer。
producer.setTransactionListener(transactionListener);
通过 producer.start(); 就启动了事务消息发送者的实例。start()方法与DefaultMQProducer的start()方法有所区别:
TransactionMQProducer.start()
[TransactionMQProducer.java]
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
在真正启动客户端实例之前,通过initTransactionEnv()方法进行了事务消息相关的配置,具体逻辑如下
[DefaultMQProducerImpl.java]
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}
这里对事务消息回查线程池checkExecutor进行了赋值。如果调用者设置了自定义的checkExecutor线程池实现,则使用客户端的实例;否则实例化一个ThreadPoolExecutor,设置回查任务阻塞队列大小为checkRequestHoldMax,默认值为2000。
事务消息发送的方法为 sendMessageInTransaction,我们从这里入手开始进行解析。
sendMessageInTransaction
[TransactionMQProducer.java]
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
// 执行事务消息发送
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
在进行事务消息发送之前,校验transactionListener是否为null,如果为null则抛出异常。否则调用defaultMQProducerImpl的sendMessageInTransaction执行事务消息发送。
我们对defaultMQProducerImpl的sendMessageInTransaction方法做详细的分析。
defaultMQProducerImpl.sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter,
final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
校验transactionListener或者localTransactionExecuter实例是否存在,如果两者都为null,则抛出异常。
// 校验Message,对topic、body是否为null及body长度进行校验
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// prepare消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// 设置生产者组。用于回查本地事务,从生产者组中选择随机选择一个生产者即可
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
这里对消息添加事务消息属性, 添加PROPERTY_TRANSACTION_PREPARED(值为:TRAN_MSG)=true标识消息为事务消息;
添加PROPERTY_PRODUCER_GROUP(值为:PGROUP)=调用方设置的生产者组,代表事务消息所属消息生产者组。
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
发送事务消息,如果异常则进行抛出。具体的消息发送过程在之前的源码解析文章中已经有所涉及,这里就不再展开。读者可以前往 跟我学RocketMQ之消息发送源码解析 自行回顾。
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// 根据消息发送结果中的sendStatus属性选择对应的处理逻辑
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
// 设置事务消息id
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
// 回调localTransactionExecuter执行本地事务
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
// 回调transactionListener的executeLocalTransaction执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
// 本地事务执行状态未知
localTransactionState = LocalTransactionState.UNKNOW;
}
// 对于本地事务执行状态非 COMMIT_MESSAGE的情况进行日志打印
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
// 对非SEND_OK的情况,回滚事务消息。
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
根据消息发送结果执行对应的处理逻辑。
如果消息发送状态为 SEND_OK,则同步回调localTransactionExecuter执行本地事务或者回调transactionListener的executeLocalTransaction执行本地事务。这里推荐后者,源码中明确标识,通过localTransactionExecuter执行本地事务的方式将在RocketMQ5.0.0移除。
如果本地事务结果localTransactionState返回null,则localTransactionState设置为UNKNOW。
对于本地事务执行失败的情况,设置localTransactionState为ROLLBACK_MESSAGE,MQ broker会对localTransactionState==ROLLBACK_MESSAGE的消息进行删除处理。
通过这段逻辑就保证了本地事务与事务消息状态的一致性。这段代码我认为是事务消息中较为关键的一段代码。
try {
// 结束事务
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
...省略后续处理...
endTransaction
这里通过endTransaction方法结束事务操作,我们进入endTransaction方法中查看如何对事务进行结束处理。
[DefaultMQProducerImpl.endTransaction]
......
String transactionId = sendResult.getTransactionId();
// 获取broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
根据localTransactionState的具体类型为requestHeader设置消息提交或者回滚状态。
- 如果localTransactionState==COMMIT_MESSAGE,设置为MessageSysFlag.TRANSACTION_COMMIT_TYPE
- 如果localTransactionState==ROLLBACK_MESSAGE,设置为MessageSysFlag.TRANSACTION_ROLLBACK_TYPE
- 如果localTransactionState==UNKNOW,设置为MessageSysFlag.TRANSACTION_NOT_TYPE
小结
本文主要对事务消息的发送主流程进行了解析,接下来我将对事务消息存储相关的逻辑进行分析。我们下篇文章再见。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。