Contents
  1. Heartbeat maintenance: MQClientInstance.java
  2. Broker heartbeat handling
  3. Other causes

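When consuming messages, RocketMQ consumers must follow the "subscription consistency" principle. For its definition I quote the explanation from the Alibaba Cloud RocketMQ page, shown in the figure below: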

[Figure: rmq0.png, the Alibaba Cloud RocketMQ explanation of subscription consistency]

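The key point of the figure is:

Within one consumer group, every consumer must subscribe to the same topics and to the same tags within those topics; otherwise some messages will not be consumed.

For example, suppose a topic named DEMO_TOPIC carries two tags, tagA and tagB. If consumers in a single consumer group, demo_group, subscribe to tagA and tagB separately, the consumer for one of the tags will receive no messages.

The fix is to use a different consumer group for each tag. In the example above, that means subscribing to tagA with demo_group_A and to tagB with demo_group_B.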
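The rule can be turned into a small pre-deployment check. The following is only a sketch under stated assumptions: the class `SubscriptionConsistencyCheck`, its methods, and the client IDs are hypothetical and not part of RocketMQ, and tag expressions are compared as plain strings.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not a RocketMQ class.
final class SubscriptionConsistencyCheck {

    // Returns true when every client in the group declares the same
    // topic -> tag-expression map. Expressions are compared as plain strings.
    static boolean isConsistent(Map<String, Map<String, String>> subscriptionsByClient) {
        Set<Map<String, String>> distinct = new HashSet<>(subscriptionsByClient.values());
        return distinct.size() <= 1;
    }

    // Builds a subscription map that contains a single topic.
    static Map<String, String> single(String topic, String tags) {
        Map<String, String> subscription = new HashMap<>();
        subscription.put(topic, tags);
        return subscription;
    }

    public static void main(String[] args) {
        // One group, two clients, two different tags: inconsistent.
        Map<String, Map<String, String>> demoGroup = new HashMap<>();
        demoGroup.put("client-1", single("DEMO_TOPIC", "tagA"));
        demoGroup.put("client-2", single("DEMO_TOPIC", "tagB"));
        System.out.println("demo_group consistent: " + isConsistent(demoGroup));

        // One group per tag: every client of demo_group_A declares tagA.
        Map<String, Map<String, String>> demoGroupA = new HashMap<>();
        demoGroupA.put("client-1", single("DEMO_TOPIC", "tagA"));
        demoGroupA.put("client-2", single("DEMO_TOPIC", "tagA"));
        System.out.println("demo_group_A consistent: " + isConsistent(demoGroupA));
    }
}
```

Because the comparison is literal, two expressions that list the same tags in a different order would be reported as inconsistent by this sketch.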

A fix on its own is not very satisfying, so let's dig into the RocketMQ source code and see how subscription consistency works underneath.

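Heartbeat maintenance: MQClientInstance.java

In an earlier source-code walkthrough we saw that the consumer's client instance, MQClientInstance, starts a heartbeat thread. The code is as follows: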

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();

Let's step into startScheduledTask():

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

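This code submits a periodic task to the scheduled thread pool. Each run cleans up offline brokers and then sends a heartbeat to every broker. We focus on the heartbeat method, sendHeartbeatToAllBrokerWithLock():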

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}
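The two pieces above, a fixed-rate task and a tryLock() guard, can be tried out in isolation. This is a minimal sketch, not RocketMQ code: `HeartbeatSchedulerSketch` and its counters are hypothetical, and the network call is replaced by a counter increment. The try/catch inside the task matters because scheduleAtFixedRate suppresses all later runs once a run throws.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the heartbeat part of MQClientInstance.
final class HeartbeatSchedulerSketch {
    private final ReentrantLock lockHeartbeat = new ReentrantLock();
    final AtomicInteger sent = new AtomicInteger();
    final AtomicInteger skipped = new AtomicInteger();

    // Same shape as sendHeartbeatToAllBrokerWithLock(): if another thread
    // is already sending, skip this round instead of blocking.
    void sendHeartbeatWithLock() {
        if (lockHeartbeat.tryLock()) {
            try {
                sent.incrementAndGet(); // placeholder for the real network call
            } finally {
                lockHeartbeat.unlock();
            }
        } else {
            skipped.incrementAndGet();
        }
    }

    // Schedules the heartbeat at a fixed rate, waits for `rounds` runs,
    // stops the pool, and returns how many heartbeats were sent.
    static int runRounds(int rounds, long periodMillis) {
        HeartbeatSchedulerSketch sketch = new HeartbeatSchedulerSketch();
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(rounds);
        pool.scheduleAtFixedRate(() -> {
            try {
                sketch.sendHeartbeatWithLock();
            } catch (Exception e) {
                // An uncaught exception would cancel every later run.
                System.err.println("heartbeat failed: " + e);
            } finally {
                done.countDown();
            }
        }, 10, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return sketch.sent.get();
    }

    public static void main(String[] args) {
        System.out.println("heartbeats sent: " + runRounds(3, 50));
    }
}
```

The count returned may be slightly higher than the number of rounds requested, since one more run can start before the pool is shut down.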

Next, step into sendHeartbeatToAllBroker() to see its logic.

private void sendHeartbeatToAllBroker() {

    // Build the heartbeat payload
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();

    // Neither a consumer nor a producer is registered
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }

    if (!this.brokerAddrTable.isEmpty()) {
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }

                        try {
                            // This is where the heartbeat is actually sent
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            // ... exception handling omitted ...
                        }
                    }
                }
            }
        }
    }
}

This method sends a heartbeat to every broker. The request is of type HEART_BEAT, which the broker handles in ClientManageProcessor, so let's look at the heartbeat handling there, in the heartBeat() method.

Broker heartbeat handling

[ClientManageProcessor.java]
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Decode the heartbeat request body sent by the client
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );

    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {

        // Subscription group configuration
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }

        // Register the consumer instance
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );

        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }

    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

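The call to focus on is registerConsumer(). Here the broker takes the information the consumer sent and looks up the subscription information it already holds for that consumer, and it does so per consumer group. Let's step into the body of registerConsumer():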

[ConsumerManager.java]
public boolean registerConsumer(final String group,
    final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType,
    MessageModel messageModel,
    ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList,
    boolean isNotifyConsumerIdsChangedEnable) {

    // Look up the consumer group info
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);

    // If absent, build new consumer group info from the heartbeat
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    // Update ClientChannelInfo
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);

    // Update the subscription table
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    return r1 || r2;
}

Let's look at this code closely. First:

ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);

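The first time, when no entry exists for the group, the newly built ConsumerGroupInfo is placed directly into consumerTable. If another thread inserted an entry first, putIfAbsent returns that existing entry and it is used instead.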
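The get, putIfAbsent, then "prev != null ? prev : tmp" sequence is a common get-or-create idiom on a ConcurrentMap. A minimal sketch follows; `GetOrCreateSketch` and `GroupInfo` are hypothetical names standing in for ConsumerManager and ConsumerGroupInfo.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the consumerTable handling in ConsumerManager.
final class GetOrCreateSketch {

    // Simplified placeholder for ConsumerGroupInfo.
    static final class GroupInfo {
        final String groupName;

        GroupInfo(String groupName) {
            this.groupName = groupName;
        }
    }

    private final ConcurrentMap<String, GroupInfo> consumerTable = new ConcurrentHashMap<>();

    // Returns the single shared GroupInfo for the group, creating it on first use.
    GroupInfo getOrCreate(String group) {
        GroupInfo info = consumerTable.get(group);
        if (info == null) {
            GroupInfo tmp = new GroupInfo(group);
            // putIfAbsent returns null if tmp was stored, or the value
            // another thread stored first.
            GroupInfo prev = consumerTable.putIfAbsent(group, tmp);
            info = prev != null ? prev : tmp;
        }
        return info;
    }

    public static void main(String[] args) {
        GetOrCreateSketch manager = new GetOrCreateSketch();
        GroupInfo first = manager.getOrCreate("demo_group");
        GroupInfo second = manager.getOrCreate("demo_group");
        System.out.println("same instance: " + (first == second));
    }
}
```

The consequence for this article is that all consumers of a group share one object on the broker, so whatever one heartbeat writes into it is seen by every other member of the group.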

Next, step into consumerGroupInfo.updateSubscription(subList):

[ConsumerGroupInfo.java]
public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;

    // Iterate over the subscription list
    for (SubscriptionData sub : subList) {
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        // If there is no previous subscription for this topic
        if (old == null) {
            // Store this subscription
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}",
                    this.groupName,
                    sub.toString());
            }
        // If the incoming version is greater than the stored one, update the subscription.
        // The version is a system timestamp, see SubscriptionData.java:
        // private long subVersion = System.currentTimeMillis();
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}",
                    this.groupName,
                    old.toString(),
                    sub.toString()
                );
            }

            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }

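This part updates the subscription table. A topic with no existing subscription is inserted directly; for a topic that already has one, the versions are compared and the newer subscription overwrites the older one.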

Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
while (it.hasNext()) {
    Entry<String, SubscriptionData> next = it.next();
    String oldTopic = next.getKey();

    boolean exist = false;
    for (SubscriptionData sub : subList) {
        if (sub.getTopic().equals(oldTopic)) {
            exist = true;
            break;
        }
    }

    if (!exist) {
        log.warn("subscription changed, group: {} remove topic {} {}",
            this.groupName,
            oldTopic,
            next.getValue().toString()
        );

        it.remove();
        updated = true;
    }
}

this.lastUpdateTimestamp = System.currentTimeMillis();

return updated;

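Further down, the method iterates over the subscription table.

If a topic already in the table does not appear in the incoming subscription list, exist (the flag marking that the topic is still subscribed) stays false, so the if (!exist) block runs and removes the old topic. Only the topics listed in the latest heartbeat remain.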

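consumerTable stores consumer information keyed by consumer group. Suppose the members of one group carry different subscriptions, as in the example above. The consumer subscribed to DEMO_TOPIC/tagA first tells the broker through its heartbeat that the group subscribes to DEMO_TOPIC/tagA; the broker records this and updates its local subscription table. When another heartbeat arrives reporting that the group subscribes to DEMO_TOPIC/tagB with a greater subVersion timestamp, it overwrites the earlier entry and the group's subscription changes.

The subscriptions therefore overwrite each other, and when messages are pulled, one of the consumers cannot get any, because the broker no longer finds its subscription.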
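The overwrite can be reproduced with a stripped-down model of the per-group subscription table. This is a sketch under stated assumptions: `SubscriptionOverwriteSketch` and `Sub` are hypothetical simplifications of ConsumerGroupInfo and SubscriptionData, the versions are fixed numbers rather than real timestamps, and the topic-removal loop is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of one consumer group's subscription table on the broker.
final class SubscriptionOverwriteSketch {

    // Simplified placeholder for SubscriptionData.
    static final class Sub {
        final String topic;
        final String tags;
        final long version;

        Sub(String topic, String tags, long version) {
            this.topic = topic;
            this.tags = tags;
            this.version = version;
        }
    }

    // topic -> subscription, shared by every consumer of the group
    private final Map<String, Sub> subscriptionTable = new ConcurrentHashMap<>();

    // Same decision logic as the first loop of updateSubscription():
    // insert when absent, overwrite when the incoming version is newer.
    // Returns true only when a new topic was added.
    boolean update(Sub sub) {
        Sub old = subscriptionTable.get(sub.topic);
        if (old == null) {
            subscriptionTable.put(sub.topic, sub);
            return true;
        }
        if (sub.version > old.version) {
            subscriptionTable.put(sub.topic, sub);
        }
        return false;
    }

    // Returns the tag expression currently stored for the topic, or null.
    String tagsFor(String topic) {
        Sub sub = subscriptionTable.get(topic);
        return sub == null ? null : sub.tags;
    }

    public static void main(String[] args) {
        SubscriptionOverwriteSketch demoGroup = new SubscriptionOverwriteSketch();
        demoGroup.update(new Sub("DEMO_TOPIC", "tagA", 100L)); // heartbeat from consumer 1
        demoGroup.update(new Sub("DEMO_TOPIC", "tagB", 200L)); // heartbeat from consumer 2
        demoGroup.update(new Sub("DEMO_TOPIC", "tagA", 100L)); // consumer 1 again, older version
        System.out.println("broker now filters DEMO_TOPIC by: " + demoGroup.tagsFor("DEMO_TOPIC"));
    }
}
```

After the three heartbeats the table holds only the tagB subscription, so the tagA consumer is no longer represented on the broker even though it keeps sending heartbeats.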

Other causes

Beyond the cause above, there is an even more important one: the message Rebalance process. Let's look at RebalanceImpl.java:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            // ... omitted ...
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

This rebalances the message queues of one topic. Let's step into the line this.mQClientFactory.findConsumerIdList(topic, consumerGroup):

[MQClientInstance.java]
public List<String> findConsumerIdList(final String topic, final String group) {
        String brokerAddr = this.findBrokerAddrByTopic(topic);
        if (null == brokerAddr) {
            this.updateTopicRouteInfoFromNameServer(topic);
            brokerAddr = this.findBrokerAddrByTopic(topic);
        }

        if (null != brokerAddr) {
            try {
                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
            } catch (Exception e) {
                log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
            }
        }

        return null;
    }

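This looks up a broker address by topic and, if one is found, fetches the consumer ID list from that broker.

The list is selected by consumerGroup alone. If consumers in the same group subscribe to two or more topics, or to different tags, the consumers of the other topic are returned as well, and Rebalance then divides the queues among consumers that will never pull from them. Each topic ends up with only about half of its data being consumed (in the case of two different topics).

We will analyze the Rebalance process separately in a later article.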
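The "half of the data" effect follows from simple arithmetic, sketched below. The assumptions are loud ones: `RebalanceSketch.allocate` is a hypothetical even-split allocator written for this illustration, not RocketMQ's own allocation strategy class, and the client IDs, TOPIC_A, and the queue count are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of queue allocation inside one consumer group.
final class RebalanceSketch {

    // Splits queue indexes 0..queueCount-1 evenly among the sorted client IDs
    // and returns the share that belongs to currentCid.
    static List<Integer> allocate(String currentCid, List<String> cidAll, int queueCount) {
        List<String> sorted = new ArrayList<>(cidAll);
        Collections.sort(sorted);
        List<Integer> result = new ArrayList<>();
        int index = sorted.indexOf(currentCid);
        if (index < 0) {
            return result; // this client is not part of the group
        }
        int base = queueCount / sorted.size();
        int extra = queueCount % sorted.size();
        int start = index * base + Math.min(index, extra);
        int count = base + (index < extra ? 1 : 0);
        for (int i = 0; i < count; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        // The broker returns every client of the group, whatever topic it subscribes to.
        // Assumption: c1 subscribes to TOPIC_A only, c2 subscribes to another topic only.
        List<String> cidAll = Arrays.asList("c1", "c2");
        int topicAQueues = 4;

        System.out.println("c1 pulls TOPIC_A queues: " + allocate("c1", cidAll, topicAQueues));
        System.out.println("reserved for c2, never pulled: " + allocate("c2", cidAll, topicAQueues));
    }
}
```

With four queues and two client IDs, c1 is assigned two queues and the other two are set aside for c2, which does not subscribe to TOPIC_A and therefore never pulls from them.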



