跟我学RocketMQ之消息消费源码解析(1)
本文我们接着分析一下RocektMQ实现消息消费的源码细节,这部分的内容较多,因此拆分为几个章节分别进行讲解。
本章节重点讲解DefaultMQPushConsumer的代码逻辑。
DefaultMQPushConsumer使用样例
按照惯例还是先看一下DefaultMQPushConsumer的使用样例。
@PostConstruct
public void init() {
defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
// 从头开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式:集群模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
defaultMQPushConsumer.registerMessageListener(messageListener);
// 订阅所有消息
try {
defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e);
}
LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!");
}
初始化过程中需要调用registerMessageListener将具体的消费实现Listener注入。
@Component(value = "notifySendListenerImpl")
public class NotifySendListenerImpl implements MessageListenerConcurrently {
...省略部分代码...
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
// 消息解码
String message = new String(msg.getBody());
// 消费次数
int reconsumeTimes = msg.getReconsumeTimes();
String msgId = msg.getMsgId();
String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;
LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-接收到消息,message={},{}", message, logSuffix);
// 请求组装
OrderResultNofityProtocol protocol = new OrderResultNofityProtocol();
protocol.decode(message);
// 参数加签,获取用户privatekey
String privateKey = protocol.getPrivateKey();
String notifyUrl = protocol.getMerchantNotifyUrl();
String purseId = protocol.getPurseId();
ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest();
chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId())
.setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis())))
.setOrder_status(NotifyConstant.NOTIFY_SUCCESS)
.setPlat_orderid(protocol.getOrderId())
.setSign(chargeNotifyRequest.sign(privateKey));
LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-订单结果通知入参:{},{}", chargeNotifyRequest.toString(), logSuffix);
// 通知发送
return sendNotifyByPost(reconsumeTimes, logSuffix, protocol, notifyUrl, purseId, chargeNotifyRequest);
}
} catch (Exception e) {
LOGGER.error("[通知发送消息消费者]消费异常,e={}", LogExceptionWapper.getStackTrace(e));
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
上面就是一个较为标准的在spring框架中使用RocektMQ的DefaultMQPushConsumer进行消费的主流程。
接下来我们重点分析一下源码实现。
初始化DefaultMQPushConsumer
首先看一下DefaultMQPushConsumer的初始化过程。
进入DefaultMQPushConsumer.java类,查看构造方法:
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
调用了它的同名构造,采用AllocateMessageQueueAveragely策略(平均散列队列算法
)
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
可以看到实际初始化是通过DefaultMQPushConsumerImpl实现的,DefaultMQPushConsumer持有一个defaultMQPushConsumerImpl的引用。
[DefaultMQPushConsumerImpl.java]
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
// 初始化DefaultMQPushConsumerImpl,将defaultMQPushConsumer的实际引用传入
this.defaultMQPushConsumer = defaultMQPushConsumer;
// 传入rpcHook并指向本类的引用
this.rpcHook = rpcHook;
}
注册消费监听MessageListener
我们接着看一下注册消费监听器的流程。
消费监听接口MessageListener有两个具体的实现,分别为
MessageListenerConcurrently -- 并行消费监听
MessageListenerOrderly -- 顺序消费监听
本文以MessageListenerConcurrently为主要讲解的对象。
查看MessageListenerConcurrently的注册过程。
@Override
public void registerMessageListener(
MessageListenerConcurrently messageListener) {
// 将实现指向本类引用
this.messageListener = messageListener;
// 进行真实注册
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
接着看defaultMQPushConsumerImpl.registerMessageListener
DefaultMQPushConsumerImpl.java
public void registerMessageListener(MessageListener messageListener) {
this.messageListenerInner = messageListener;
}
可以看到DefaultMQPushConsumerImpl将真实的messageListener实现指向它本类的messageListener引用。
订阅topic
接着看一下订阅topic的主流程。
topic订阅主要通过方法subscribe实现,首先看一下DefaultMQPushConsumer的subscribe实现
@Override
public void subscribe(String topic, String subExpression)
throws MQClientException {
this.defaultMQPushConsumerImpl
.subscribe(withNamespace(topic), subExpression);
}
可以看到是调用了DefaultMQPushConsumerImpl的subscribe方法。
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
// 构建主题的订阅数据,默认为集群消费
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
// 将topic的订阅数据进行保存
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
// 如果MQClientInstance不为空,则向所有的broker发送心跳包,加锁
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
看一下buildSubscriptionData代码逻辑
[FilterAPI.java]
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
// 构造一个SubscriptionData实体,设置topic、表达式(tag)
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
// 如果tag为空或者为"*",统一设置为"*",即订阅所有消息
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
// tag不为空,则先按照‘|’进行分割
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
// 遍历tag表达式数组
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
// 将每个tag的值设置到tagSet中
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
// tag解析异常
throw new Exception("subString split error");
}
}
return subscriptionData;
}
看一下sendHeartbeatToAllBrokerWithLock代码逻辑
[MQClientInstance.java]
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 发送心跳包
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}
可以看到,同步发送心跳包给所有的broker,而该过程是通过RemotingClient统一实现的,通过调用RemotingClient.invokeSync实现心跳包的发送,底层是通过Netty实现的。具体细节本文不进行展开。
启动消费客户端
上述初始化流程执行完毕之后,通过start()方法启动消费客户端。
@Override
public void start() throws MQClientException {
// 设置消费者组
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// 启动消费客户端
this.defaultMQPushConsumerImpl.start();
// trace处理逻辑
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
关于trace的处理逻辑,本文不再展开,感兴趣的同学可以移步 跟我学RocketMQ之消息轨迹实战与源码分析
接着看defaultMQPushConsumerImpl.start()方法逻辑
[DefaultMQPushConsumerImpl.java]
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={},
isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
首次启动后,执行配置检查,该方法为前置校验方法,主要进行消费属性校验。
this.checkConfig();
将订阅关系配置信息进行复制
this.copySubscription();
如果当前为集群消费模式,修改实例名为pid
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
创建一个新的MQClientInstance实例,如果已经存在直接使用该存在的MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
为消费者负载均衡实现rebalanceImpl设置属性
// 设置消费者组
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 设置消费模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 设置队列分配策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 设置当前的MQClientInstance实例
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 注册消息过滤钩子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
处理offset存储方式
// offsetStore不为空则使用当前的offsetStore方式
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 否则根据消费方式选择具体的offsetStore方式存储offset
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 如果是广播方式,则使用本地存储方式
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
// 如果是集群方式,则使用远端broker存储方式存储offset
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加载当前的offset
this.offsetStore.load();
根据MessageListener的具体实现方式选取具体的消息拉取线程实现。
// 如果是MessageListenerOrderly顺序消费接口实现
// 消息消费服务选择:ConsumeMessageOrderlyService(顺序消息消费服务)
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
// 如果是MessageListenerConcurrently并行消息消费接口实现
// 消息消费服务选择:ConsumeMessageConcurrentlyService(并行消息消费服务)
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
选择并初始化完成具体的消息消费服务之后,启动消息消费服务。consumeMessageService主要负责对消息进行消费,它的内部维护了一个线程池。
// 启动消息消费服务
this.consumeMessageService.start();
接着向MQClientInstance注册消费者,并启动MQClientInstance。这里再次强调
一个JVM中所有消费者、生产者持有同一个MQClientInstance,且MQClientInstance只会启动一次
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
如果MQClientInstance已经启动,或者已经关闭,或者启动失败,重复调用start会报错。这里也能直观的反映出:MQClientInstance的启动只有一次
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
启动完成执行后续收尾工作
// 订阅关系改变,更新Nameserver的订阅关系表
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 检查客户端状态
this.mQClientFactory.checkClientInBroker();
// 发送心跳包
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒执行消费者负载均衡
this.mQClientFactory.rebalanceImmediately();
}
copySubscription(),消息重试topic处理逻辑
消费者启动流程较为重要,我们接着对其中的重点方法展开讲解。这部分内容可以暂时跳过,不影响对主流程的把控。
我们研究一下copySubscription方法的实现细节。
[DefaultMQPushConsumerImpl.java]
private void copySubscription() throws MQClientException {
try {
// 首先获取订阅信息
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
// 为defaultMQPushConsumer设置具体的MessageListener实现
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
根据消费类型选择是否进行重试topic订阅
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 如果是广播消费模式,则不进行任何处理,即无重试
case BROADCASTING:
break;
// 如果是集群消费模式,订阅重试主题消息
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
如果是集群消费模式,会订阅重试主题消息
获取重试topic,规则为 RETRY_GROUP_TOPIC_PREFIX + consumerGroup,即:“%RETRY%”+消费组名 ;
为重试topic设置订阅关系,订阅所有的消息;
消费者启动的时候会自动订阅该重试主题,并参与该topic的消息队列负载过程。
小结
到此,我们就DefaultMQPushConsumer的初始化、启动、校验以及topic订阅、重试等代码实现
细节进行了较为详细的讲解。
下一章节,我将带领读者对消息消费线程 consumeMessageService 的实现进行分析,我们下篇文章见。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。