文章目录
  1. 1. PullMessageService启动
  2. 2. PullMessageService启动
  3. 3. PullMessageService消息拉取流程分析
  4. 4. PullRequest添加流程
    1. 4.1. PullRequest简介
  5. 5. PullRequest消息拉取流程
  6. 6. DefaultMQPushConsumerImpl.pullMessage消息拉取逻辑
  7. 7. DefaultMQPushConsumerImpl.pullMessage.PullCallback实例化代码逻辑
  8. 8. 小结

我们继续对消息消费流程的源码进行解析。

本文主要针对push模式下的消息拉取流程进行解析。我们重点分析集群消费模式,对于广播模式其实很好理解,每个消费者都需要拉取主题下面的所有消费队列的消息。

在集群消费模式下,同一个消费者组内包含了多个消费者实例,同一个topic下存在多个消费队列。对于单个消费者组,其内部维护了一个线程池进行消息消费,这部分内容可以移步 跟我学RocketMQ之消息消费源码解析(2)

之前我们已经研究了消费者的初始化流程,在启动MQClientInstance过程中,启动了一个消息拉取线程PullMessageService进行消息拉取工作。

PullMessageService启动

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    ...省略其他...
    this.pullMessageService = new PullMessageService(this);
    ...省略其他...
    this.rebalanceService = new RebalanceService(this);
}

可以看到在MQClientInstance的构造初始化过程中,启动了PullMessageService线程。

PullMessageService启动

从之前文章中对消息消费启动过程的分析中得知,在消费者启动过程defaultMQPushConsumerImpl.start()中,我们启动了MQClientInstance。

在启动MQClientInstance的过程中,对消息拉取线程进行了start()。消息拉取线程开始运行,看下代码实现

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // 启动消息拉取线程
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

通过 this.pullMessageService.start() 启动了消息拉取线程。

PullMessageService消息拉取流程分析

PullMessageService是ServiceThread的子类,ServiceThread是RocketMQ实现的具备启停能力的线程实现,它实现了Runnable接口。我们看一下PullMessageService的声明。

public class PullMessageService extends ServiceThread {

当PullMessageService启动后,开始运行run方法,我们看一下run方法逻辑。

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // step 1
            PullRequest pullRequest = this.pullRequestQueue.take();
            // step 2
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

while (!this.isStopped()) { 这个写法是一种通用的设计技巧,stopped是一个声明为volatile的boolean类型变量,保证多线程下的可见性;每次执行逻辑时判断stopped是否为false,如果是则执行循环体内逻辑。

其他线程能够通过设置stopped为true,导致此处判断结果为false从而终止拉取线程的运行。

  • [step 1] 从pullRequestQueue(LinkedBlockingQueue无界队列)中通过take()获取一个PullRequest消息拉取任务;如果队列为空,则线程阻塞,等待新的PullRequest被放入恢复运行。
  • [step 2] 执行pullMessage方法进行真正的消息拉取操作。

PullRequest添加流程

在阅读pullMessage逻辑之前,我们先看一下PullRequest是从何添加的。

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}

public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

PullMessageService提供了即时添加与延时添加两种方式添加PullRequest,将其加入到pullRequestQueue阻塞队列中。PullRequest的创建过程是在RebalanceImpl中完成的,这个过程涉及到RocketMQ消息消费的重要过程 消息队列负载机制,这个过程我们会单独进行讲解。

PullRequest简介

我们简单看一下PullRequest的结构:

public class PullRequest {
    // 消费者组
    private String consumerGroup;
    // 待拉取到消息队列
    private MessageQueue messageQueue;
    // 消息处理队列,消息从broker中拉取以后会先存到该ProcessQueue中,然后再提交给消费者线程池进行消费
    private ProcessQueue processQueue;
    // 带拉取消息的偏移量
    private long nextOffset;
    // 是否锁定
    private boolean lockedFirst = false;

PullRequest消息拉取流程

我们继续回到PullRequest拉取流程中来,查看PullMessageService.pullMessage方法。

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

这是一个私有方法,可以看到

  1. 首先从MQClientInstance中选择一个消费者,选取条件为当前拉取请求中的消费者组;
  2. 将该消费者实例强转为DefaultMQPushConsumerImpl
  3. 调用DefaultMQPushConsumerImpl的pullMessage方法进行消息拉取。

我们接着进入DefaultMQPushConsumerImpl.java中查看其pullMessage的具体实现。

DefaultMQPushConsumerImpl.pullMessage消息拉取逻辑

前方大段代码预警……我会将这个大方法拆分成一段一段的细分逻辑进行分析。

public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

首先从pullRequest请求中获取到处理队列processQueue,如果processQueue已经被丢弃则结束拉取流程;

如果processQueue未被丢弃,则更新LastPullTimestamp属性未当前时间戳。

try {
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    return;
}

判断当前线程状态是否为运行态,makeSureStateOK()方法会通过 this.serviceState != ServiceState.RUNNING 进行服务状态的判断; 如果不是RUNNING状态则抛出异常结束消息拉取流程。

if (this.isPause()) {
    ...省略warn日志...
    // long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;单位毫秒
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
    return;
}

对消费者是否挂起进行判断,如果消费者状态为已挂起,则将拉取请求延迟1s后再次放到PullMessageService的消息拉取任务队列中。

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        ...省略warn日志...
    }
    return;
}

此处进行消息拉取流控校验:

如果processQueue当前处理的消息条数超过了PullThresholdForQueue(消息拉取阈值=1000)触发流控,结束本次拉取任务,50毫秒之后将该拉取任务再次加入到消息拉取任务队列中。每触发1000次流控,打印warn日志;

if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            ...省略warn日志...
        }
        return;
    }

如果不是顺序消费,判断processQueue中队列最大偏移量与最小偏移量之间的间隔,如果大于ConsumeConcurrentlyMaxSpan(拉取偏移量阈值==2000)触发流控,结束本次拉取;50毫秒之后将该拉取任务再次加入到消息拉取任务队列中。

每触发1000次流程,打印warn日志。

} else {
    if (processQueue.isLocked()) {
        if (!pullRequest.isLockedFirst()) {
            final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                pullRequest, offset, brokerBusy);
            if (brokerBusy) {
                log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                    pullRequest, offset);
            }

            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }
}

如果消息处理队列锁定成功,判断消息拉取请求是否锁定,如果没有锁定则计算从何处开始拉取。

判断broker是否繁忙,如果当前拉取的进度小于拉取请求中要拉取到下一个进度,表明当前broker处理的拉取请求还没有执行完成,因此brokerBusy为true,表示broker处于繁忙状态。

更新拉取请求的锁定标记为已锁定,更新下一次拉取的offset为计算出的offset。

如果消息处理队列未锁定,则延迟3s之后将将该拉取任务再次加入到消息拉取任务队列。打印日志表明稍后再进行消息拉取,原因为broker未被锁定。结束本次拉取。

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}

获取当前topic的订阅消息,如果订阅消息不存在,则结束当前消息拉取;延迟三秒之后将拉取任务再次加入到消息拉取任务队列中。

final long beginTimestamp = System.currentTimeMillis();

...省略pullCallback实现...

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
    if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
        subExpression = sd.getSubString();
    }

    classFilter = sd.isClassFilterMode();
}

获取消息订阅信息,如果订阅信息存在则获取tag标识。

int sysFlag = PullSysFlag.buildSysFlag(
    commitOffsetEnable, // commitOffset
    true, // suspend
    subExpression != null, // subscription
    classFilter // class filter
);

这里主要构造了消息拉取的系统标识;

try {
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(),
        subExpression,
        subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(),
        this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag,
        commitOffsetValue,
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC,
        pullCallback
    );

通过pullKernelImpl()方法发起真实的消息拉取请求,pullKernelImpl方法内部与服务端进行网络通信。底层调用了MQClientAPIImpl的pullMessage方法。此处涉及到网络通信,我们在后续的网络通讯代码部分进行分析。

注意此处的pullKernelImpl方法中的最后一个参数为PullCallback,PullCallback为从Broker拉取消息之后的回调方法,它的初始化代码如下,我们单独拿出来进行解析。

DefaultMQPushConsumerImpl.pullMessage.PullCallback实例化代码逻辑

通过匿名内部类的方式初始化了PullCallback回调接口,需要实现其OnSuccess、onException方法。

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), 
                            pullResult,
                            subscriptionData);

如果拉取结果不为空,表明拉取成功了,执行processPullResult对拉取结果进行解析。

// 判断拉取状态
switch (pullResult.getPullStatus()) {
    case FOUND:
        long prevRequestOffset = pullRequest.getNextOffset();
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
        long pullRT = System.currentTimeMillis() - beginTimestamp;
        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
            pullRequest.getMessageQueue().getTopic(), pullRT);

        long firstMsgOffset = Long.MAX_VALUE;
        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

如果拉取结果响应中包含的消息列表为空,或者列表为空列表,则立即发起下一次拉取请求。以便唤醒PullMessageService再次执行拉取;之所以列表为空,是因为客户端通过TAG对消息进行了过滤,因此会出现过滤后列表为空的情况。

} else {
    // 取出第一个消息的offset
    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispatchToConsume);

将拉取到的消息保存到processQueue中,通过submitConsumeRequest方法将拉取到的消息提交给ConsumeMessageService进行消息消费,这里是一个异步方法。

PullCallBack将消息提交给consumeMessageService之后就直接返回了,不关心消息具体是如何消费的。

    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    } else {
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    }
}

这里的逻辑比较重要,实现了消息的持续拉取。具体的逻辑为:

将消息提交给消费者线程之后,PullCallBack立即返回,表明当前消息拉取已经完成。

判断PullInterval参数,如果PullInterval>0,等待PullInterval毫秒之后将PullRequest对象放到PullMessageService的pullRequestQueue消息拉取队列中。

pullRequestQueue的下次拉取被激活,从而达到消息持续拉取的目的,拉取的频率几乎是准实时的。

    if (pullResult.getNextBeginOffset() < prevRequestOffset
        || firstMsgOffset < prevRequestOffset) {
        log.warn(
            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
            pullResult.getNextBeginOffset(),
            firstMsgOffset,
            prevRequestOffset);
    }

    break;
case NO_NEW_MSG:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    break;
case NO_MATCHED_MSG:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    break;

如果返回的拉取结果为NO_NEW_MSG、NO_MATCHED_MSG,则使用服务端校准的offset发起下一次拉取请求。

case OFFSET_ILLEGAL:
    log.warn("the pull request offset illegal, {} {}",
        pullRequest.toString(), pullResult.toString());
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

    pullRequest.getProcessQueue().setDropped(true);
    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

如果返回的拉取结果状态为OFFSET_ILLEGAL,即offset非法;首先设置ProcessQueue的Dropped为true,将该消息队列标记为丢弃。通过服务端下一次校准的offset尝试对当前消息的消费进度进行更新。

@Override
public void run() {
    try {
        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
            pullRequest.getNextOffset(), false);

持久化当前消息的消费进度

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

将当前消息队列从rebalanceImpl的ProcessQueue中移除,对当前队列的消息拉取进行暂停处理,等待下一次rebalance。

                            log.warn("fix the pull request offset, {}", pullRequest);
                        } catch (Throwable e) {
                            log.error("executeTaskLater Exception", e);
                        }
                    }
                }, 10000);
                break;
            default:
                break;
        }
    }
}

offset校准时,基本上使用原先的offset。

客户端进行消费进度,只有实际消费进度大于当前消费进度会会进行offset的覆盖操作,从而保证offset的准确性。

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
};

如果拉取结果异常,则3s后将消息拉取请求重新将PullRequest对象放到PullMessageService的pullRequestQueue消息拉取队列中。

小结

本文主要对PullMessageService消息拉取逻辑进行了分析,整个流程还是比较复杂的。其中的一些编码套路在实战中也是能够借鉴的,希望本文能够对读者理解消息拉取有所帮助。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. PullMessageService启动
  2. 2. PullMessageService启动
  3. 3. PullMessageService消息拉取流程分析
  4. 4. PullRequest添加流程
    1. 4.1. PullRequest简介
  5. 5. PullRequest消息拉取流程
  6. 6. DefaultMQPushConsumerImpl.pullMessage消息拉取逻辑
  7. 7. DefaultMQPushConsumerImpl.pullMessage.PullCallback实例化代码逻辑
  8. 8. 小结
Fork me on GitHub