跟我学RocketMQ[1-5]之发送事务消息实现分布式事务及封装TransactionMQProducer支持spring
这里先说点儿私货,为什么在众多的MQ中,我尤其看好RocketMQ,主要的原因之一就是它的事务消息。
关于RocketMQ的事务消息的机制,我在之前的一篇文章中已经做过讲解,文章地址 我说分布式事务之消息一致性事务2-rocketmq的实现
RocketMQ的事务消息是目前主流的MQ中唯一支持分布式事务的解决方案,也是众多最终一致性方案中较为成熟的。
本文中,我将带领大家了解并运用TransactionMQProducer实现事务消息的发送,并对TransactionMQProducer进行基于Spring的封装,使我们的项目更方便的集成事务消息能力。
其中,SpringBoot支持的版本已经上传至github,代码地址为RocketMQTransactionProducerAgent.java
发送事务消息
创建消息生产者
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
可以看到,事务消息的主要发送过程和普通消息的基本一致,区别在于需要通过 producer.setExecutorService(executorService); 为 TransactionMQProducer设置本地事务执行的线程池,同时需要事务消息生产者TransactionMQProducer设置本地事务执行监听器TransactionListener,这里我通过定义一个TransactionListener的实现类的方式来进行代码的编写,更利于理解,熟练之后可以直接在消息发送的时候通过使用匿名内部类或者lambda表达式的方式直接实现TransactionListener接口。
实现TransactionListener接口
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
需要实现TransactionListener的两个回调方法。
实现executeLocalTransaction(本地事务逻辑)方法。消息生产者需要在executeLocalTransaction中执行本地事务当事务半消息提交成功,执行完毕后需要返回事务状态码。
实现checkLocalTransaction(事务回查逻辑)方法,该方法用于进行本地事务执行情况回查,并回应事务状态给MQ的broker,执行完成之后需要返回对应的事务状态码。
到这里就完成了一个事务消息的发送者。
封装TransactionMQProducer整合spring
理解了事务消息的发送机制,我们还是基于 Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE 对TransactionMQProducer进行薄封装。
直接上代码。
首先声明RocketMQTransactionProducerAgent并标注为一个spring的bean,作用域为多例。
@Component
@Scope("prototype")
public class RocketMQTransactionProducerAgent {
声明事务消息发送者,事务执行监听器。
private TransactionMQProducer producer;
private TransactionListener transactionListener;
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQTransactionProducerAgent.class);
init()方法中进行消息生产者的初始化,在构造方法中声明一个事务消息生产者组,然后设置NameServer的地址。外部需要定义好一个本地事务执行的线程池(ExecutorService)并将实例化后的线程池引用设置给TransactionMQProducer。
同时,外界需要实现TransactionListener接口,并将该接口实现类的引用设置给TransactionMQProducer。
public RocketMQTransactionProducerAgent init(RocketMQTransactionProducerConfig transactionProducerConfig,
TransactionListener transactionListener) throws Exception {
producer = new TransactionMQProducer(transactionProducerConfig.getProducerGroup());
producer.setNamesrvAddr(transactionProducerConfig.getNameSrvAddr());
// 设置本地事务执行线程池
producer.setExecutorService(transactionProducerConfig.getExecutorService());
this.transactionListener = transactionListener;
// 设置本地事务执行监听器
producer.setTransactionListener(this.transactionListener);
LOGGER.debug("com.shield.job.message.rocketmq.RocketMQTransactionProducerAgent[初始化完成]");
return this;
}
封装后的启动方法
/**
* 启动消费者服务
*/
public void start() throws MQClientException {
this.producer.start();
}
封装后的关闭方法
public void destroy() {
this.producer.shutdown();
LOGGER.debug("com.shield.job.message.rocketmq.RocketMQTransactionProducerAgent[已关闭]");
}
为方便调用方对消息生产者TransactionMQProducer的定制,提供接口返回TransactionMQProducer实例。
public TransactionMQProducer getProducer() {
return this.producer;
}
}
RocketMQTransactionProducerAgent使用案例
按照惯例,我们还是基于文章开头的示例进行改造,代码如下。
@Component
public class DemoTransactionPublisher {
private static final Logger LOGGER =
LoggerFactory.getLogger(DemoTransactionPublisher.class);
注入RocketMQTransactionProducerAgent事务消息发送端
@Resource(name = "rocketMQTransactionProducerAgent")
RocketMQTransactionProducerAgent transactionProducerAgent;
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
声明本地事务执行线程池
ExecutorService executorService;
定义返回值为void的方法,标注为@PostConstruct以便spring容器加载过程中对事务消息发送客户端进行初始化操作。
@PostConstruct
void init() throws Exception {
初始化本地事务执行线程池,这里尽量使用ThreadPoolExecutor初始化线程池,更加灵活。
executorService =
new ThreadPoolExecutor(
5,
512,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024));
调用transactionProducerAgent的init(RocketMQTransactionProducerConfig transactionProducerConfig, TransactionListener transactionListener) 方法,通过RocketMQTransactionProducerConfig设置事务消息发送组,NameServer地址,并将实现定义好的线程池引用传递进去。
transactionProducerAgent.init(new RocketMQTransactionProducerConfig(
"CID_SNOWALKER_TRANSACTION",
"172.30.83.100:9876",
executorService),
实现本地事务执行监听器,实现其中的本地事务执行方法及回查方法。
new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
LOGGER.info("返回本地事务执行结果为:LocalTransactionState.UNKNOW");
System.out.println("222222222222222222222");
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
System.out.println("11111111111111111111111");
LOGGER.info("返回本地事务执行结果为:LocalTransactionState.COMMIT_MESSAGE");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
调用start()方法启动事务发送端
).start();
这里的逻辑是发送半消息的逻辑,发送方需要根据自己的业务定义事务半消息体,保证本地事务执行成功后,能够将必须的参数传递给消息消费者从而保证本地事务执行与半消息发送同时成功、同时失败。
当发送方的本地事务提交,确保半消息一定能够推送给消费者一侧。消费者通过消息重试机制,保证两侧的状态达到最终一致。如果消费者不断的消费,达到最大重试次数,还是不能够消费成功,则消息进入死信队列,人工干预。
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = transactionProducerAgent.getProducer().sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
}
}
小结
本文的核心还是在于讲解如何使用RocketMQ的API完成事务消息的发送,以及如何对其进行封装,使开发更加便捷。
只要理解了RocketMQ事务消息机理,相信聪明的你会很容易上手代码开发,所以这里我还是建议学习一门新技术,不要仅仅满足于会调用API,关键还是要理解原理,举一反三,相信你一定可以以不变应万变。
PS: RocketMQ事务消息再复盘
RocketMQ的事务消息基于两阶段提交和事务状态回查机制来实现,所谓的两阶段提交,即首先发送Prepare消息,待事务提交或回滚时发送commit、rollback命令。
再结合定时任务,RocketMQ使用专门的线程以特定的频率对RocketMQ服务器上的Prepare消息进行处理,向发送端查询事务消息的状态来决定是否提交或者回滚消息。 –引自《RocketMQ技术内幕 8.5》子曰:不愤不启,不悱不发。举一隅不以三隅反,则不复也。