文章目录
  1. 1. 定时消息原理概述
  2. 2. ScheduleMessageService源码解析
    1. 2.1. 重要变量
    2. 2.2. 初始化
    3. 2.3. ScheduleMessageService.load()
    4. 2.4. ScheduleMessageService.start()
    5. 2.5. 定时任务实现:DeliverDelayedMessageTimerTask
      1. 2.5.1. executeOnTimeup()
      2. 2.5.2. messageTimeup(msgExt)恢复原消息主题及队列
    6. 2.6. 小结

本文我们单独对RocketMQ的定时消息进行源码解析。

同事务消息类似,RocketMQ定时消息也是通过Topic替换,后台线程异步发送实现的。具体逻辑是通过org.apache.rocketmq.store.schedule.ScheduleMessageService实现的。

定时消息原理概述

在正式进行源码分析之前,我们先从概念上对定时消息做一个较为宏观的认知。

RocketMQ支持指定级别的消息延迟,默认为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

RocketMQ消息重试以及定时消息均是通过定时任务实现的。重试消息以及定时消息在存入commitLog之前会判断重试次数,如果大于0,则会将消息的topic设置为SCHEDULE_TOPIC_XXXX。

ScheduleMessageService在实例化之后会对SCHEDULE_TOPIC_XXXX主题下的消息进行定时调度,从而实现定时投递。

ScheduleMessageService源码解析

我们接着对ScheduleMessageService进行解析,了解RocketMQ具体是如何实现定时消息机制的。

重要变量

在正式分析之前,先对ScheduleMessageService的重要成员变量做一下了解:

delayLevelTable,记录了对延迟级别的解析结果,key=延迟级别,value=对应延迟级别的毫秒数

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
    new ConcurrentHashMap<Integer, Long>(32);

offsetTable,延迟级别对应的消费进度,key=延迟级别,value=对应延迟级别下的消费进度

private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
    new ConcurrentHashMap<Integer, Long>(32);

初始化

ScheduleMessageService的初始化是在DefaultMessageStore实现的,具体的调用链如下:

BrokerStartup
    |-main
        |-start
            |-createBrokerController
                |-BrokerController.initialize()    
                |-controller.start()
                    |-DefaultMessageStore.start()
                        |-new ScheduleMessageService(this)
                        |-scheduleMessageService.start()

从调用链可以看出,当broker启动完成,ScheduleMessageService就开始对定时消息进行调度。

对于ScheduleMessageService我们主要关注:

  • load()方法
  • start()方法

ScheduleMessageService.load()

首先关注一下load()方法逻辑。

[ScheduleMessageService.java]
public boolean load() {
    boolean result = super.load();
    result = result && this.parseDelayLevel();
    return result;
}

load()方法的逻辑比较清晰,它的主要职责为:

  1. 通过super.load()方法获取配置文件,加载延迟消息的消费进度
  2. 初始化delayLevelTable

RocketMQ将延时消息的消费进度存储于 ${RocketMQ_Home}/store/config/delayOffset.json下。

我们重点看一下parseDelayLevel();如何完成解析延时配置,并组装为delayLevelTable的。

[ScheduleMessageService.java]
public boolean parseDelayLevel() {
    // 初始化一个时间单位map,key为秒、分、时、天;value为对应单位的毫秒数
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    // 从defaultMessageStore中获取配置文件,从配置文件中获取延迟级别配置串,即:messageDelayLevel
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {

        // 根据空格进行拆分,分解为String数组
        String[] levelArray = levelString.split(" ");

        // 遍历String数组
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            // key=延迟级别,等于下标+1
            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            // value=单位对应毫秒数 * 解析得到的时间单位
            long delayTimeMillis = tu * num;
            // 存放到delayLevelTable
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }
    return true;
}

这段代码很好理解,就是对配置中的延时串通过空格进行分割为数组,按照下标及单位,计算得到每个等级对应的毫秒数,最终存放在delayLevelTable中实现delayLevelTable的初始化,便于后续在代码逻辑中进行使用。

如果没有设置则使用代码中的默认值。

ScheduleMessageService.start()

我们接着看一下start()方法的逻辑,该方法是延迟消息(定时消息)调度的核心逻辑。

[ScheduleMessageService.java]
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);

start方法的核心思想为

对不同的延迟级别创建对应的定时任务,通过定时任务对持久化的消息队列的进度进行存储。

// 首先对delayLevelTable进行迭代,取出每一个级别及其对应的延时长度。
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    Integer level = entry.getKey();
    Long timeDelay = entry.getValue();
    Long offset = this.offsetTable.get(level);
    // 获取该级别对应的消费进度offset,如果不存在则设置为0
    if (null == offset) {
        offset = 0L;
    }

    // 如果延时不为空,则延迟1秒执行定时任务
    if (timeDelay != null) {
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    }
}

这里简单总结一下,首先对delayLevelTable进行遍历,获取对应延迟级别level对应的消费进度,默认进度不存在,每个延迟级别对应的消费进度都从0开始。

创建定时任务开始进行调度,每个定时任务初始都延迟1秒开始进行调度。后续则使用对应的延迟级别进行调度。

注意:延时级别与消费队列的关系为:消息队列id=延时级别-1,具体逻辑在queueId2DelayLevel方法中。

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

这段代码的核心逻辑为,执行定时任务,每隔10s进行一次消费进度的持久化操作。具体的持久化刷盘频率可以通过flushDelayOffsetInterval参数进行配置。

定时任务实现:DeliverDelayedMessageTimerTask

上面的分析中我们得知,RocketMQ对定时消息的每一个延迟级别都设置了一个定时任务,这个定时任务识通过DeliverDelayedMessageTimerTask实现的。

DeliverDelayedMessageTimerTask继承了TimerTask,我们直接看它的run()方法实现。

@Override
public void run() {
    try {
        if (isStarted()) {
            this.executeOnTimeup();
        }
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}

可以看到,核心是executeOnTimeup()方法,当执行异常,延迟10s后继续执行调度。

我们进入executeOnTimeup()方法。

executeOnTimeup()

首先根据topic=SCHEDULE_TOPIC_XXXX,延迟级别转换为队列id,查询到当前的消费队列。

ConsumeQueue cq =
    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
        delayLevel2QueueId(delayLevel));

根据当前的offset从消费队列中获取当前所有的有效消息,如果未能获取到则更新拉取进度,等待定时任务下次进行尝试。

for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();

    if (cq.isExtAddr(tagsCode)) {
        if (cq.getExt(tagsCode, cqExtUnit)) {
            tagsCode = cqExtUnit.getTagsCode();
        } else {
            //can't find ext content.So re compute tags code.
            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                tagsCode, offsetPy, sizePy);
            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
        }
    }

定时任务每次执行到这里都进行时间比较,计算延迟时间与当前时间的差值,如果延迟时间-当前时间<=0说明该延迟消息应当被处理,使其能够被消费者消费。

long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

根据消息偏移量及消息大小从commitLog中查询消息,如果查到,则开始执行正式的消息消费准备工作。

if (countdown <= 0) {
    MessageExt msgExt =
         ScheduleMessageService.
            this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

对消息执行重新存储操作,恢复原先的队列以及消息topic,再将消息重新持久化到commitLog中,此时的消息已经能够被消费者拉取到。

if (msgExt != null) {
         try {
             MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

            PutMessageResult putMessageResult =
                ScheduleMessageService.this.writeMessageStore
                                    .putMessage(msgInner);

我们重点看一下messageTimeup(msgExt)方法是如何进行消息的恢复操作。

messageTimeup(msgExt)恢复原消息主题及队列

private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
        // 建立一个新的MessageExtBrokerInner实体
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());

        ...省略属性设置...

        msgInner.setWaitStoreMsgOK(false);
        // 清理消息延迟级别属性
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

        // 恢复消息原主题
        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

        // 恢复消息原队列id
        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
        int queueId = Integer.parseInt(queueIdStr);
        msgInner.setQueueId(queueId);

        return msgInner;
    }

经过上述操作,定时消息已经还原为普通消息。

我们继续回到 executeOnTimeup() 方法中,通过

PutMessageResult putMessageResult = 
ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);

将还原后的消息重新持久化到commitLog中。

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
        this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

更新当前延迟队列的消息拉取进度,继续处理后续的消息。

小结

本文我们完整的对RocketMQ的定时消息实现方式进行了分析,我们总结一下它的完整流程:

  1. 消息发送方发送消息,设置delayLevel。
  2. 如果delayLevel大于0,表明是一条延时消息,broker处理该消息,将消息的主题、队列id进行备份后,改变消息的主题为SCHEDULE_TOPIC_XXXX,队列id=延迟级别-1,将消息持久化。
  3. 通过定时任务ScheduleMessageService对定时消息进行处理,每隔1s从上次拉取偏移量取出所有的消息进行处理
  4. 从消费队列中解析出消息的物理偏移量,从而从commitLog中取出消息
  5. 根据消息的属性重建消息,恢复消息的topic、原队列id,将消息的延迟级别属性delayLevel清除掉,再次保存到commitLog中
  6. 将消息转发到原主题对应的消费队列中,此时消费者可以对该消息进行消费。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 定时消息原理概述
  2. 2. ScheduleMessageService源码解析
    1. 2.1. 重要变量
    2. 2.2. 初始化
    3. 2.3. ScheduleMessageService.load()
    4. 2.4. ScheduleMessageService.start()
    5. 2.5. 定时任务实现:DeliverDelayedMessageTimerTask
      1. 2.5.1. executeOnTimeup()
      2. 2.5.2. messageTimeup(msgExt)恢复原消息主题及队列
    6. 2.6. 小结
Fork me on GitHub