跟我学RocketMQ之定时消息源码解析
本文我们单独对RocketMQ的定时消息进行源码解析。
同事务消息类似,RocketMQ定时消息也是通过Topic替换,后台线程异步发送实现的。具体逻辑是通过org.apache.rocketmq.store.schedule.ScheduleMessageService实现的。
定时消息原理概述
在正式进行源码分析之前,我们先从概念上对定时消息做一个较为宏观的认知。
RocketMQ支持指定级别的消息延迟,默认为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
RocketMQ消息重试以及定时消息均是通过定时任务实现的。重试消息以及定时消息在存入commitLog之前会判断重试次数,如果大于0,则会将消息的topic设置为SCHEDULE_TOPIC_XXXX。
ScheduleMessageService在实例化之后会对SCHEDULE_TOPIC_XXXX主题下的消息进行定时调度,从而实现定时投递。
ScheduleMessageService源码解析
我们接着对ScheduleMessageService进行解析,了解RocketMQ具体是如何实现定时消息机制的。
重要变量
在正式分析之前,先对ScheduleMessageService的重要成员变量做一下了解:
delayLevelTable,记录了对延迟级别的解析结果,key=延迟级别,value=对应延迟级别的毫秒数
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
offsetTable,延迟级别对应的消费进度,key=延迟级别,value=对应延迟级别下的消费进度
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
初始化
ScheduleMessageService的初始化是在DefaultMessageStore实现的,具体的调用链如下:
BrokerStartup
|-main
|-start
|-createBrokerController
|-BrokerController.initialize()
|-controller.start()
|-DefaultMessageStore.start()
|-new ScheduleMessageService(this)
|-scheduleMessageService.start()
从调用链可以看出,当broker启动完成,ScheduleMessageService就开始对定时消息进行调度。
对于ScheduleMessageService我们主要关注:
- load()方法
- start()方法
ScheduleMessageService.load()
首先关注一下load()方法逻辑。
[ScheduleMessageService.java]
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
return result;
}
load()方法的逻辑比较清晰,它的主要职责为:
- 通过super.load()方法获取配置文件,加载延迟消息的消费进度
- 初始化delayLevelTable
RocketMQ将延时消息的消费进度存储于 ${RocketMQ_Home}/store/config/delayOffset.json下。
我们重点看一下parseDelayLevel();如何完成解析延时配置,并组装为delayLevelTable的。
[ScheduleMessageService.java]
public boolean parseDelayLevel() {
// 初始化一个时间单位map,key为秒、分、时、天;value为对应单位的毫秒数
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
// 从defaultMessageStore中获取配置文件,从配置文件中获取延迟级别配置串,即:messageDelayLevel
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
try {
// 根据空格进行拆分,分解为String数组
String[] levelArray = levelString.split(" ");
// 遍历String数组
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
// key=延迟级别,等于下标+1
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
// value=单位对应毫秒数 * 解析得到的时间单位
long delayTimeMillis = tu * num;
// 存放到delayLevelTable
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
log.error("parseDelayLevel exception", e);
log.info("levelString String = {}", levelString);
return false;
}
return true;
}
这段代码很好理解,就是对配置中的延时串通过空格进行分割为数组,按照下标及单位,计算得到每个等级对应的毫秒数,最终存放在delayLevelTable中实现delayLevelTable的初始化,便于后续在代码逻辑中进行使用。
如果没有设置则使用代码中的默认值。
ScheduleMessageService.start()
我们接着看一下start()方法的逻辑,该方法是延迟消息(定时消息)调度的核心逻辑。
[ScheduleMessageService.java]
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
start方法的核心思想为
对不同的延迟级别创建对应的定时任务,通过定时任务对持久化的消息队列的进度进行存储。
// 首先对delayLevelTable进行迭代,取出每一个级别及其对应的延时长度。
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
// 获取该级别对应的消费进度offset,如果不存在则设置为0
if (null == offset) {
offset = 0L;
}
// 如果延时不为空,则延迟1秒执行定时任务
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
这里简单总结一下,首先对delayLevelTable进行遍历,获取对应延迟级别level对应的消费进度,默认进度不存在,每个延迟级别对应的消费进度都从0开始。
创建定时任务开始进行调度,每个定时任务初始都延迟1秒开始进行调度。后续则使用对应的延迟级别进行调度。
注意:延时级别与消费队列的关系为:消息队列id=延时级别-1,具体逻辑在queueId2DelayLevel方法中。
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
这段代码的核心逻辑为,执行定时任务,每隔10s进行一次消费进度的持久化操作。具体的持久化刷盘频率可以通过flushDelayOffsetInterval参数进行配置。
定时任务实现:DeliverDelayedMessageTimerTask
上面的分析中我们得知,RocketMQ对定时消息的每一个延迟级别都设置了一个定时任务,这个定时任务识通过DeliverDelayedMessageTimerTask实现的。
DeliverDelayedMessageTimerTask继承了TimerTask,我们直接看它的run()方法实现。
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
可以看到,核心是executeOnTimeup()方法,当执行异常,延迟10s后继续执行调度。
我们进入executeOnTimeup()方法。
executeOnTimeup()
首先根据topic=SCHEDULE_TOPIC_XXXX,延迟级别转换为队列id,查询到当前的消费队列。
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
根据当前的offset从消费队列中获取当前所有的有效消息,如果未能获取到则更新拉取进度,等待定时任务下次进行尝试。
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
定时任务每次执行到这里都进行时间比较,计算延迟时间与当前时间的差值,如果延迟时间-当前时间<=0说明该延迟消息应当被处理,使其能够被消费者消费。
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
根据消息偏移量及消息大小从commitLog中查询消息,如果查到,则开始执行正式的消息消费准备工作。
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.
this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
对消息执行重新存储操作,恢复原先的队列以及消息topic,再将消息重新持久化到commitLog中,此时的消息已经能够被消费者拉取到。
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
我们重点看一下messageTimeup(msgExt)方法是如何进行消息的恢复操作。
messageTimeup(msgExt)恢复原消息主题及队列
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
// 建立一个新的MessageExtBrokerInner实体
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
...省略属性设置...
msgInner.setWaitStoreMsgOK(false);
// 清理消息延迟级别属性
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
// 恢复消息原主题
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
// 恢复消息原队列id
String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
int queueId = Integer.parseInt(queueIdStr);
msgInner.setQueueId(queueId);
return msgInner;
}
经过上述操作,定时消息已经还原为普通消息。
我们继续回到 executeOnTimeup() 方法中,通过
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
将还原后的消息重新持久化到commitLog中。
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
更新当前延迟队列的消息拉取进度,继续处理后续的消息。
小结
本文我们完整的对RocketMQ的定时消息实现方式进行了分析,我们总结一下它的完整流程:
- 消息发送方发送消息,设置delayLevel。
- 如果delayLevel大于0,表明是一条延时消息,broker处理该消息,将消息的主题、队列id进行备份后,改变消息的主题为SCHEDULE_TOPIC_XXXX,队列id=延迟级别-1,将消息持久化。
- 通过定时任务ScheduleMessageService对定时消息进行处理,每隔1s从上次拉取偏移量取出所有的消息进行处理
- 从消费队列中解析出消息的物理偏移量,从而从commitLog中取出消息
- 根据消息的属性重建消息,恢复消息的topic、原队列id,将消息的延迟级别属性delayLevel清除掉,再次保存到commitLog中
- 将消息转发到原主题对应的消费队列中,此时消费者可以对该消息进行消费。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。