跟我学RocketMQ之消息轨迹实战与源码分析
RocketMQ在4.4.0 Release版本中支持了消息轨迹特性。
这一特性的增加能够让我们对消息从生产到存储到消费这一整个流程有一个清晰的掌握,配合上console1.0.1以上版本的图形化界面,对于错误排查及日常运维是一个很有用处的feature。
话不多说,我们先通过实战对消息轨迹特性做一次直观的了解,再从源码角度深入分析一下RocketMQ是如何实现消息轨迹这一能力的。
消息轨迹实战
消息轨迹特性在4.4.0时已经支持,本次实战使用最新的4.5.1Release,读者可以自行选择喜欢的版本进行测试。
关于如何搭建并启动RocketMQ集群在本文中不作为重点讲解,感兴趣的同学可以移步 跟我学RocketMQ[1-1]之安装RocketMQ。
消息轨迹–broker
RocketMQ对于消息轨迹特性是支持可配置的,如果我们要开启消息轨迹能力,需要编辑broker的配置文件broker.conf,在其中添加如下配置:
traceTopicEnable=true
我们也能够指定自定义的traceTopic名称,具体配置项如下:
msgTraceTopicName=SNOWALKER_TRACE_TOPIC
启动broker会调用相关逻辑自动创建轨迹Topic,如果msgTraceTopicName没有配置则会使用默认值:RMQ_SYS_TRACE_TOPIC 。
通过Topic实现各种特性是RocketMQ设计精妙之处,定时消息、事务消息、消息重试,包括我们今天接触到的消息轨迹都是这种思想的体现。至于它们具体是如何实现的,我们在文章的后半段的源码分析部分详细展开。
消息轨迹–生产者
我们接着看下生产者的初始化代码。
@Component
public class TraceProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(TraceProducer.class);
@Value("${rocketmq.nameServer}")
String nameSrvAddr;
private DefaultMQProducer defaultMQProducer;
@PostConstruct
public void init() {
defaultMQProducer =
new DefaultMQProducer
("GID_TRACE", true);
defaultMQProducer.setNamesrvAddr(nameSrvAddr);
// 发送失败重试次数
defaultMQProducer.setRetryTimesWhenSendFailed(3);
try {
defaultMQProducer.start();
} catch (MQClientException e) {
throw new RuntimeException("TraceProducer启动异常!", e);
}
LOGGER.info("TraceProducer启动完成!");
}
public DefaultMQProducer getProducer() {
return defaultMQProducer;
}
}
直观的看,基本看不出和之前的版本的区别。变化当然是有的,核心的代码如下:
defaultMQProducer = new DefaultMQProducer("GID_TRACE", true);
源码中的方法声明如下:
/**
* Constructor specifying producer group and enabled msg trace flag.
*
* @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
this(null, producerGroup, null, enableMsgTrace, null);
}
除了需要在构造初始化时传入生产者组名,还需要传入一个是否开启消息轨迹的boolean标识。其中,true 标识启用消息轨迹,false 标识不启用消息轨迹能力。
消息发送没有变化,依旧是组装Message消息体,并通过defaultMQProducer的send方法提交给broker。代码如下:
Message message = new Message("GID_TRACE_TOPIC", msgBody.getBytes());
SendResult sendResult = mqProducer.send(message);
到此生产者部分的讲解就告一段落,我们看下消费者是如何开启消息轨迹特性的。
消息轨迹–消费者
与生产者类似,消费者也是在构造初始化的时候通过 boolean enableMsgTrace 参数决定是否开启消息轨迹能力。我们看下代码
@Component
public class TraceConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(TraceConsumer.class);
@Value("${rocketmq.nameServer}")
String nameSrvAddr;
private DefaultMQPushConsumer defaultMQPushConsumer;
@Resource(name = "TraceConsumerListenerImpl")
private MessageListenerConcurrently messageListener;
@PostConstruct
public void init() {
defaultMQPushConsumer =
new DefaultMQPushConsumer("GID_TRACE", true);
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
// 从头开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式:集群模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
defaultMQPushConsumer.registerMessageListener(messageListener);
// 订阅所有消息
try {
defaultMQPushConsumer.subscribe("GID_TRACE_TOPIC", "*");
// 启动消费者
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException("TraceConsumer加载异常!", e);
}
LOGGER.info("TraceConsumer加载完成!");
}
}
我们来简单看下带消息轨迹标识的构造方法。
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
}
参数enableMsgTrace==true,表示消费端开启消息轨迹支持。
console查看轨迹
我们启动测试程序并发送一条消息,启动console,在消息轨迹选项卡下输入待查询的topic及消息id(MsgId),可以看到具体的消息轨迹明细,如下图:
从图中我们可以看出,消息id=AC1E5338623018B4AAC212BF58580000的消息,
- 在2019-07-04 15:22:09这个时间点发送到broker,耗时485ms,状态为Pub(发布)。
- 在2019-07-04 15:22:10这个时间点broker落盘
- 在2019-07-04 15:22:11这个时间点开始推送到消费者端,状态为SubBefore
- 经过97ms,消息消费完成
总的来说一共有三个状态,分别为:Pub、SubBefore、SubAfter。
到此我们对消息轨迹的使用就有了一个较为直观的认识。我们趁热打铁,通过阅读源码了解下消息轨迹特性是如何实现的。
消息轨迹源码分析
同实战部分类似,我们同样从broker服务端、生产者端、消费者端的代码分别进行讲解。
broker端实现
首先进入broker的启动入口 org.apache.rocketmq.broker.BrokerStartup.java 中,查看其main方法
public static void main(String[] args) {
start(createBrokerController(args));
}
查看start方法
public static BrokerController start(BrokerController controller) {
try {
controller.start();
...省略日志打印...
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
BrokerController在构造初始化时,会初始化TopicConfigManager实例。
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
...省略其他初始化逻辑...
{
// 获取broker配置,如果配置项traceTopicEnable=true
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
}
BrokerController启动后,会初始化TopicConfigManager实现trace topic的创建工作。
消息生产者发送逻辑
我们接着分析一下生产者初始化逻辑。
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
// 初始化异步Trace分发器
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
// set注入生产者实例
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
// 为消息轨迹注册hook,在消息发送前执行
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
我们主要看下SendMessageTraceHookImpl这个类,它实现了接口SendMessageHook,在消息发送前后调用,为消息添加Trace相关属性。
即Producer启动时注册钩子,该钩子持有负责消息发送的AsyncTraceDispatcher实例,消息发送后进而发送消息轨迹
AsyncTraceDispatcher主要用来做trace消息的发送工作。我们看下它的源码
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
......
/**
* snowalker
* 如果traceTopicName没传则使用默认TraceTopic=RMQ_SYS_TRACE_TOPIC
*/
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
可以看到,它首先判断一下消息轨迹topic名是否存在,如果存在则使用该topic名称,否则使用默认名称即:RMQ_SYS_TRACE_TOPIC
/**
* @param rpcHook
* @return
*/
private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook);
// 设置Trace生产者为_INNER_TRACE_PRODUCER
traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
// 发送5秒超时
traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K
traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
}
return traceProducerInstance;
}
getAndCreateTraceProducer方法初始化了trace消息发送生产者,如果不存在则创建一个新的实例,生产者组名为 _INNER_TRACE_PRODUCER,并设置发送超时时间为5秒。最大的消息体容量为128K。
Trace消息发送通过Hook方式进行发送,接口SendMessageHook定义了发送前后的hook方法,代码如下
public interface SendMessageHook {
String hookName();
void sendMessageBefore(final SendMessageContext context);
void sendMessageAfter(final SendMessageContext context);
}
//如果有hook在消息发送前执行,消息轨迹通过这种方式记录
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this); //发送对象
context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); //生产组
context.setCommunicationMode(communicationMode); //发送模式
context.setBornHost(this.defaultMQProducer.getClientIP()); //客户端IP
context.setBrokerAddr(brokerAddr); //发往broker的地址
context.setMessage(msg); //消息内容
context.setMq(mq); //消息 Queue
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context); //执行自定义个hook业务
}
接着看下executeSendMessageHookBefore方法的逻辑
public void executeSendMessageHookBefore(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
hook.sendMessageBefore(context);
} catch (Throwable e) {
log.warn("failed to executeSendMessageHookBefore", e);
}
}
}
}
可以看到在此处调用了sendMessageBefore方法,也就是SendMessageTraceHookImpl.sendMessageBefore方法,对消息trace进行包装,代码如下:
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
tuxeContext.getTraceBeans().add(traceBean);
}
此处设置trace类型为 TraceType.Pub 。
消息消费者处理逻辑
我们接着分析一下消费端消息轨迹是如何实现的。
初始化方式与消息生产端类似,先看下构造方法声明。
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
}
同样是在构造中传入是否启用消息轨迹的标识,enableMsgTrace=true,实际调用了以下的构造
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
如果开启了消息轨迹,则初始化AsyncTraceDispatcher,并注册消费hook–ConsumeMessageTraceHookImpl。
ConsumeMessageTraceHookImpl实现了ConsumeMessageHook,它会在消费的前后被回调。我们看下ConsumeMessageTraceHookImpl的逻辑。
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
...省略部分代码...
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext traceContext = new TraceContext();
context.setMqTraceContext(traceContext);
traceContext.setTraceType(TraceType.SubBefore);//
traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
List<TraceBean> beans = new ArrayList<TraceBean>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
}
String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
if (traceOn != null && traceOn.equals("false")) {
// If trace switch is false ,skip it
continue;
}
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
traceBean.setMsgId(msg.getMsgId());//
traceBean.setTags(msg.getTags());//
traceBean.setKeys(msg.getKeys());//
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
if (beans.size() > 0) {
traceContext.setTraceBeans(beans);
traceContext.setTimeStamp(System.currentTimeMillis());
localDispatcher.append(traceContext);
}
}
consumeMessageBefore方法在消费之前回调,设置trace类型为TraceType.SubBefore,设置trace相关属性,并设置当前时间戳。
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// If subbefore bean is null ,skip it
return;
}
TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);//
subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//
// Caculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
}
localDispatcher.append(subAfterContext);
}
}
consumeMessageAfter在消费之后被回调,设置trace类型为TraceType.SubAfter。
我们看下ConsumeMessageTraceHookImpl被调用的位置,调用ConsumeMessageTraceHookImpl的类为ConsumeMessageConcurrentlyService.ConsumeRequest。
核心逻辑在run方法中,我们看下具体的代码段。
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
判断消费者是否有钩子,如果有则调用钩子前置方法executeHookBefore,传入消费上下文,对trace进行前置处理。接着执行消息消费逻辑,这部分代码就省略了,感兴趣的同学可以自行查看 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()
接着看一下钩子的后置方法被调用的代码逻辑。
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
判断当前消费者是否具有钩子,由于我们在消费者客户端构造的时候已经注册了钩子ConsumeMessageTraceHookImpl,因此会进入if代码块,设置当前消费状态为消费成功,并同步执行该消费者客户端的后置钩子方法executeHookAfter,对trace进行后置加工。
到此就基本完成了整个trace链路的代码逻辑分析,有不足之处欢迎与作者讨论。
小结
本文中,我们通过实例演示对RocektMQ4.4.0新特性–消息轨迹进行了实战展示,并通过阅读源码对消息轨迹特性的实现进行了较为深入的分析。
通过阅读源码能够让我们对RocketMQ如何实现新特性的扩展有所认识,每次阅读源码都能够使我们对框架本身有更为深入的了解,并将学到的套路运用到实践中,实现自我能力的提高与认知的升级。
更多的源码分析文章,敬请期待。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。