跟我学RocketMQ之开源客户端混合云实践与案例解析
我们继续对消息消费流程的源码进行解析。
本文主要针对push模式下的消息拉取流程进行解析。我们重点分析集群消费模式,对于广播模式其实很好理解,每个消费者都需要拉取主题下面的所有消费队列的消息。
在集群消费模式下,同一个消费者组内包含了多个消费者实例,同一个topic下存在多个消费队列。对于单个消费者组,其内部维护了一个线程池进行消息消费,这部分内容可以移步 跟我学RocketMQ之消息消费源码解析(2)。
之前我们已经研究了消费者的初始化流程,在启动MQClientInstance过程中,启动了一个消息拉取线程PullMessageService进行消息拉取工作。
RocketMQ消费者在进行消费时,需要遵循 “订阅关系一致” 原则,关于订阅关系一致,我引用阿里云RocketMQ页面的解释,如下图:
从图中可以提炼出关键词,即:
同一个消费者组订阅的topic、topic中的tag必须保持一致,否则会出现消费不到消息的情况。
举个例子:比如我们有个topic名为DEMO_TOPIC,它有两个tag,分别为tagA、tagB。用一个消费者组demo_group分别订阅tagA、tagB,这时就会出现某个tag对应的消费者消费不到消息的情况。
解决方法就是:针对不同的tag使用不同的消费者组,在上面的案例中的解决方法为:使用demo_group_A 订阅tagA,使用demo_group_B订阅tagB。
提供了解决方案,还是有些意犹未尽,那么我们就深入RocketMQ的源码,感受一下订阅关系一致的机理。
本文我们就Netty使用中常见的半包/粘包问题进行分析和处理。
解决粘包/半包问题本质方式为:有明确的业务应用数据区分标志,能够按照边界完整的接受Netty传输的数据。
对于字符串类型的消息,Netty提供了多种现成的编解码工具解决粘包/半包问题,具体的工具类组合如下:
本文我们先介绍第一种方式,即:
DelimiterBasedFrameDecoder+StringDecoder
DelimiterBasedFrameDecoder的原理很好理解:通过利用特殊字符作为数据包的结束标志。发送方与接收方通过该标记对数据包进行分割解析即可。
本文我们接着分析RocketMQ消息消费的逻辑。
接上文,DefaultMQPushConsumerImpl启动过程中,启动了consumeMessageService消息消费线程。
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
可以看到,是根据MessageListener的具体实现选择具体的consumeMessageService实现,我们重点讲解并行消费服务ConsumeMessageConcurrentlyService。
本文我们接着分析一下RocektMQ实现消息消费的源码细节,这部分的内容较多,因此拆分为几个章节分别进行讲解。
本章节重点讲解DefaultMQPushConsumer的代码逻辑。
按照惯例还是先看一下DefaultMQPushConsumer的使用样例。
@PostConstruct
public void init() {
defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
// 从头开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式:集群模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
defaultMQPushConsumer.registerMessageListener(messageListener);
// 订阅所有消息
try {
defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e);
}
LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!");
}
初始化过程中需要调用registerMessageListener将具体的消费实现Listener注入。
@Component(value = "notifySendListenerImpl")
public class NotifySendListenerImpl implements MessageListenerConcurrently {
从零学Netty系列旨在记录笔者学习Netty的过程,从入门到熟练,尽量全面的对Netty做一次深入的探秘。
本文是 “从零学Netty” 系列的第一篇,我将介绍Netty的基本概念,线程模型,以及入门案例。
Netty是一个高性能网络通信框架,它提供了异步的、事件驱动的能力,能够帮助开发者快速开发高性能、高可靠的网络服务器和客户端程序。
Netty封装了Java NIO,屏蔽了复杂的底层实现,它具有以下优势:
本文是 “自己写分布式事务框架” 系列的汇总篇,我将从原理出发,提纲挈领地对本地消息表机制进行分析,并逐步带领读者实现基于本地消息表的分布式事务框架。
分布式事务在当下分布式开发领域已经成为必须考虑的因素,随着微服务、SOA架构思想日益被开发者所熟知,分布式事务的解决思路也不断被提出并被开源社区实现。
在互联网开发中,我们常常会采用一揽子“柔性事务”解决方案来保证系统内模块之间数据的 最终一致性。
业界知名的分布式事务解决方案有如下几种:
上述方案中,我挑选了第五种 消息溯源方案(即本地消息表方案,后文均称为本地消息表方案) 作为自己写分布式事务框架的核心机制,旨在实现一个与业务无关的、基于消息的、异步确保的、最终一致的柔性事务框架。
框架的实现主要基于RocketMQ的普通消息,我们都知道RocketMQ已经支持了事务消息,这里只是基于它对本地消息表方案进行实现,原则上,本地消息表方案支持任意具有发布订阅能力的消息中间件。
上述其他实现在笔者之前的文章中有过较为系统的讲解,感兴趣的同学可以移步 置顶贴-我说分布式事务系列 进行了解学习,本系列不再展开说明。
之前的两篇文章中,我讲解了ShieldTXC的框架内核的代码实现及其原理。前前后后设计开发总共耗时一周,目前的版本基本可用,那么本文我们就使用这个版本进行一次实战演练,让读者直观的感受一下ShieldTXC实现的本地消息表分布式事务方案的魅力。
进入shieldTXC项目,可以看到项目整体的结构如下:
shieldTxc
|-txc-core ShieldTXC内核
|-txc-demo-up 分布式事务上游应用
|-txc-demo-down 分布式事务下游应用
首先对项目进行整体的编译,接着对应用txc-demo-up、txc-demo-down,修改配置文件application.properties