文章目录
  1. 1. 什么是“长轮询”机制
  2. 2. RocketMQ如何实现长轮询–客户端实现
  3. 3. RocketMQ如何实现长轮询–服务端实现
    1. 3.1. processRequest()
  4. 4. PullRequestHoldService核心逻辑
    1. 4.1. notifyMessageArriving(topic, queueId, offset)
  5. 5. 小结

RoceketMQ提供了两种消息消费者,DefaultMQPushConsumer、DefaultMQPullConsumer。我们都知道DefaultMQPullConsumer是基于拉模式的消费,而DefaultMQPushConsumer是基于推模式的消费。我们先简单复习一下推拉模式的概念。

推模式:当服务端有数据立即通知客户端,这个策略依赖服务端与客户端之间的长连接,它具有高实时性、客户端开发简单等优点;同时缺点也很明显,比如:服务端需要感知与它建立链接的客户端,要实现客户端节点的发现,服务端本身主动推送,需要服务端对消息做额外的处理,以便能够及时将消息分发给客户端。

拉模式:客户端主动对服务端的数据进行拉取。客户端拉取数据,拉取成功后处理数据,处理完成再次进行拉取,循环执行。缺点是如果不能很好的设置拉取的频率,时间间隔,过多的空轮询会对服务端造成较大的访问压力,数据的实时性也不能得到很好的保证。

基于对上述两个策略的优缺点的综合考虑,RocketMQ的DefaultMQPushConsumer采用了结合了推拉模式两者优点的长轮询机制,对消息进行消费。这样,既能保证主动权在客户端,还能保证数据拉取的实时性。

本文我们就对RocketMQ的长轮询机制进行分析讲解,从而更好的理解RocketMQ的设计精巧之处。

首先了解一下什么是 长轮询 机制:

什么是“长轮询”机制

长轮询机制,顾名思义,它不同于常规轮询方式。常规的轮询方式为客户端发起请求,服务端接收后该请求后立即进行相应的方式。

长轮询本质上仍旧是轮询,它与轮询不同之处在于,当服务端接收到客户端的请求后,服务端不会立即将数据返回给客户端,而是会先将这个请求hold住,判断服务器端数据是否有更新。如果有更新,则对客户端进行响应,如果一直没有数据,则它会在长轮询超时时间之前一直hold住请求并检测是否有数据更新,直到有数据或者超时后才返回。

RocketMQ如何实现长轮询–客户端实现

了解了长轮询机制的概念,我们就容易理解RocketMQ对长轮询机制的应用了。请跟随笔者的思路,进入到源码中一探究竟。

首先复习一下客户端如何进行消息拉取:

从上文中,我们已经得知,DefaultMQPushConsumer应用了长轮询机制,从之前的源码分析文章中,我们知道RocketMQ消息拉取是通过消息拉取线程PullMessageService实现的,关于这部分的逻辑可以移步 跟我学RocketMQ之消息拉取源码解析

我们进入PullMessageService类,重点看它的 run() 方法。

[PullMessageService.java]
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

当broker启动后,会在启动MQClientInstance过程中启动PullMessageService,当PullMessageService启动后一直执行run方法进行消息拉取(只要stopped == false)。

回顾一下PullRequest的结构:

public class PullRequest {
    // 消费者组
    private String consumerGroup;
    // 待拉取到消息队列
    private MessageQueue messageQueue;
    // 消息处理队列,消息从broker中拉取以后会先存到该ProcessQueue中,然后再提交给消费者线程池进行消费
    private ProcessQueue processQueue;
    // 带拉取消息的偏移量
    private long nextOffset;
    // 是否锁定
    private boolean lockedFirst = false;

对于每个MessageQueue,都有对应的一个pullRequest,每个MessageQueue还对应一个processQueue,保存该MessageQueue消息处理的快照;通过nextOffset来标识当前读取的位置。

消息拉取最终是由PullAPIWrapper.java执行的,在它的pullKernelImpl()方法中,真正的消息拉取逻辑如下:

[PullAPIWrapper.java.pullKernelImpl()]

// 组装消息拉取请求头
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
// 设置broker最大阻塞时间,默认为15秒,BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);

// 获取拉取broker地址
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
    brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}

// 执行消息拉取
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);
return pullResult;

这里的参数brokerSuspendMaxTimeMillis(默认值为15s)代表进行消息拉取时,broker的最长阻塞时间。

当进行消息拉取时,如果broker端没有消息,则进行阻塞,否则会对消息体进行打包并直接返回。

RocketMQ如何实现长轮询–服务端实现

RocketMQ的长轮询是在broker上实现的,具体的代码实现在PullMessageProcessor中。我们进入代码中一窥芳容。

它的启动链路如下:

BrokerStartup
    |-start()
        |-createBrokerController(String[] args) 
            |-BrokerController() // BrokerController构造方法
            |-new PullMessageProcessor(this);

当broker启动完成之后,PullMessageProcessor便能够被远程的消费者访问到,通过网络进行消息拉取调用操作。

我们重点看方法processRequest,它是消息拉取网络交互的核心方法。

processRequest()

processRequest为broker对外提供消息拉取的服务方法,它提供针对不同拉取结果的处理逻辑。

[PullMessageProcessor.java.processRequest]
// 根据客户端发送的拉取消息头,构建拉取结果响应体
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
...各种前置校验...
// 从请求头中取出消费者组、主题、队列id、offset、消息最大拉取条数、过滤条件等,去commitLog中查找对应的消息
switch (getMessageResult.getStatus()) {
        case FOUND:
            response.setCode(ResponseCode.SUCCESS);
            break;
        case MESSAGE_WAS_REMOVING:
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            break;
...省略其他case分支...

// 根据上面拉取结果中设置的code进行处理           
switch (response.getCode()) {
    ...省略其他case分支...
    case ResponseCode.PULL_NOT_FOUND:

            if (brokerAllowSuspend && hasSuspendFlag) {
                long pollingTimeMills = suspendTimeoutMillisLong;
                if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                }

                String topic = requestHeader.getTopic();
                long offset = requestHeader.getQueueOffset();
                int queueId = requestHeader.getQueueId();
                PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                response = null;
                break;
            }

对于ResponseCode.SUCCESS的拉取响应码,RocektMQ将消息拉取结果以byte数组形式设置到拉取响应中,并会返回给客户端;我们重点关注 ResponseCode.PULL_NOT_FOUND 类型,即 当前未拉取到消息

对于ResponseCode.PULL_NOT_FOUND类型,RocketMQ会调用PullRequestHoldService将请求holkd住,不会返回客户端响应,这里就是长轮询的核心逻辑,代码如下:

case ResponseCode.PULL_NOT_FOUND:
    // 判断broker是否允许被挂起
    if (brokerAllowSuspend && hasSuspendFlag) {
        // 获取长轮询超时时长
        long pollingTimeMills = suspendTimeoutMillisLong;
        // 如果长轮询支持未开启,则pollingTimeMills为短轮询时间,ShortPollingTimeMills默认为1秒
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        // 根据入参request,Nio的channel,轮询时间,当前消息存储时间戳,消息拉取offset,订阅信息,消息过滤表达式等信息构建长轮询拉取请求
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        // 通过PullRequestHoldService对拉取请求进行hold,使用pullRequest对指定topic、queueId的队列进行长轮询消息拉取
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        // 设置拉取返回为null,不对客户端进行返回
        response = null;
        break;
    }

我们总结一下这里的逻辑:

  • 首先判断broker是否允许被hold,如果允许则执行长轮询业务逻辑
  • 获取长轮询超时时长,该参数可配置,如果长轮询支持未开启则改用短轮询时间,默认为1s
  • 从消息拉取请求头中获取topic、队列offset、队列id
  • 构造长轮询消息拉取请求对象PullRequest
  • 调用PullRequestHoldService进行长轮询操作
  • 拉取返回为空,在超时之前不对客户端进行返回

PullRequestHoldService核心逻辑

从上面的分析我们得知,长轮询真正的执行者为PullRequestHoldService,我们看下这个类的代码,PullRequestHoldService继承了ServiceThread,我们重点关注其run方法。

@Override
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            // 如果支持长轮询,则等待5秒
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                // 短轮询则默认等待1s
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            // 检测hold请求
            this.checkHoldRequest();
            // 如果检测花费时间超过5s打印日志
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}

run方法不断检测被hold住的请求,它不断检查是否有消息获取成功。检测方法通过执行方法suspendPullRequest实现

private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
    new ConcurrentHashMap<String, ManyPullRequest>(1024);

public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    // 从pullRequestTable中获取对应topic+queueId下的拉取请求ManyPullRequest
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    // 将等待检测的pullRequest添加到ManyPullRequest中
    mpr.addPullRequest(pullRequest);
}

注意,这里的ManyPullRequest对象实际上是一组PullRequest的集合,它封装了一个topic+queueId下的一批消息。

具体的检测逻辑通过方法checkHoldRequest()实现。

private void checkHoldRequest() {
    // 迭代PullRequest Map,key=topic@queueId
    for (String key : this.pullRequestTable.keySet()) {
        // 解析出topic  queueId
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            // 获取当前获取的数据的最大offset
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                // 通知消息到达
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}

checkHoldRequest()方法解析pullRequestTable的keySet,对key进行解析,取出topic及queueId,获取topic+queueId对应的当前MessageQueue的最大offset,并与当前的offset对比从而确定是否有新消息到达,具体逻辑在notifyMessageArriving(topic, queueId, offset);方法中实现

这里的检测逻辑整体是异步的,后台检测线程PullRequestHoldService一直在运行;在PullMessageProcessor中提交待检测的PullRequest到PullRequestHoldService,将其放入pullRequestTable,等待被PullRequestHoldService进行处理。

notifyMessageArriving(topic, queueId, offset)

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        // 根据key=topic@queueId从pullRequestTable获取ManyPullRequest
        // 如果ManyPullRequest不为空,拷贝ManyPullRequest中的List<PullRequest>
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            // 构造响应list
            List<PullRequest> replayList = new ArrayList<PullRequest>();
            // 迭代请求list
            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                // 如果当前最新的offset小于等于请求的offset
                if (newestOffset <= request.getPullFromThisOffset()) {
                    // 当前最新的offset就是队列的最大offset
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }
                // 如果当前最新offset大于请求offset,也就是有新消息到来
                if (newestOffset > request.getPullFromThisOffset()) {
                    // 判断消息是否满足过滤表达式
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    // match by bit map, need eval again when properties is not null.
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }
                    if (match) {
                        try {
                            // 消息匹配,则将消息返回客户端
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                // 判断是否超时
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        // 如果当前时间 >= 请求超时时间+hold时间,则返回客户端消息未找到
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }
                replayList.add(request);
            }
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}

总结一下,notifyMessageArriving主要作用为判断消息是否到来,并根据判断结果对客户端进行相应。

  • 比较maxOffset与当前的offset,如果当前最新offset大于请求offset,也就是有新消息到来,则将新消息返回给客户端
  • 校验是否超时,如果当前时间 >= 请求超时时间+hold阻塞时间,则返回客户端消息未找到

该方法会在PullRequestHoldService中循环调用进行检查,也会在DefaultMessageStore中消息被存储的时候调用。这里体现了主动检查与被动通知共同作用的思路。

当服务端处理完成之后,相应客户端,客户端会在消息处理完成之后再次将拉取请求pullRequest放到PullMessageService中,等待下次轮询。这样就能够一直进行消息拉取操作。

小结

本文对RocketMQ消息拉取的长轮询机制进行了分支,我们得知:

RocektMQ并没有使用推模式或者拉模式,而是使用了结合两者优点的长轮询机制,它本质上还是拉模式,但服务端能够通过hold住请求的方式减少客户端对服务端的频繁访问,从而提高资源利用率及消息响应实时性。这种策略在服务端开发的其他方向如:IM等领域都有广泛的实践,因此了解它的原理是有必要的。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 什么是“长轮询”机制
  2. 2. RocketMQ如何实现长轮询–客户端实现
  3. 3. RocketMQ如何实现长轮询–服务端实现
    1. 3.1. processRequest()
  4. 4. PullRequestHoldService核心逻辑
    1. 4.1. notifyMessageArriving(topic, queueId, offset)
  5. 5. 小结
Fork me on GitHub