文章目录
  1. 1. DefaultMQPushConsumer使用样例
  2. 2. 初始化DefaultMQPushConsumer
  3. 3. 注册消费监听MessageListener
  4. 4. 订阅topic
  5. 5. 启动消费客户端
    1. 5.1. copySubscription(),消息重试topic处理逻辑
  6. 6. 小结

本文我们接着分析一下RocektMQ实现消息消费的源码细节,这部分的内容较多,因此拆分为几个章节分别进行讲解。

本章节重点讲解DefaultMQPushConsumer的代码逻辑。

DefaultMQPushConsumer使用样例

按照惯例还是先看一下DefaultMQPushConsumer的使用样例。

@PostConstruct
public void init() {
    defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
    defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
    // 从头开始消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式:集群模式
    defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册监听器
    defaultMQPushConsumer.registerMessageListener(messageListener);
    // 订阅所有消息
    try {
        defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
        defaultMQPushConsumer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e);
    }
    LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!");
}

初始化过程中需要调用registerMessageListener将具体的消费实现Listener注入。

@Component(value = "notifySendListenerImpl")
public class NotifySendListenerImpl implements MessageListenerConcurrently {
...省略部分代码...

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

    try {
        for (MessageExt msg : msgs) {
            // 消息解码
            String message = new String(msg.getBody());
            // 消费次数
            int reconsumeTimes = msg.getReconsumeTimes();
            String msgId = msg.getMsgId();
            String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;

            LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-接收到消息,message={},{}", message, logSuffix);
            // 请求组装
            OrderResultNofityProtocol protocol = new OrderResultNofityProtocol();
            protocol.decode(message);
            // 参数加签,获取用户privatekey
            String privateKey = protocol.getPrivateKey();
            String notifyUrl = protocol.getMerchantNotifyUrl();
            String purseId = protocol.getPurseId();
            ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest();
            chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId())
                    .setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis())))
                    .setOrder_status(NotifyConstant.NOTIFY_SUCCESS)
                    .setPlat_orderid(protocol.getOrderId())
                    .setSign(chargeNotifyRequest.sign(privateKey));
            LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-订单结果通知入参:{},{}", chargeNotifyRequest.toString(), logSuffix);
            // 通知发送
            return sendNotifyByPost(reconsumeTimes, logSuffix, protocol, notifyUrl, purseId, chargeNotifyRequest);
        }
    } catch (Exception e) {
        LOGGER.error("[通知发送消息消费者]消费异常,e={}", LogExceptionWapper.getStackTrace(e));
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

上面就是一个较为标准的在spring框架中使用RocektMQ的DefaultMQPushConsumer进行消费的主流程。

接下来我们重点分析一下源码实现。

初始化DefaultMQPushConsumer

首先看一下DefaultMQPushConsumer的初始化过程。

进入DefaultMQPushConsumer.java类,查看构造方法:

public DefaultMQPushConsumer(final String consumerGroup) {
    this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}

调用了它的同名构造,采用AllocateMessageQueueAveragely策略(平均散列队列算法

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

可以看到实际初始化是通过DefaultMQPushConsumerImpl实现的,DefaultMQPushConsumer持有一个defaultMQPushConsumerImpl的引用。

[DefaultMQPushConsumerImpl.java]
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
    // 初始化DefaultMQPushConsumerImpl,将defaultMQPushConsumer的实际引用传入
    this.defaultMQPushConsumer = defaultMQPushConsumer;
    // 传入rpcHook并指向本类的引用
    this.rpcHook = rpcHook;
}

注册消费监听MessageListener

我们接着看一下注册消费监听器的流程。

消费监听接口MessageListener有两个具体的实现,分别为

MessageListenerConcurrently     -- 并行消费监听
MessageListenerOrderly          -- 顺序消费监听

本文以MessageListenerConcurrently为主要讲解的对象。

查看MessageListenerConcurrently的注册过程。

@Override
public void registerMessageListener(
            MessageListenerConcurrently messageListener) {
    // 将实现指向本类引用
    this.messageListener = messageListener;
    // 进行真实注册
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}

接着看defaultMQPushConsumerImpl.registerMessageListener

DefaultMQPushConsumerImpl.java
public void registerMessageListener(MessageListener messageListener) {
    this.messageListenerInner = messageListener;
}

可以看到DefaultMQPushConsumerImpl将真实的messageListener实现指向它本类的messageListener引用。

订阅topic

接着看一下订阅topic的主流程。

topic订阅主要通过方法subscribe实现,首先看一下DefaultMQPushConsumer的subscribe实现

@Override
public void subscribe(String topic, String subExpression) 
                                    throws MQClientException {
    this.defaultMQPushConsumerImpl
        .subscribe(withNamespace(topic), subExpression);
}

可以看到是调用了DefaultMQPushConsumerImpl的subscribe方法。

public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        // 构建主题的订阅数据,默认为集群消费
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
            topic, subExpression);
        // 将topic的订阅数据进行保存
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
        if (this.mQClientFactory != null) {
            // 如果MQClientInstance不为空,则向所有的broker发送心跳包,加锁
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

看一下buildSubscriptionData代码逻辑

[FilterAPI.java]
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
    String subString) throws Exception {
    // 构造一个SubscriptionData实体,设置topic、表达式(tag)
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    // 如果tag为空或者为"*",统一设置为"*",即订阅所有消息
    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        // tag不为空,则先按照‘|’进行分割
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            // 遍历tag表达式数组
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        // 将每个tag的值设置到tagSet中
                        subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            // tag解析异常
            throw new Exception("subString split error");
        }
    }
    return subscriptionData;
}

看一下sendHeartbeatToAllBrokerWithLock代码逻辑

[MQClientInstance.java]
public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            // 发送心跳包
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

可以看到,同步发送心跳包给所有的broker,而该过程是通过RemotingClient统一实现的,通过调用RemotingClient.invokeSync实现心跳包的发送,底层是通过Netty实现的。具体细节本文不进行展开。

启动消费客户端

上述初始化流程执行完毕之后,通过start()方法启动消费客户端。

@Override
public void start() throws MQClientException {
    // 设置消费者组
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    // 启动消费客户端
    this.defaultMQPushConsumerImpl.start();
    // trace处理逻辑
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

关于trace的处理逻辑,本文不再展开,感兴趣的同学可以移步 跟我学RocketMQ之消息轨迹实战与源码分析

接着看defaultMQPushConsumerImpl.start()方法逻辑

[DefaultMQPushConsumerImpl.java]
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={},
             isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

首次启动后,执行配置检查,该方法为前置校验方法,主要进行消费属性校验。

this.checkConfig();

将订阅关系配置信息进行复制

this.copySubscription();

如果当前为集群消费模式,修改实例名为pid

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}

创建一个新的MQClientInstance实例,如果已经存在直接使用该存在的MQClientInstance

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

为消费者负载均衡实现rebalanceImpl设置属性

// 设置消费者组
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 设置消费模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 设置队列分配策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 设置当前的MQClientInstance实例
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);


this.pullAPIWrapper = new PullAPIWrapper(
    mQClientFactory,
    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 注册消息过滤钩子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

处理offset存储方式

// offsetStore不为空则使用当前的offsetStore方式
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    // 否则根据消费方式选择具体的offsetStore方式存储offset
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // 如果是广播方式,则使用本地存储方式
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        // 如果是集群方式,则使用远端broker存储方式存储offset
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
 // 加载当前的offset
this.offsetStore.load();

根据MessageListener的具体实现方式选取具体的消息拉取线程实现。

// 如果是MessageListenerOrderly顺序消费接口实现
// 消息消费服务选择:ConsumeMessageOrderlyService(顺序消息消费服务)
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} 
// 如果是MessageListenerConcurrently并行消息消费接口实现
// 消息消费服务选择:ConsumeMessageConcurrentlyService(并行消息消费服务)
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

选择并初始化完成具体的消息消费服务之后,启动消息消费服务。consumeMessageService主要负责对消息进行消费,它的内部维护了一个线程池。

// 启动消息消费服务
this.consumeMessageService.start();

接着向MQClientInstance注册消费者,并启动MQClientInstance。这里再次强调

一个JVM中所有消费者、生产者持有同一个MQClientInstance,且MQClientInstance只会启动一次

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;

如果MQClientInstance已经启动,或者已经关闭,或者启动失败,重复调用start会报错。这里也能直观的反映出:MQClientInstance的启动只有一次

    case RUNNING:
    case START_FAILED:
    case SHUTDOWN_ALREADY:
        throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    default:
        break;
}

启动完成执行后续收尾工作

    // 订阅关系改变,更新Nameserver的订阅关系表
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    // 检查客户端状态
    this.mQClientFactory.checkClientInBroker();
    // 发送心跳包
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 唤醒执行消费者负载均衡
    this.mQClientFactory.rebalanceImmediately();
}

copySubscription(),消息重试topic处理逻辑

消费者启动流程较为重要,我们接着对其中的重点方法展开讲解。这部分内容可以暂时跳过,不影响对主流程的把控。

我们研究一下copySubscription方法的实现细节。

[DefaultMQPushConsumerImpl.java]
private void copySubscription() throws MQClientException {
    try {

        // 首先获取订阅信息
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    topic, subString);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }

        // 为defaultMQPushConsumer设置具体的MessageListener实现
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }

根据消费类型选择是否进行重试topic订阅

        switch (this.defaultMQPushConsumer.getMessageModel()) {

            // 如果是广播消费模式,则不进行任何处理,即无重试
            case BROADCASTING:
                break;

            // 如果是集群消费模式,订阅重试主题消息
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

如果是集群消费模式,会订阅重试主题消息

获取重试topic,规则为 RETRY_GROUP_TOPIC_PREFIX + consumerGroup,即:“%RETRY%”+消费组名

为重试topic设置订阅关系,订阅所有的消息;

消费者启动的时候会自动订阅该重试主题,并参与该topic的消息队列负载过程。

小结

到此,我们就DefaultMQPushConsumer的初始化、启动、校验以及topic订阅、重试等代码实现
细节进行了较为详细的讲解。

下一章节,我将带领读者对消息消费线程 consumeMessageService 的实现进行分析,我们下篇文章见。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. DefaultMQPushConsumer使用样例
  2. 2. 初始化DefaultMQPushConsumer
  3. 3. 注册消费监听MessageListener
  4. 4. 订阅topic
  5. 5. 启动消费客户端
    1. 5.1. copySubscription(),消息重试topic处理逻辑
  6. 6. 小结
Fork me on GitHub