跟我学RocketMQ之消息拉取源码解析
我们继续对消息消费流程的源码进行解析。
本文主要针对push模式下的消息拉取流程进行解析。我们重点分析集群消费模式,对于广播模式其实很好理解,每个消费者都需要拉取主题下面的所有消费队列的消息。
在集群消费模式下,同一个消费者组内包含了多个消费者实例,同一个topic下存在多个消费队列。对于单个消费者组,其内部维护了一个线程池进行消息消费,这部分内容可以移步 跟我学RocketMQ之消息消费源码解析(2)。
之前我们已经研究了消费者的初始化流程,在启动MQClientInstance过程中,启动了一个消息拉取线程PullMessageService进行消息拉取工作。
PullMessageService启动
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
...省略其他...
this.pullMessageService = new PullMessageService(this);
...省略其他...
this.rebalanceService = new RebalanceService(this);
}
可以看到在MQClientInstance的构造初始化过程中,启动了PullMessageService线程。
PullMessageService启动
从之前文章中对消息消费启动过程的分析中得知,在消费者启动过程defaultMQPushConsumerImpl.start()中,我们启动了MQClientInstance。
在启动MQClientInstance的过程中,对消息拉取线程进行了start()。消息拉取线程开始运行,看下代码实现
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// 启动消息拉取线程
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
通过 this.pullMessageService.start() 启动了消息拉取线程。
PullMessageService消息拉取流程分析
PullMessageService是ServiceThread的子类,ServiceThread是RocketMQ实现的具备启停能力的线程实现,它实现了Runnable接口。我们看一下PullMessageService的声明。
public class PullMessageService extends ServiceThread {
当PullMessageService启动后,开始运行run方法,我们看一下run方法逻辑。
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// step 1
PullRequest pullRequest = this.pullRequestQueue.take();
// step 2
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
while (!this.isStopped()) { 这个写法是一种通用的设计技巧,stopped是一个声明为volatile的boolean类型变量,保证多线程下的可见性;每次执行逻辑时判断stopped是否为false,如果是则执行循环体内逻辑。
其他线程能够通过设置stopped为true,导致此处判断结果为false从而终止拉取线程的运行。
- [step 1] 从pullRequestQueue(LinkedBlockingQueue无界队列)中通过take()获取一个PullRequest消息拉取任务;如果队列为空,则线程阻塞,等待新的PullRequest被放入恢复运行。
- [step 2] 执行pullMessage方法进行真正的消息拉取操作。
PullRequest添加流程
在阅读pullMessage逻辑之前,我们先看一下PullRequest是从何添加的。
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
PullMessageService提供了即时添加与延时添加两种方式添加PullRequest,将其加入到pullRequestQueue阻塞队列中。PullRequest的创建过程是在RebalanceImpl中完成的,这个过程涉及到RocketMQ消息消费的重要过程 消息队列负载机制,这个过程我们会单独进行讲解。
PullRequest简介
我们简单看一下PullRequest的结构:
public class PullRequest {
// 消费者组
private String consumerGroup;
// 待拉取到消息队列
private MessageQueue messageQueue;
// 消息处理队列,消息从broker中拉取以后会先存到该ProcessQueue中,然后再提交给消费者线程池进行消费
private ProcessQueue processQueue;
// 带拉取消息的偏移量
private long nextOffset;
// 是否锁定
private boolean lockedFirst = false;
PullRequest消息拉取流程
我们继续回到PullRequest拉取流程中来,查看PullMessageService.pullMessage方法。
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
这是一个私有方法,可以看到
- 首先从MQClientInstance中选择一个消费者,选取条件为当前拉取请求中的消费者组;
- 将该消费者实例强转为DefaultMQPushConsumerImpl
- 调用DefaultMQPushConsumerImpl的pullMessage方法进行消息拉取。
我们接着进入DefaultMQPushConsumerImpl.java中查看其pullMessage的具体实现。
DefaultMQPushConsumerImpl.pullMessage消息拉取逻辑
前方大段代码预警……我会将这个大方法拆分成一段一段的细分逻辑进行分析。
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
首先从pullRequest请求中获取到处理队列processQueue,如果processQueue已经被丢弃则结束拉取流程;
如果processQueue未被丢弃,则更新LastPullTimestamp属性未当前时间戳。
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
判断当前线程状态是否为运行态,makeSureStateOK()方法会通过 this.serviceState != ServiceState.RUNNING 进行服务状态的判断; 如果不是RUNNING状态则抛出异常结束消息拉取流程。
if (this.isPause()) {
...省略warn日志...
// long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;单位毫秒
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
对消费者是否挂起进行判断,如果消费者状态为已挂起,则将拉取请求延迟1s后再次放到PullMessageService的消息拉取任务队列中。
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
...省略warn日志...
}
return;
}
此处进行消息拉取流控校验:
如果processQueue当前处理的消息条数超过了PullThresholdForQueue(消息拉取阈值=1000)触发流控,结束本次拉取任务,50毫秒之后将该拉取任务再次加入到消息拉取任务队列中。每触发1000次流控,打印warn日志;
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
...省略warn日志...
}
return;
}
如果不是顺序消费,判断processQueue中队列最大偏移量与最小偏移量之间的间隔,如果大于ConsumeConcurrentlyMaxSpan(拉取偏移量阈值==2000)触发流控,结束本次拉取;50毫秒之后将该拉取任务再次加入到消息拉取任务队列中。
每触发1000次流程,打印warn日志。
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
如果消息处理队列锁定成功,判断消息拉取请求是否锁定,如果没有锁定则计算从何处开始拉取。
判断broker是否繁忙,如果当前拉取的进度小于拉取请求中要拉取到下一个进度,表明当前broker处理的拉取请求还没有执行完成,因此brokerBusy为true,表示broker处于繁忙状态。
更新拉取请求的锁定标记为已锁定,更新下一次拉取的offset为计算出的offset。
如果消息处理队列未锁定,则延迟3s之后将将该拉取任务再次加入到消息拉取任务队列。打印日志表明稍后再进行消息拉取,原因为broker未被锁定。结束本次拉取。
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
获取当前topic的订阅消息,如果订阅消息不存在,则结束当前消息拉取;延迟三秒之后将拉取任务再次加入到消息拉取任务队列中。
final long beginTimestamp = System.currentTimeMillis();
...省略pullCallback实现...
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
获取消息订阅信息,如果订阅信息存在则获取tag标识。
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
这里主要构造了消息拉取的系统标识;
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
通过pullKernelImpl()方法发起真实的消息拉取请求,pullKernelImpl方法内部与服务端进行网络通信。底层调用了MQClientAPIImpl的pullMessage方法。此处涉及到网络通信,我们在后续的网络通讯代码部分进行分析。
注意此处的pullKernelImpl方法中的最后一个参数为PullCallback,PullCallback为从Broker拉取消息之后的回调方法,它的初始化代码如下,我们单独拿出来进行解析。
DefaultMQPushConsumerImpl.pullMessage.PullCallback实例化代码逻辑
通过匿名内部类的方式初始化了PullCallback回调接口,需要实现其OnSuccess、onException方法。
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),
pullResult,
subscriptionData);
如果拉取结果不为空,表明拉取成功了,执行processPullResult对拉取结果进行解析。
// 判断拉取状态
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
如果拉取结果响应中包含的消息列表为空,或者列表为空列表,则立即发起下一次拉取请求。以便唤醒PullMessageService再次执行拉取;之所以列表为空,是因为客户端通过TAG对消息进行了过滤,因此会出现过滤后列表为空的情况。
} else {
// 取出第一个消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
将拉取到的消息保存到processQueue中,通过submitConsumeRequest方法将拉取到的消息提交给ConsumeMessageService进行消息消费,这里是一个异步方法。
PullCallBack将消息提交给consumeMessageService之后就直接返回了,不关心消息具体是如何消费的。
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
这里的逻辑比较重要,实现了消息的持续拉取。具体的逻辑为:
将消息提交给消费者线程之后,PullCallBack立即返回,表明当前消息拉取已经完成。
判断PullInterval参数,如果PullInterval>0,等待PullInterval毫秒之后将PullRequest对象放到PullMessageService的pullRequestQueue消息拉取队列中。
pullRequestQueue的下次拉取被激活,从而达到消息持续拉取的目的,拉取的频率几乎是准实时的。
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
如果返回的拉取结果为NO_NEW_MSG、NO_MATCHED_MSG,则使用服务端校准的offset发起下一次拉取请求。
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
如果返回的拉取结果状态为OFFSET_ILLEGAL,即offset非法;首先设置ProcessQueue的Dropped为true,将该消息队列标记为丢弃。通过服务端下一次校准的offset尝试对当前消息的消费进度进行更新。
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
持久化当前消息的消费进度
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
将当前消息队列从rebalanceImpl的ProcessQueue中移除,对当前队列的消息拉取进行暂停处理,等待下一次rebalance。
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
offset校准时,基本上使用原先的offset。
客户端进行消费进度,只有实际消费进度大于当前消费进度会会进行offset的覆盖操作,从而保证offset的准确性。
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
如果拉取结果异常,则3s后将消息拉取请求重新将PullRequest对象放到PullMessageService的pullRequestQueue消息拉取队列中。
小结
本文主要对PullMessageService消息拉取逻辑进行了分析,整个流程还是比较复杂的。其中的一些编码套路在实战中也是能够借鉴的,希望本文能够对读者理解消息拉取有所帮助。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。