跟我学RocektMQ之理解长轮询机制
RoceketMQ提供了两种消息消费者,DefaultMQPushConsumer、DefaultMQPullConsumer。我们都知道DefaultMQPullConsumer是基于拉模式的消费,而DefaultMQPushConsumer是基于推模式的消费。我们先简单复习一下推拉模式的概念。
推模式:当服务端有数据立即通知客户端,这个策略依赖服务端与客户端之间的长连接,它具有高实时性、客户端开发简单等优点;同时缺点也很明显,比如:服务端需要感知与它建立链接的客户端,要实现客户端节点的发现,服务端本身主动推送,需要服务端对消息做额外的处理,以便能够及时将消息分发给客户端。
拉模式:客户端主动对服务端的数据进行拉取。客户端拉取数据,拉取成功后处理数据,处理完成再次进行拉取,循环执行。缺点是如果不能很好的设置拉取的频率,时间间隔,过多的空轮询会对服务端造成较大的访问压力,数据的实时性也不能得到很好的保证。
基于对上述两个策略的优缺点的综合考虑,RocketMQ的DefaultMQPushConsumer采用了结合了推拉模式两者优点的长轮询机制,对消息进行消费。这样,既能保证主动权在客户端,还能保证数据拉取的实时性。
本文我们就对RocketMQ的长轮询机制进行分析讲解,从而更好的理解RocketMQ的设计精巧之处。
首先了解一下什么是 长轮询 机制:
什么是“长轮询”机制
长轮询机制,顾名思义,它不同于常规轮询方式。常规的轮询方式为客户端发起请求,服务端接收后该请求后立即进行相应的方式。
长轮询本质上仍旧是轮询,它与轮询不同之处在于,当服务端接收到客户端的请求后,服务端不会立即将数据返回给客户端,而是会先将这个请求hold住,判断服务器端数据是否有更新。如果有更新,则对客户端进行响应,如果一直没有数据,则它会在长轮询超时时间之前一直hold住请求并检测是否有数据更新,直到有数据或者超时后才返回。
RocketMQ如何实现长轮询–客户端实现
了解了长轮询机制的概念,我们就容易理解RocketMQ对长轮询机制的应用了。请跟随笔者的思路,进入到源码中一探究竟。
首先复习一下客户端如何进行消息拉取:
从上文中,我们已经得知,DefaultMQPushConsumer应用了长轮询机制,从之前的源码分析文章中,我们知道RocketMQ消息拉取是通过消息拉取线程PullMessageService实现的,关于这部分的逻辑可以移步 跟我学RocketMQ之消息拉取源码解析。
我们进入PullMessageService类,重点看它的 run() 方法。
[PullMessageService.java]
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
当broker启动后,会在启动MQClientInstance过程中启动PullMessageService,当PullMessageService启动后一直执行run方法进行消息拉取(只要stopped == false)。
回顾一下PullRequest的结构:
public class PullRequest {
// 消费者组
private String consumerGroup;
// 待拉取到消息队列
private MessageQueue messageQueue;
// 消息处理队列,消息从broker中拉取以后会先存到该ProcessQueue中,然后再提交给消费者线程池进行消费
private ProcessQueue processQueue;
// 带拉取消息的偏移量
private long nextOffset;
// 是否锁定
private boolean lockedFirst = false;
对于每个MessageQueue,都有对应的一个pullRequest,每个MessageQueue还对应一个processQueue,保存该MessageQueue消息处理的快照;通过nextOffset来标识当前读取的位置。
消息拉取最终是由PullAPIWrapper.java执行的,在它的pullKernelImpl()方法中,真正的消息拉取逻辑如下:
[PullAPIWrapper.java.pullKernelImpl()]
// 组装消息拉取请求头
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
// 设置broker最大阻塞时间,默认为15秒,BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
// 获取拉取broker地址
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// 执行消息拉取
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
这里的参数brokerSuspendMaxTimeMillis(默认值为15s)代表进行消息拉取时,broker的最长阻塞时间。
当进行消息拉取时,如果broker端没有消息,则进行阻塞,否则会对消息体进行打包并直接返回。
RocketMQ如何实现长轮询–服务端实现
RocketMQ的长轮询是在broker上实现的,具体的代码实现在PullMessageProcessor中。我们进入代码中一窥芳容。
它的启动链路如下:
BrokerStartup
|-start()
|-createBrokerController(String[] args)
|-BrokerController() // BrokerController构造方法
|-new PullMessageProcessor(this);
当broker启动完成之后,PullMessageProcessor便能够被远程的消费者访问到,通过网络进行消息拉取调用操作。
我们重点看方法processRequest,它是消息拉取网络交互的核心方法。
processRequest()
processRequest为broker对外提供消息拉取的服务方法,它提供针对不同拉取结果的处理逻辑。
[PullMessageProcessor.java.processRequest]
// 根据客户端发送的拉取消息头,构建拉取结果响应体
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
...各种前置校验...
// 从请求头中取出消费者组、主题、队列id、offset、消息最大拉取条数、过滤条件等,去commitLog中查找对应的消息
switch (getMessageResult.getStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
...省略其他case分支...
// 根据上面拉取结果中设置的code进行处理
switch (response.getCode()) {
...省略其他case分支...
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
对于ResponseCode.SUCCESS的拉取响应码,RocektMQ将消息拉取结果以byte数组形式设置到拉取响应中,并会返回给客户端;我们重点关注 ResponseCode.PULL_NOT_FOUND 类型,即 当前未拉取到消息。
对于ResponseCode.PULL_NOT_FOUND类型,RocketMQ会调用PullRequestHoldService将请求holkd住,不会返回客户端响应,这里就是长轮询的核心逻辑,代码如下:
case ResponseCode.PULL_NOT_FOUND:
// 判断broker是否允许被挂起
if (brokerAllowSuspend && hasSuspendFlag) {
// 获取长轮询超时时长
long pollingTimeMills = suspendTimeoutMillisLong;
// 如果长轮询支持未开启,则pollingTimeMills为短轮询时间,ShortPollingTimeMills默认为1秒
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
// 根据入参request,Nio的channel,轮询时间,当前消息存储时间戳,消息拉取offset,订阅信息,消息过滤表达式等信息构建长轮询拉取请求
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
// 通过PullRequestHoldService对拉取请求进行hold,使用pullRequest对指定topic、queueId的队列进行长轮询消息拉取
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
// 设置拉取返回为null,不对客户端进行返回
response = null;
break;
}
我们总结一下这里的逻辑:
- 首先判断broker是否允许被hold,如果允许则执行长轮询业务逻辑
- 获取长轮询超时时长,该参数可配置,如果长轮询支持未开启则改用短轮询时间,默认为1s
- 从消息拉取请求头中获取topic、队列offset、队列id
- 构造长轮询消息拉取请求对象PullRequest
- 调用PullRequestHoldService进行长轮询操作
- 拉取返回为空,在超时之前不对客户端进行返回
PullRequestHoldService核心逻辑
从上面的分析我们得知,长轮询真正的执行者为PullRequestHoldService,我们看下这个类的代码,PullRequestHoldService继承了ServiceThread,我们重点关注其run方法。
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
// 如果支持长轮询,则等待5秒
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
// 短轮询则默认等待1s
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 检测hold请求
this.checkHoldRequest();
// 如果检测花费时间超过5s打印日志
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
run方法不断检测被hold住的请求,它不断检查是否有消息获取成功。检测方法通过执行方法suspendPullRequest实现
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
// 从pullRequestTable中获取对应topic+queueId下的拉取请求ManyPullRequest
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
// 将等待检测的pullRequest添加到ManyPullRequest中
mpr.addPullRequest(pullRequest);
}
注意,这里的ManyPullRequest对象实际上是一组PullRequest的集合,它封装了一个topic+queueId下的一批消息。
具体的检测逻辑通过方法checkHoldRequest()实现。
private void checkHoldRequest() {
// 迭代PullRequest Map,key=topic@queueId
for (String key : this.pullRequestTable.keySet()) {
// 解析出topic queueId
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
// 获取当前获取的数据的最大offset
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
// 通知消息到达
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
checkHoldRequest()方法解析pullRequestTable的keySet,对key进行解析,取出topic及queueId,获取topic+queueId对应的当前MessageQueue的最大offset,并与当前的offset对比从而确定是否有新消息到达,具体逻辑在notifyMessageArriving(topic, queueId, offset);方法中实现
这里的检测逻辑整体是异步的,后台检测线程PullRequestHoldService一直在运行;在PullMessageProcessor中提交待检测的PullRequest到PullRequestHoldService,将其放入pullRequestTable,等待被PullRequestHoldService进行处理。
notifyMessageArriving(topic, queueId, offset)
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
// 根据key=topic@queueId从pullRequestTable获取ManyPullRequest
// 如果ManyPullRequest不为空,拷贝ManyPullRequest中的List<PullRequest>
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
// 构造响应list
List<PullRequest> replayList = new ArrayList<PullRequest>();
// 迭代请求list
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
// 如果当前最新的offset小于等于请求的offset
if (newestOffset <= request.getPullFromThisOffset()) {
// 当前最新的offset就是队列的最大offset
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
// 如果当前最新offset大于请求offset,也就是有新消息到来
if (newestOffset > request.getPullFromThisOffset()) {
// 判断消息是否满足过滤表达式
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) {
try {
// 消息匹配,则将消息返回客户端
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
// 判断是否超时
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
// 如果当前时间 >= 请求超时时间+hold时间,则返回客户端消息未找到
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
总结一下,notifyMessageArriving主要作用为判断消息是否到来,并根据判断结果对客户端进行相应。
- 比较maxOffset与当前的offset,如果当前最新offset大于请求offset,也就是有新消息到来,则将新消息返回给客户端
- 校验是否超时,如果当前时间 >= 请求超时时间+hold阻塞时间,则返回客户端消息未找到
该方法会在PullRequestHoldService中循环调用进行检查,也会在DefaultMessageStore中消息被存储的时候调用。这里体现了主动检查与被动通知共同作用的思路。
当服务端处理完成之后,相应客户端,客户端会在消息处理完成之后再次将拉取请求pullRequest放到PullMessageService中,等待下次轮询。这样就能够一直进行消息拉取操作。
小结
本文对RocketMQ消息拉取的长轮询机制进行了分支,我们得知:
RocektMQ并没有使用推模式或者拉模式,而是使用了结合两者优点的长轮询机制,它本质上还是拉模式,但服务端能够通过hold住请求的方式减少客户端对服务端的频繁访问,从而提高资源利用率及消息响应实时性。这种策略在服务端开发的其他方向如:IM等领域都有广泛的实践,因此了解它的原理是有必要的。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。