Learn RocketMQ with Me: A Source-Code Discussion of Subscription Consistency
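When RocketMQ consumers consume messages, they must follow the principle of "subscription consistency". For a definition, I quote the explanation from the Alibaba Cloud RocketMQ documentation, shown in the figure below:
The key point to take from the figure is:
Within one consumer group, every consumer must subscribe to the same topics and to the same tags in those topics; otherwise some messages will not be consumed.
For example: suppose a topic named DEMO_TOPIC carries two tags, tagA and tagB. If two consumers in the same consumer group demo_group subscribe to tagA and tagB respectively, the consumer for one of the tags will fail to receive messages.
The fix is to use a different consumer group for each tag. In the case above: use demo_group_A to subscribe to tagA and demo_group_B to subscribe to tagB.
Having a fix in hand is not quite satisfying, so let's dig into the RocketMQ source code and see how subscription consistency works under the hood.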
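Heartbeat maintenance: MQClientInstance.java
In earlier source-code walkthroughs we saw that the consumer client instance MQClientInstance starts a heartbeat maintenance thread. The relevant code is: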
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
Let's step into the method startScheduledTask();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
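This code submits a periodic task to the scheduled thread pool. Each run does two things: it cleans up offline brokers and it sends a heartbeat to all brokers. We focus on the heartbeat method sendHeartbeatToAllBrokerWithLock();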
public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}
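The tryLock() guard above means that a heartbeat round is skipped, not queued, when another thread is already sending. The following is a minimal sketch of that pattern in plain Java, not RocketMQ code: the class HeartbeatGuardDemo, its counter, and the helper attemptWhileLockHeld are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the tryLock/finally-unlock shape mirrors the article's code.
class HeartbeatGuardDemo {
    private final ReentrantLock lockHeartbeat = new ReentrantLock();
    private final AtomicInteger sent = new AtomicInteger();

    /** Returns true if this call "sent" a heartbeat, false if the round was skipped. */
    boolean sendHeartbeatWithLock() {
        if (lockHeartbeat.tryLock()) {
            try {
                sent.incrementAndGet(); // stand-in for the real network call
                return true;
            } finally {
                lockHeartbeat.unlock();
            }
        }
        return false; // another thread holds the lock, so this round is skipped
    }

    int sentCount() {
        return sent.get();
    }

    /** Holds the lock on a helper thread while the calling thread attempts to send. */
    static boolean attemptWhileLockHeld() {
        HeartbeatGuardDemo demo = new HeartbeatGuardDemo();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            demo.lockHeartbeat.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                demo.lockHeartbeat.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            boolean result = demo.sendHeartbeatWithLock();
            release.countDown();
            holder.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HeartbeatGuardDemo demo = new HeartbeatGuardDemo();
        System.out.println(demo.sendHeartbeatWithLock()); // true: lock is free
        System.out.println(attemptWhileLockHeld());       // false: lock held elsewhere
    }
}
```

Skipping is a reasonable choice here because the next scheduled run will send a fresh heartbeat anyway.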
Next, step into sendHeartbeatToAllBroker() to see its logic.
private void sendHeartbeatToAllBroker() {
    // Build the heartbeat payload
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    // Neither consumers nor producers are registered
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }
    if (!this.brokerAddrTable.isEmpty()) {
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }
                        try {
                            // This is where the heartbeat is actually sent
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            // ... exception handling omitted ...
                        }
                    }
                }
            }
        }
    }
}
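This method sends a heartbeat to every broker. The request is of type HEART_BEAT, which the broker handles in ClientManageProcessor, so let's go into ClientManageProcessor and look at the heartbeat handling logic in the heartBeat() method.
Broker heartbeat handling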
[ClientManageProcessor.java]
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Decode the heartbeat request body sent by the client
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // Subscription group configuration
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }
        // Register the consumer instance
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );
        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }
    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
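We focus on registerConsumer(). Here the broker uses the heartbeat sent by the consumer to look up the consumer subscription information it has recorded; the lookup is keyed by consumer group. Let's step into the body of registerConsumer.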
[ConsumerManager.java]
public boolean registerConsumer(final String group,
    final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType,
    MessageModel messageModel,
    ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    // Look up the consumer group info
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    // If absent, build a new consumer group info from the heartbeat
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }
    // Update the ClientChannelInfo
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    // Update the subscription table
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}
Let's study this code carefully. First:
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
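If the consumer group has no entry yet (that is, this is the first heartbeat from the group), a new ConsumerGroupInfo is placed directly into consumerTable; its subscription table is filled in by the steps that follow.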
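The get / putIfAbsent / "prev != null ? prev : tmp" sequence is a common get-or-create idiom for concurrent maps: if two heartbeats for the same group race, both end up using the same entry. Here is a minimal sketch of the idiom in isolation; GroupTableDemo and GroupInfo are hypothetical stand-ins, not RocketMQ classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class illustrating the get-or-create idiom only.
class GroupTableDemo {
    /** Simplified stand-in for ConsumerGroupInfo. */
    static final class GroupInfo {
        final String group;

        GroupInfo(String group) {
            this.group = group;
        }
    }

    private final ConcurrentMap<String, GroupInfo> consumerTable = new ConcurrentHashMap<>();

    GroupInfo getOrCreate(String group) {
        GroupInfo info = consumerTable.get(group);
        if (info == null) {
            GroupInfo tmp = new GroupInfo(group);
            // putIfAbsent returns the existing value if another thread inserted first
            GroupInfo prev = consumerTable.putIfAbsent(group, tmp);
            info = prev != null ? prev : tmp;
        }
        return info;
    }

    public static void main(String[] args) {
        GroupTableDemo demo = new GroupTableDemo();
        GroupInfo first = demo.getOrCreate("demo_group");
        GroupInfo second = demo.getOrCreate("demo_group");
        System.out.println(first == second); // true: one entry per group
    }
}
```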
Next, step into the method consumerGroupInfo.updateSubscription(subList);
[ConsumerGroupInfo.java]
public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;
    // Iterate over the subscriptions carried by the heartbeat
    for (SubscriptionData sub : subList) {
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        // No subscription was recorded for this topic yet
        if (old == null) {
            // Record this subscription
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}",
                    this.groupName,
                    sub.toString());
            }
        // If the incoming version is greater than the recorded version, replace the subscription.
        // The version value is a system timestamp:
        // (SubscriptionData.java)
        // private long subVersion = System.currentTimeMillis();
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}",
                    this.groupName,
                    old.toString(),
                    sub.toString()
                );
            }
            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }
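This part updates the subscription table, which is keyed by topic. If no subscription exists for the topic, the new one is inserted directly; if one exists, the two versions are compared and the newer one overwrites the older one.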
    Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, SubscriptionData> next = it.next();
        String oldTopic = next.getKey();
        boolean exist = false;
        for (SubscriptionData sub : subList) {
            if (sub.getTopic().equals(oldTopic)) {
                exist = true;
                break;
            }
        }
        if (!exist) {
            log.warn("subscription changed, group: {} remove topic {} {}",
                this.groupName,
                oldTopic,
                next.getValue().toString()
            );
            it.remove();
            updated = true;
        }
    }
    this.lastUpdateTimestamp = System.currentTimeMillis();
    return updated;
}
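Reading on, the method then iterates over the subscription table.
For each topic already in the table, exist (the topic-present flag) is set to true only if that topic also appears in the subscription list of the current heartbeat. If it does not appear, exist stays false, the if block runs, and the remove operation deletes the old topic, so the table ends up containing only the topics carried by the latest heartbeat.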
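To make the two effects concrete (version-based overwrite within a topic, and removal of topics missing from the latest heartbeat), here is a minimal, self-contained model of the update logic. It is a sketch under stated assumptions, not the broker's implementation: SubscriptionTableDemo, Sub, heartbeat and tagOf are hypothetical names, a subscription is reduced to a topic, a single tag and a version, and each heartbeat carries exactly one subscription.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of one consumer group's subscription table.
class SubscriptionTableDemo {
    /** Simplified stand-in for SubscriptionData. */
    static final class Sub {
        final String topic;
        final String tag;
        final long subVersion;

        Sub(String topic, String tag, long subVersion) {
            this.topic = topic;
            this.tag = tag;
            this.subVersion = subVersion;
        }
    }

    // topic -> subscription
    private final Map<String, Sub> subscriptionTable = new ConcurrentHashMap<>();

    boolean updateSubscription(List<Sub> subList) {
        boolean updated = false;
        for (Sub sub : subList) {
            Sub old = subscriptionTable.get(sub.topic);
            if (old == null) {
                subscriptionTable.put(sub.topic, sub);
                updated = true;
            } else if (sub.subVersion > old.subVersion) {
                subscriptionTable.put(sub.topic, sub); // newer version overwrites
            }
        }
        // Drop every recorded topic that the current heartbeat does not carry
        Iterator<Map.Entry<String, Sub>> it = subscriptionTable.entrySet().iterator();
        while (it.hasNext()) {
            String oldTopic = it.next().getKey();
            boolean exist = false;
            for (Sub sub : subList) {
                if (sub.topic.equals(oldTopic)) {
                    exist = true;
                    break;
                }
            }
            if (!exist) {
                it.remove();
                updated = true;
            }
        }
        return updated;
    }

    /** Convenience wrapper: one heartbeat carrying a single subscription. */
    boolean heartbeat(String topic, String tag, long subVersion) {
        return updateSubscription(Collections.singletonList(new Sub(topic, tag, subVersion)));
    }

    String tagOf(String topic) {
        Sub sub = subscriptionTable.get(topic);
        return sub == null ? null : sub.tag;
    }

    public static void main(String[] args) {
        SubscriptionTableDemo group = new SubscriptionTableDemo();
        group.heartbeat("DEMO_TOPIC", "tagA", 1L);
        group.heartbeat("DEMO_TOPIC", "tagB", 2L);
        System.out.println(group.tagOf("DEMO_TOPIC")); // tagB: tagA was overwritten
        group.heartbeat("OTHER_TOPIC", "tagC", 3L);
        System.out.println(group.tagOf("DEMO_TOPIC")); // null: topic was removed
    }
}
```

In this model, as in the excerpt, a version-based overwrite does not set the updated flag; only adding or removing a topic does.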
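consumerTable stores consumer information keyed by consumer group. Suppose the consumers in one group carry different subscription information. In the example above, the consumer subscribed to tagA first reports in its heartbeat that its group subscribes to DEMO_TOPIC/tagA; the broker records this and updates its local subscription table. When another heartbeat arrives reporting that the group subscribes to DEMO_TOPIC/tagB, and its version timestamp is larger than the recorded one, it overwrites the previous entry and the group's subscription changes. If the two consumers subscribe to different topics instead, the removal loop deletes whichever topic is missing from the latest heartbeat.
The subscriptions therefore overwrite each other, and when messages are pulled, one of the consumers cannot pull any because the broker cannot find its subscription information.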
Other causes
Besides the cause above, there is an even more important one: the message Rebalance process. Let's look at RebalanceImpl.java.
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            // ... omitted ...
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }
This rebalances the message queues of a given topic. Let's step into the line this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
[MQClientInstance.java]
public List<String> findConsumerIdList(final String topic, final String group) {
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }
    if (null != brokerAddr) {
        try {
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }
    return null;
}
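This looks up a broker address by topic; if one is found, it fetches the consumer ID list from that broker.
Note that the list is selected by consumerGroup only. If consumers in the same group subscribe to different topics (or different tags), the list also includes consumers that do not subscribe to the topic being rebalanced. The topic's queues are then divided among all of them, and the queues assigned to non-subscribing consumers are never consumed. With two different topics, for example, each topic ends up with only about half of its data consumed.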
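The "half the data" effect can be reproduced with a small model. The sketch below assumes a simple contiguous, even split of queues across the consumer ID list; it is written in the spirit of averaged allocation and is not RocketMQ's allocation strategy code. RebalanceDemo, allocate and unconsumedQueueCount are hypothetical names, and queues are represented by their integer IDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical model: queues of one topic are split evenly across ALL consumers in the group.
class RebalanceDemo {
    /** Returns the queue IDs that currentCid would be assigned under an even contiguous split. */
    static List<Integer> allocate(int queueCount, List<String> cidAll, String currentCid) {
        List<Integer> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return result; // consumer is not part of the group
        }
        int base = queueCount / cidAll.size();
        int extra = queueCount % cidAll.size();
        int start = index * base + Math.min(index, extra);
        int size = base + (index < extra ? 1 : 0);
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    /** Counts queues whose share belongs to consumers that never subscribed to the topic. */
    static int unconsumedQueueCount(int queueCount, List<String> cidAll, Set<String> subscribers) {
        int unconsumed = 0;
        for (String cid : cidAll) {
            if (!subscribers.contains(cid)) {
                unconsumed += allocate(queueCount, cidAll, cid).size();
            }
        }
        return unconsumed;
    }

    public static void main(String[] args) {
        // c1 subscribes to the topic; c2 is in the same group but subscribes to another topic
        List<String> cidAll = Arrays.asList("c1", "c2");
        Set<String> subscribers = Collections.singleton("c1");
        System.out.println(allocate(4, cidAll, "c1"));                     // [0, 1]
        System.out.println(unconsumedQueueCount(4, cidAll, subscribers)); // 2
    }
}
```

With four queues and two consumers in the group, c1 only takes queues 0 and 1, while queues 2 and 3 fall in the share of c2, which never pulls from this topic.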
We will analyze the message Rebalance process separately in a later article.