Contents
  1. Producer initialization
  2. Core send flow: DefaultMQProducerImpl.sendDefaultImpl
  3. Core send flow: DefaultMQProducerImpl.sendKernelImpl
    3.1. MQClientAPIImpl.sendMessage
      3.1.1. Asynchronous send: sendMessageAsync
      3.1.2. Synchronous send: sendMessageSync
        3.1.2.1. processSendResponse: parsing the send response
    3.2. Back to DefaultMQProducerImpl.sendKernelImpl
  4. Summary

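In this article I walk through, at the source-code level, how a RocketMQ producer sends a message. The content is dry, so bring your own glass of water.

Producer initialization

A producer must be initialized before it can send messages. A fairly typical initialization snippet looks like this: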

@Value("${rocketmq.nameServer}")
String nameSrvAddr;

@PostConstruct
public void init() {

    DefaultMQProducer defaultMQProducer =
            new DefaultMQProducer("PRODUCER_GROUP", true);
    defaultMQProducer.setNamesrvAddr(nameSrvAddr);
    // Number of retries when a send fails
    defaultMQProducer.setRetryTimesWhenSendFailed(3);
    try {
        defaultMQProducer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("Failed to start producer!", e);
    }
}

Let us briefly analyze the initialization flow.

First a DefaultMQProducer instance is created through this constructor:

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
    this(null, producerGroup, null, enableMsgTrace, null);
}

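The second parameter enables message trace support. For a source-level analysis of message tracing, see the companion article 《跟我学RocketMQ之消息轨迹实战与源码分析》.

setNamesrvAddr(String namesrvAddr) sets the NameServer address; setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) sets the number of retries for a failed send, which defaults to 2.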

[DefaultMQProducer.java]
private int retryTimesWhenSendFailed = 2;

Next, start() is called to start defaultMQProducer:

[DefaultMQProducer.java]
@Override
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // Start the producer implementation
    this.defaultMQProducerImpl.start();
    ...traceDispatcher logic omitted...
}

As shown, this delegates to start() on defaultMQProducerImpl:

[DefaultMQProducerImpl.java]

public void start() throws MQClientException {
    this.start(true);
}

This in turn calls the overloaded start with startFactory == true:

// Reference to the MQClientInstance
private MQClientInstance mQClientFactory;

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // If the current service state is CREATE_JUST (just created)
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

Note this piece of code:

// Unless this is the client's internal producer group,
// change the producer's instanceName to the process id
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    this.defaultMQProducer.changeInstanceNameToPID();
}

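This checks whether the producer group is the client's internal group (MixAll.CLIENT_INNER_PRODUCER_GROUP). If it is not, the producer's instanceName is changed to the process id. The logic is: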

private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}

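The instance name is read from the system property rocketmq.client.name and defaults to DEFAULT. Back in the overloaded method public void start(final boolean startFactory) throws MQClientException:

// Create the MQ client factory; there is only one MQClientInstance per clientId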
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

This creates the MQ client factory; for a given clientId there is only one MQClientInstance. Here is getAndCreateMQClientInstance:

[MQClientManager.java]
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // Build the MQ clientId
    String clientId = clientConfig.buildMQClientId();

    // Look up the MQClientInstance for this clientId in factoryTable (clientId -> MQClientInstance)
    MQClientInstance instance = this.factoryTable.get(clientId);

    // If none exists, create a new one and put it into factoryTable
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
    return instance;
}
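
The get / putIfAbsent pattern above is what guarantees a single instance per clientId even when several threads start clients at once. A minimal sketch of just that pattern follows; the class ClientRegistrySketch and the use of plain Object in place of MQClientInstance are assumptions made for illustration, not RocketMQ code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for MQClientManager: one shared instance per clientId.
class ClientRegistrySketch {
    private final ConcurrentMap<String, Object> factoryTable = new ConcurrentHashMap<>();

    Object getAndCreate(String clientId) {
        Object instance = factoryTable.get(clientId);
        if (instance == null) {
            Object created = new Object();
            // putIfAbsent returns the existing value if another thread won the race
            Object prev = factoryTable.putIfAbsent(clientId, created);
            instance = (prev != null) ? prev : created;
        }
        return instance;
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        Object a = registry.getAndCreate("10.0.0.1@1234");
        Object b = registry.getAndCreate("10.0.0.1@1234");
        Object c = registry.getAndCreate("10.0.0.1@5678");
        System.out.println(a == b); // true: same clientId, same instance
        System.out.println(a == c); // false: different clientId
    }
}
```

The losing thread simply discards the object it created, which is cheaper than locking the whole table for every lookup.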

Next, how is the clientId generated?

/**
 * Build the MQ client id.
 * clientId = client IP + "@" + instance name [+ "@" + unitName]
 * @return the client id
 */
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    return sb.toString();
}

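As shown, the clientId is built by this rule:

clientId = client IP + @ + instance name, followed by @ + unitName when a unitName is set. Producers and consumers in the same JVM that resolve to the same clientId therefore obtain the same MQClientInstance at startup. MQClientInstance encapsulates the network-call logic.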
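
To make the rule concrete, here is a small standalone sketch of the same string construction. ClientIdSketch and its parameters are hypothetical names; the real method reads these values from ClientConfig.

```java
// Standalone sketch of the clientId rule: ip@instanceName[@unitName].
class ClientIdSketch {
    static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIp).append('@').append(instanceName);
        // unitName is optional and only appended when it is non-blank
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildClientId("192.168.0.10", "4321", null));    // 192.168.0.10@4321
        System.out.println(buildClientId("192.168.0.10", "4321", "unitA")); // 192.168.0.10@4321@unitA
    }
}
```

Since changeInstanceNameToPID replaces the default instance name with the process id, every client in one JVM that keeps the defaults ends up with the same ip@pid string, which is why they share one MQClientInstance.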

Back in the start method:

            // Register the producer, adding it to the MQClientInstance
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // Registration failed: reset the state to CREATE_JUST
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // On success, put an empty TopicPublishInfo for the create-topic key into topicPublishInfoTable
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // Start the MQClientFactory; it is not started again if it is already running
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

Here the producer registers itself with the MQClientInstance, which manages it from then on.
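
The switch on serviceState in start is a small state machine: the state is set pessimistically to START_FAILED before the risky steps and only becomes RUNNING once they all succeed. A minimal sketch of that idea, using a hypothetical StartStateSketch class and a Runnable standing in for the startup steps:

```java
// Sketch of the "pessimistic state" start pattern; not RocketMQ code.
class StartStateSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    // startupSteps is a stand-in for checkConfig, registration, factory start, ...
    void start(Runnable startupSteps) {
        switch (state) {
            case CREATE_JUST:
                state = ServiceState.START_FAILED; // assume failure until proven otherwise
                startupSteps.run();                // may throw, leaving START_FAILED behind
                state = ServiceState.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY cannot be started again
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    ServiceState state() {
        return state;
    }

    public static void main(String[] args) {
        StartStateSketch producer = new StartStateSketch();
        producer.start(() -> { });
        System.out.println(producer.state()); // RUNNING
    }
}
```

One difference from the real method is worth noting: when registration fails, the original code explicitly resets the state to CREATE_JUST before throwing, whereas this sketch leaves every failure in START_FAILED.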

mQClientFactory.start(); starts the MQClientInstance; if it is already running it is not started again. The code is:

[MQClientInstance.java]
public void start() throws MQClientException {

    // Synchronize on the current instance
    synchronized (this) {
        switch (this.serviceState) {
            // The MQClientInstance was just created: start it
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service (the message-pull thread)
                this.pullMessageService.start();
                // Start rebalance service (the rebalance thread)
                this.rebalanceService.start();
                // Start push service (the client's internal producer)
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;

            // If the state is already RUNNING, do not start again
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

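Because producers and consumers share the same MQClientInstance, the MQClientInstance starts the internal producer, the message-pull thread, and the rebalance thread together.

That completes producer startup, the prerequisite for sending. Next we look at the send flow itself.

Core send flow: DefaultMQProducerImpl.sendDefaultImpl

The key API for sending is the send method. A commonly used overload is declared as: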

[DefaultMQProducer.java]
public SendResult send(Message msg,long timeout) throws MQClientException, 
RemotingException, MQBrokerException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    return this.defaultMQProducerImpl.send(msg, timeout);
}

It calls send in DefaultMQProducerImpl:

[DefaultMQProducerImpl.java]
public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

That calls sendDefaultImpl, whose declaration and parameters are:

[DefaultMQProducerImpl.java]
private SendResult sendDefaultImpl(
    Message msg,                                // the message to send
    final CommunicationMode communicationMode,  // send mode, an enum
    final SendCallback sendCallback,            // callback to implement for asynchronous sends
    final long timeout                          // timeout in milliseconds
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

CommunicationMode is the send mode:

public enum CommunicationMode {
    SYNC,                   // synchronous send
    ASYNC,                  // asynchronous send
    ONEWAY,                 // fire and forget: the result is not of interest
}

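Let us go through sendDefaultImpl in detail:

[DefaultMQProducerImpl.java]
// Make sure the producer is in the RUNNING state
this.makeSureStateOK();
// Validate the message before sending
Validators.checkMessage(msg, this.defaultMQProducer);
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// Look up the route (publish) info for the message's topic
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

The method records its start timestamps and then looks up the route info for the topic. tryToFindTopicPublishInfo checks the local cache first and queries the NameServer when nothing usable is cached: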

[DefaultMQProducerImpl.java]
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Look up the topic's route in the cached publish-info table
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

    // If absent or unusable, query the NameServer
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

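    // Route info is present
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        // Return it
        return topicPublishInfo;
    } else {
        // Query the NameServer again, this time via the default topic, update the table
        // and return whatever route info is now cached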
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

Once the route info is available, the method enters the pre-send logic, first declaring some variables for later use:

if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;

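Next comes the total number of attempts, timesTotal, which depends on the send mode.

For a synchronous send [CommunicationMode.SYNC], the total is 1 + retryTimesWhenSendFailed.

For any other mode (ASYNC or ONEWAY), the total at this level is 1. Asynchronous retries are driven separately by retryTimesWhenSendAsyncFailed, which is passed down to MQClientAPIImpl.sendMessage.

// Compute the total number of attempts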
int timesTotal = communicationMode == CommunicationMode.SYNC ? 
                1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 
                1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();

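Based on the topic's route info and the broker name used by the previous attempt, a MessageQueue is selected, and this attempt is sent to that queue. The selection method, selectOneMessageQueue, is examined in detail below.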

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
    mq = mqSelected;
    brokersSent[times] = mq.getBrokerName();
    try {
        beginTimestampPrev = System.currentTimeMillis();
        if (times > 0) {
            //Reset topic with namespace during resend.
            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
        }
        // Time elapsed since the first attempt started
        long costTime = beginTimestampPrev - beginTimestampFirst;

Here is how selectOneMessageQueue chooses a queue (the code below lives in MQFaultStrategy, to which DefaultMQProducerImpl delegates):

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

    // If the broker fault-latency mechanism is enabled
    if (this.sendLatencyFaultEnable) {
        try {
            // The index of the queue to use comes from incrementing sendWhichQueue
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // index modulo the number of queues in the route info
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // The candidate queue for this send
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

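            // No queue passed the availability check: pick at least one broker anyway
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // Number of writable queues on that broker
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            // If it has writable queues, pick one of them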
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                // No writable queues: remove the broker from the fault table
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

The core of this code is queue selection. Selection is combined with fault detection so that faulty brokers are avoided as far as possible.
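
The last line of the method, tpInfo.selectOneMessageQueue(lastBrokerName), is the path taken when the fault-latency mechanism is off, and its body is not quoted in this article. The sketch below illustrates the idea under the assumption that it round-robins over the queues and skips the broker used by the previous attempt. QueueSelectorSketch and its nested Queue class are hypothetical names, and Math.floorMod is used here instead of the Math.abs plus modulo idiom of the original.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Round-robin queue selection that avoids the broker of the previous attempt.
class QueueSelectorSketch {
    static final class Queue {
        final String broker;
        final int id;
        Queue(String broker, int id) { this.broker = broker; this.id = id; }
        @Override public String toString() { return broker + "#" + id; }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    QueueSelectorSketch(List<Queue> queues) { this.queues = queues; }

    Queue select(String lastBrokerName) {
        int index = sendWhichQueue.getAndIncrement();
        if (lastBrokerName != null) {
            // On a retry, walk the ring once looking for a queue on a different broker
            for (int i = 0; i < queues.size(); i++) {
                Queue q = queues.get(Math.floorMod(index + i, queues.size()));
                if (!q.broker.equals(lastBrokerName)) {
                    return q;
                }
            }
        }
        // First attempt, or every queue is on the same broker: plain round-robin
        return queues.get(Math.floorMod(index, queues.size()));
    }

    public static void main(String[] args) {
        QueueSelectorSketch selector = new QueueSelectorSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1), new Queue("broker-b", 0)));
        System.out.println(selector.select(null));       // broker-a#0
        System.out.println(selector.select("broker-a")); // broker-b#0
    }
}
```

floorMod keeps the position non-negative even after the counter overflows, which is the case the `if (pos < 0) pos = 0` guard in the original is protecting against.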

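Back in sendDefaultImpl: before each attempt, the time elapsed since the first attempt began is computed and compared with the send timeout configured by the caller.

If timeout is smaller than the elapsed time, the send has timed out: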

if (timeout < costTime) {
    callTimeout = true;
    // Timed out: leave the retry loop
    break;
}

The actual send is performed by sendKernelImpl, called as follows. sendKernelImpl is discussed in detail later.

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

The result is then handled according to the send mode:

switch (communicationMode) {
    case ASYNC:
        return null;
    case ONEWAY:
        return null;
    case SYNC:
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                continue;
            }
        }
        return sendResult;
    default:
        break;
}

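This code is easy to follow. For ASYNC, null is returned directly; the real result is delivered to the SendCallback. For ONEWAY, the result is not of interest at all, so null is returned as well.

For SYNC, the send status is checked. If it is not SendStatus.SEND_OK and isRetryAnotherBrokerWhenNotStoreOK() is true, the loop continues with the next attempt, which retries on another broker. Otherwise the sendResult is returned to the caller as is.

The exception-handling logic is omitted here; interested readers can consult the source.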

if (sendResult != null) {
    return sendResult;
}

If a non-null sendResult was obtained, it is returned to the business code for processing.
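
Putting the pieces of this section together, the synchronous path is a bounded retry loop that shares one timeout budget across all attempts. The sketch below shows only that control flow. SyncRetrySketch is a hypothetical class, and the IntPredicate stands in for sendKernelImpl, returning true when the attempt ends in SEND_OK.

```java
import java.util.function.IntPredicate;

// Control-flow sketch of the synchronous retry loop; not RocketMQ code.
class SyncRetrySketch {
    // Returns the 0-based index of the attempt that succeeded, or -1 if none did.
    static int sendWithRetry(int retryTimesWhenSendFailed, long timeoutMillis, IntPredicate attempt) {
        int timesTotal = 1 + retryTimesWhenSendFailed;
        long beginTimestampFirst = System.currentTimeMillis();
        for (int times = 0; times < timesTotal; times++) {
            // The budget is measured from the first attempt, not per attempt
            long costTime = System.currentTimeMillis() - beginTimestampFirst;
            if (timeoutMillis < costTime) {
                break; // budget exhausted, stop retrying
            }
            if (attempt.test(times)) {
                return times;
            }
            // not SEND_OK: fall through to the next attempt (another broker in the real code)
        }
        return -1;
    }

    public static void main(String[] args) {
        // The first attempt fails, the second succeeds
        System.out.println(sendWithRetry(2, 3000L, times -> times == 1)); // 1
    }
}
```

Because the budget is shared, a slow first attempt leaves less time for the retries, which is why the original passes timeout - costTime down to sendKernelImpl.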

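Core send flow: DefaultMQProducerImpl.sendKernelImpl

We now focus on sendKernelImpl. It is the exit point of the send path and the place where the send call is actually issued.

The method is declared as follows: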

private SendResult sendKernelImpl(
                    // the message to send
                    final Message msg,
                    // the target queue, chosen by selectOneMessageQueue
                    final MessageQueue mq,
                    // send mode
                    final CommunicationMode communicationMode,
                    // callback to implement for asynchronous sends
                    final SendCallback sendCallback,
                    // route info for the topic
                    final TopicPublishInfo topicPublishInfo,
                    // send timeout, specified by the client
                    final long timeout) throws MQClientException, RemotingException, MQBrokerException,
                         InterruptedException {

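The method first records its own start time and resolves the brokerAddr for the selected queue's broker. If the address is not in the local cache, the topic's route info is refreshed and the lookup is repeated. The set of available broker addresses is fetched dynamically, so the most recent address should be used.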

long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

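The VIP channel address is then derived from the broker address: when the VIP channel is enabled, it is the same IP with the broker's port minus 2.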

SendMessageContext context = null;
if (brokerAddr != null) {
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);     
    // Keep a reference to the original message body bytes
    byte[] prevBody = msg.getBody();
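
As an illustration of the port arithmetic, here is a standalone sketch of the address rewrite. VipChannelSketch is a hypothetical class that mirrors what the article describes for MixAll.brokerVIPChannel; it assumes an ip:port address and does no validation.

```java
// Sketch of the VIP channel address rule: same host, port minus 2.
class VipChannelSketch {
    static String vipChannel(boolean useVipChannel, String brokerAddr) {
        if (!useVipChannel) {
            return brokerAddr; // VIP channel disabled: use the address unchanged
        }
        int idx = brokerAddr.lastIndexOf(':');
        String host = brokerAddr.substring(0, idx);
        int port = Integer.parseInt(brokerAddr.substring(idx + 1));
        return host + ":" + (port - 2);
    }

    public static void main(String[] args) {
        System.out.println(vipChannel(true, "10.0.0.5:10911"));  // 10.0.0.5:10909
        System.out.println(vipChannel(false, "10.0.0.5:10911")); // 10.0.0.5:10911
    }
}
```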

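Next the message is pre-processed: it is assigned a globally unique id. For a batch message the unique ids are generated separately, which is covered later.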

if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

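Check whether this is a transactional message (sysFlag is an int declared earlier in the method, in a part not quoted here):

// Read the message property PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"
// to decide whether this is a transactional message
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);

// For a transactional message, OR TRANSACTION_PREPARED_TYPE into sysFlag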
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
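
sysFlag is a bit field, so several independent facts about a message travel in one int. The sketch below shows the bitwise OR and the masked read. The constant values are assumptions chosen to resemble MessageSysFlag and should be checked against the RocketMQ version in use; SysFlagSketch itself is a hypothetical class.

```java
// Bit-flag sketch; constant values are assumptions for illustration only.
class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    // Two bits hold the transaction type, so reads must mask both of them
    static final int TRANSACTION_TYPE_MASK = 0x3 << 2;

    static int buildSysFlag(boolean bodyCompressed, String tranMsgProperty) {
        int sysFlag = 0;
        if (bodyCompressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        // Same test as in sendKernelImpl: property present and equal to "true"
        if (tranMsgProperty != null && Boolean.parseBoolean(tranMsgProperty)) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    static boolean isPrepared(int sysFlag) {
        return (sysFlag & TRANSACTION_TYPE_MASK) == TRANSACTION_PREPARED_TYPE;
    }

    public static void main(String[] args) {
        int flag = buildSysFlag(true, "true");
        System.out.println(flag);             // 5
        System.out.println(isPrepared(flag)); // true
    }
}
```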

If send hooks have been registered, the before-send hooks run first. This works much like aspect-style (AOP) interception.

if (this.hasSendMessageHook()) {

    // Populate the send context
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());

    // For a transactional message, mark the message type in the context as a half message (Trans_Msg_Half)
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

Transactional message sending will be analyzed in a separate article and is not expanded on here.

    // For a delayed message, mark the message type as Delay_Msg
    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }

    // Run the before-send hooks
    this.executeSendMessageHookBefore(context);
}

After the before-send hooks have run, the actual send logic begins, starting with the creation of the request header.

// Declare and initialize the send request header
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();

// Producer group
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// Topic
requestHeader.setTopic(msg.getTopic());
// Default topic, i.e. MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC = TBW102, used when automatic topic creation is enabled
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// Number of queues for the default topic
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// Id of the queue this message is sent to
requestHeader.setQueueId(mq.getQueueId());
// System flag, computed above
requestHeader.setSysFlag(sysFlag);
// Born timestamp: the current system time
requestHeader.setBornTimestamp(System.currentTimeMillis());
// Message flag
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
// This is a send, not a re-consumption, so 0
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// Whether this is a batch message
requestHeader.setBatch(msg instanceof MessageBatch);

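If the topic of the current message starts with MixAll.RETRY_GROUP_TOPIC_PREFIX,

RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";

the topic is actually a retry topic, and the retry-related logic runs:

// If the message topic is a retry topic
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    // Read the reconsume count; if present, copy it into
    // the request header and clear the property on the message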
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }

    // Read the maximum reconsume count
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
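
The block above moves a value out of the message properties and into the request header. A standalone sketch of that "read, copy, clear" step, using a plain Map for the properties, is shown below. RetryTopicSketch is hypothetical, and the property key "RECONSUME_TIME" is an assumption standing in for MessageConst.PROPERTY_RECONSUME_TIME.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of retry-topic detection and the property-to-header move.
class RetryTopicSketch {
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    // Assumed key, for illustration only
    static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";

    static boolean isRetryTopic(String topic) {
        return topic.startsWith(RETRY_GROUP_TOPIC_PREFIX);
    }

    // Removes the property from the map and returns it as the header value (0 when absent)
    static int takeReconsumeTimes(Map<String, String> properties) {
        String value = properties.remove(PROPERTY_RECONSUME_TIME);
        return value == null ? 0 : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(PROPERTY_RECONSUME_TIME, "3");
        System.out.println(isRetryTopic("%RETRY%order-consumers")); // true
        System.out.println(takeReconsumeTimes(properties));         // 3
        System.out.println(properties.isEmpty());                   // true
    }
}
```

Clearing the property after copying it keeps the count from being carried twice, once in the header and once in the serialized properties.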

The send path is then chosen by the actual send mode.

First, the logic for ASYNC (asynchronous send):

switch (communicationMode) {
   case ASYNC:
       Message tmpMessage = msg;
       boolean messageCloned = false;
       // If the body was compressed, send a clone and restore the original body (prevBody) on msg
       if (msgBodyCompressed) {
           //If msg body was compressed, msgbody should be reset using prevBody.
           //Clone new message using compressed message body and recover origin message.
           tmpMessage = MessageAccessor.cloneMessage(msg);
           messageCloned = true;
           // Restore the original, uncompressed body on msg
           msg.setBody(prevBody);
       }

       if (topicWithNamespace) {
           if (!messageCloned) {
               tmpMessage = MessageAccessor.cloneMessage(msg);
               messageCloned = true;
           }
           msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
       }


       long costTimeAsync = System.currentTimeMillis() - beginStartTime;
       if (timeout < costTimeAsync) {
           throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
       }

The sendMessage method of MQClientInstance's getMQClientAPIImpl() is called to perform the network communication and obtain the send result:

sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
    brokerAddr,
    mq.getBrokerName(),
    tmpMessage,
    requestHeader,
    timeout - costTimeAsync,
    communicationMode,
    sendCallback,
    topicPublishInfo,
    this.mQClientFactory,
    this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
    context,
    this);
break;

MQClientAPIImpl.sendMessage

Let us look directly at how MQClientAPIImpl.sendMessage handles an asynchronous send:

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;

    // If sendSmartMsg is on (org.apache.rocketmq.client.sendSmartMsg == true)
    // or this is a batch message
    if (sendSmartMsg || msg instanceof MessageBatch) {

        // Switch to the V2 request header
        SendMessageRequestHeaderV2 requestHeaderV2 = 
        SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? 
                  RequestCode.SEND_BATCH_MESSAGE : 
                  RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
        // Otherwise (a single message without sendSmartMsg)
        // use the RequestCode.SEND_MESSAGE command
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }

    // Use the message body as the request body
    request.setBody(msg.getBody());

    switch (communicationMode) {
        // ONEWAY: send without caring about the result
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;

        // ASYNC: check for a timeout first
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // Call the asynchronous send method
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;

        // SYNC: call the synchronous send method
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }
    return null;
}

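We are not done yet, so let us look at the asynchronous and synchronous send methods in turn.

Asynchronous send: sendMessageAsync

    ...method declaration omitted, it is really too long...
    // Invoke the send asynchronously
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, 
    // The actual logic of the send callback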
    new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();

            // The sender supplied no sendCallback, but a response came back
            if (null == sendCallback && response != null) {

                try {

                    // sendResult is produced by processSendResponse
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);

                    // Update the send context and run the after-send hooks
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }

                // Update the broker's fault/latency record
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            // The sender supplied a sendCallback
            if (response != null) {
                try {

                    // Obtain the sendResult
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;

                    // Run the after-send hooks
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    // Invoke the success callback onSuccess
                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);

                    // On a processing exception, pass sendCallback on so that
                    // its failure callback onException(e) is invoked
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
                ...other exception flows omitted, they are much the same...
        }
    });
}

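To make this easier to follow, here is a short summary of the invokeAsync handling:

  1. First check whether a sendCallback was passed in:
    1. If it is non-null, the response is parsed into a sendResult and sendCallback.onSuccess is invoked so that the sender can process the result further.
    2. If it is null, the response is still parsed into a sendResult, but no callback is invoked.
    3. In both cases, if a send context exists, the after-send hooks are executed.
  2. If a sendCallback exists but processing the response fails, sendCallback.onException is invoked (through onExceptionImpl) to handle the error.
  3. The fault record of the broker involved is updated.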
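
The branching summarized above can be sketched without any networking. In the sketch below, AsyncCallbackSketch, its SendCallback interface, and the string-based "response" are all hypothetical simplifications; a response starting with ERR models a failure while parsing, and the hookLog list models the after-send hooks.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the callback branching in sendMessageAsync; not RocketMQ code.
class AsyncCallbackSketch {
    interface SendCallback {
        void onSuccess(String sendResult);
        void onException(Throwable e);
    }

    static String parse(String response) {
        if (response.startsWith("ERR")) {
            throw new IllegalStateException(response);
        }
        return "SEND_OK";
    }

    static void operationComplete(String response, SendCallback callback, List<String> hookLog) {
        try {
            String sendResult = parse(response);
            hookLog.add("after:" + sendResult); // after-send hooks run with or without a callback
            if (callback != null) {
                try {
                    callback.onSuccess(sendResult);
                } catch (Throwable ignored) {
                    // an exception thrown by user code in onSuccess is swallowed
                }
            }
        } catch (Exception e) {
            if (callback != null) {
                callback.onException(e);
            }
        }
    }

    public static void main(String[] args) {
        List<String> hookLog = new ArrayList<>();
        SendCallback printing = new SendCallback() {
            @Override public void onSuccess(String sendResult) { System.out.println("ok " + sendResult); }
            @Override public void onException(Throwable e) { System.out.println("failed " + e.getMessage()); }
        };
        operationComplete("OK", printing, hookLog);       // ok SEND_OK
        operationComplete("ERR:busy", printing, hookLog); // failed ERR:busy
        System.out.println(hookLog);                      // [after:SEND_OK]
    }
}
```

Note the inner try around onSuccess: an exception thrown by the user's success handler must not be mistaken for a send failure, so it never reaches the outer catch.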

Synchronous send: sendMessageSync

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    // 0. Perform the synchronous call
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    // 1. Assert that a response came back
    assert response != null;
    // 2. Process the send result
    return this.processSendResponse(brokerName, msg, response);
}

Next, the logic of processSendResponse.

processSendResponse: parsing the send response

switch (response.getCode()) {
    case ResponseCode.FLUSH_DISK_TIMEOUT:
    case ResponseCode.FLUSH_SLAVE_TIMEOUT:
    case ResponseCode.SLAVE_NOT_AVAILABLE: {
    }

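The three cases above have empty bodies and fall through into the ResponseCode.SUCCESS branch. There sendStatus starts as SEND_OK and is then set according to the actual response code: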

case ResponseCode.SUCCESS: {
    SendStatus sendStatus = SendStatus.SEND_OK;
    switch (response.getCode()) {
        case ResponseCode.FLUSH_DISK_TIMEOUT:
            sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
            break;
        case ResponseCode.FLUSH_SLAVE_TIMEOUT:
            sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
            break;
        case ResponseCode.SLAVE_NOT_AVAILABLE:
            sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
            break;
        case ResponseCode.SUCCESS:
            sendStatus = SendStatus.SEND_OK;
            break;
        default:
            assert false;
            break;
    }

The logic above maps the concrete response code to the corresponding SendStatus.

SendMessageResponseHeader responseHeader =
    (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);

//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
    topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
}

MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());

String uniqMsgId = MessageClientIDSetter.getUniqID(msg);

// For a batch message, join the uniqMsgIds with commas
if (msg instanceof MessageBatch) {
    StringBuilder sb = new StringBuilder();
    for (Message message : (MessageBatch) msg) {
        sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
    }
    uniqMsgId = sb.toString();
}

Assemble the SendResult, fill in its properties, and return it:

        SendResult sendResult = new SendResult(sendStatus,
            uniqMsgId,
            responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
        sendResult.setTransactionId(responseHeader.getTransactionId());
        String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
        String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
        if (regionId == null || regionId.isEmpty()) {
            regionId = MixAll.DEFAULT_TRACE_REGION_ID;
        }
        if (traceOn != null && traceOn.equals("false")) {
            sendResult.setTraceOn(false);
        } else {
            sendResult.setTraceOn(true);
        }
        sendResult.setRegionId(regionId);
        return sendResult;
    }
    default:
        break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
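
The overall shape of processSendResponse is: four response codes are treated as "the broker accepted the message, possibly with a durability caveat" and mapped to a SendStatus, while every other code becomes an exception. A compact sketch of that mapping follows. SendStatusSketch is hypothetical, and the numeric codes are assumptions for illustration that should be checked against ResponseCode in the RocketMQ version in use.

```java
// Sketch of the response-code to SendStatus mapping; numeric codes are assumed.
class SendStatusSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    static final int CODE_SUCCESS = 0;
    static final int CODE_FLUSH_DISK_TIMEOUT = 10;
    static final int CODE_SLAVE_NOT_AVAILABLE = 11;
    static final int CODE_FLUSH_SLAVE_TIMEOUT = 12;

    static SendStatus toStatus(int responseCode) {
        switch (responseCode) {
            case CODE_SUCCESS:
                return SendStatus.SEND_OK;
            case CODE_FLUSH_DISK_TIMEOUT:
                return SendStatus.FLUSH_DISK_TIMEOUT;
            case CODE_FLUSH_SLAVE_TIMEOUT:
                return SendStatus.FLUSH_SLAVE_TIMEOUT;
            case CODE_SLAVE_NOT_AVAILABLE:
                return SendStatus.SLAVE_NOT_AVAILABLE;
            default:
                // stands in for "throw new MQBrokerException(code, remark)"
                throw new IllegalStateException("broker returned code " + responseCode);
        }
    }

    public static void main(String[] args) {
        System.out.println(toStatus(CODE_SUCCESS));            // SEND_OK
        System.out.println(toStatus(CODE_FLUSH_DISK_TIMEOUT)); // FLUSH_DISK_TIMEOUT
    }
}
```

This is also why the synchronous path in sendDefaultImpl checks for SEND_OK explicitly: a returned SendResult does not by itself mean the message was flushed and replicated.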

Back to DefaultMQProducerImpl.sendKernelImpl

Having covered the asynchronous send and the calls beneath it, we return to sendKernelImpl and look at the other send modes.

    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}

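ONEWAY and SYNC are handled identically: both call MQClientAPIImpl.sendMessage, here through a shorter overload without the callback-related arguments. The method was explained above and is not repeated here; see the MQClientAPIImpl.sendMessage section.

For SYNC, sendMessageSync is executed, which was covered above; for ONEWAY, invokeOneway is executed.

invokeOneway is implemented in NettyRemotingClient.java. NettyRemotingClient encapsulates the low-level network interaction; more on it will follow in the articles on the network communication layer.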

@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // Get or create the communication channel for the broker address
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // Run the before-RPC hooks
            doBeforeRpcHooks(addr, request);
            // Perform the actual network call without caring about the result
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

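A ONEWAY send does not look at the result after the network call, so it suits flows that are insensitive to the outcome, such as log reporting and event-tracking reports.

Back in DefaultMQProducerImpl.sendKernelImpl:

// If send hooks are registered,
// run the after-send hooks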
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}

return sendResult;

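This code runs after the send, whatever the send mode. If send hooks are registered, the sendResult is stored in the send context (for ONEWAY and ASYNC the sendResult at this point is null), and the after-send hooks are run through executeSendMessageHookAfter: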

public void executeSendMessageHookAfter(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageAfter(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookAfter", e);
            }
        }
    }
}

Hooks are registered through DefaultMQProducerImpl.registerSendMessageHook. Several can be registered; they are kept in a list. executeSendMessageHookAfter therefore iterates over that list and calls sendMessageAfter on each SendMessageHook.
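
The important property of this loop is that one misbehaving hook cannot break the send or prevent later hooks from running. A minimal sketch of that isolation follows; HookChainSketch and its AfterSendHook interface are hypothetical and model only the "after" half of SendMessageHook, with a list of strings standing in for SendMessageContext.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a hook list where each hook is isolated by its own try/catch.
class HookChainSketch {
    interface AfterSendHook {
        void sendMessageAfter(List<String> context);
    }

    private final List<AfterSendHook> hooks = new ArrayList<>();

    void register(AfterSendHook hook) {
        hooks.add(hook);
    }

    void executeAfter(List<String> context) {
        for (AfterSendHook hook : hooks) {
            try {
                hook.sendMessageAfter(context);
            } catch (Throwable e) {
                // a failing hook is reported but must not stop the remaining hooks
                System.err.println("failed to execute after-send hook: " + e);
            }
        }
    }

    public static void main(String[] args) {
        HookChainSketch chain = new HookChainSketch();
        chain.register(ctx -> ctx.add("metrics"));
        chain.register(ctx -> { throw new IllegalStateException("boom"); });
        chain.register(ctx -> ctx.add("trace"));
        List<String> context = new ArrayList<>();
        chain.executeAfter(context);
        System.out.println(context); // [metrics, trace]
    }
}
```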

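Summary

This is the second article in the source-analysis series and one of the more hardcore ones. If you have read this far, give yourself some credit: you are already ahead of many people.

My study of RocketMQ is still shallow, so mistakes are inevitable, and I will keep improving. Batch sending, transactional sending, and related logic will be analyzed in upcoming articles. See you there.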



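Copyright notice:

Original work takes effort, and article spinning is shameful. Unless otherwise noted, all posts on this blog are original. When reposting, please credit this article with a link to its address.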
