文章目录
  1. 1. DefaultProducer.send
  2. 2. DefaultProducer.batch
  3. 3. MessageBatch.generateFromList
  4. 4. DefaultProducer.batch
  5. 5. 其他

上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送。本文中,我们就一起来集中分析一下批量消息的发送是怎样的逻辑。

DefaultProducer.send

RocketMQ提供了批量发送消息的API,同样在DefaultProducer.java中

@Override
public SendResult send(
    Collection<Message> msgs) throws MQClientException, RemotingException, 
        MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

它的参数为Message集合,也就是一批消息。它的另外一个重载方法提供了发送超时时间参数

@Override
public SendResult send(Collection<Message> msgs,
    long timeout) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs), timeout);
}

可以看到是将消息通过batch()方法打包为单条消息,我们看一下batch方法的逻辑

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

    // 声明批量消息体
    MessageBatch msgBatch;
    try {

        // 从Message的list生成批量消息体MessageBatch
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        // 设置消息体,此时的消息体已经是处理过后的批量消息体
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    // 设置topic
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}

从代码可以看到,核心思想是将一批消息(Collection msgs)打包为MessageBatch对象,我们看下MessageBatch的声明

public class MessageBatch extends Message implements Iterable<Message> {

    private final List<Message> messages;

    private MessageBatch(List<Message> messages) {
        this.messages = messages;
    }

可以看到MessageBatch继承自Message,持有List引用。

我们接着看一下generateFromList方法

MessageBatch.generateFromList

public static MessageBatch generateFromList(Collection<Message> messages) {
    assert messages != null;
    assert messages.size() > 0;

    // 首先实例化一个Message的list
    List<Message> messageList = new ArrayList<Message>(messages.size());
    Message first = null;

    // 对messages集合进行遍历
    for (Message message : messages) {

        // 判断延时级别,如果大于0抛出异常,原因为:批量消息发送不支持延时
        if (message.getDelayTimeLevel() > 0) {
            throw new UnsupportedOperationException
                ("TimeDelayLevel in not supported for batching");
        }

        // 判断topic是否以 **"%RETRY%"** 开头,如果是,
        // 则抛出异常,原因为:批量发送消息不支持消息重试
        if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            throw new UnsupportedOperationException("Retry Group is not supported for batching");
        }

        // 判断集合中的每个Message的topic与批量发送topic是否一致,
        // 如果不一致则抛出异常,原因为:
        // 批量消息中的每个消息实体的Topic要和批量消息整体的topic保持一致。
        if (first == null) {
            first = message;
        } else {
            if (!first.getTopic().equals(message.getTopic())) {
                throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
            }

            // 判断批量消息的首个Message与其他的每个Message实体的等待消息存储状态是否相同,
            // 如果不同则报错,原因为:批量消息中每个消息的waitStoreMsgOK状态均应该相同。
            if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
            }
        }

        // 校验通过后,将message实体添加到messageList中
        messageList.add(message);
    }

    // 将处理完成的messageList作为构造方法,
    // 初始化MessageBatch实体,并设置topic以及isWaitStoreMsgOK状态。
    MessageBatch messageBatch = new MessageBatch(messageList);

    messageBatch.setTopic(first.getTopic());
    messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
    return messageBatch;
}

总结一下,generateFromList方法对调用方设置的Collection集合进行遍历,经过前置校验之后,转换为MessageBatch对象并返回给DefaultProducer.batch方法中,我们接着看DefaultProducer.batch的逻辑。

到此,通过MessageBatch.generateFromList方法,将发送端传入的一批消息集合转换为了MessageBatch实体。

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

    // 声明批量消息体
    MessageBatch msgBatch;
    try {
        // 从Message的list生成批量消息体MessageBatch
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        // 设置消息体,此时的消息体已经是处理过后的批量消息体
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    // 设置topic
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}

注意下面这行代码:

// 设置消息体,此时的消息体已经是处理过后的批量消息体
msgBatch.setBody(msgBatch.encode());

这里对MessageBatch进行消息编码处理,通过调用MessageBatch的encode方法实现,代码逻辑如下:

public byte[] encode() {
    return MessageDecoder.encodeMessages(messages);
}

可以看到是通过静态方法 encodeMessages(List messages) 实现的。

我们看一下encodeMessages方法的逻辑:

public static byte[] encodeMessages(List<Message> messages) {
    //TO DO refactor, accumulate in one buffer, avoid copies
    List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
    int allSize = 0;
    for (Message message : messages) {

        // 遍历messages集合,分别对每个Message实体进行编码操作,转换为byte[]
        byte[] tmp = encodeMessage(message);
        // 将转换后的单个Message的byte[]设置到encodedMessages中
        encodedMessages.add(tmp);
        // 批量消息的二进制数据长度随实际消息体递增
        allSize += tmp.length;
    }
    byte[] allBytes = new byte[allSize];
    int pos = 0;
    for (byte[] bytes : encodedMessages) {
        // 遍历encodedMessages,按序复制每个Message的二进制格式消息体
        System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
        pos += bytes.length;
    }
    // 返回批量消息整体的消息体二进制数组
    return allBytes;
}

encodeMessages的逻辑在注释中分析的已经比较清楚了,其实就是遍历messages,并按序拼接每个Message实体的二进制数组格式消息体并返回。

我们可以继续看一下单个Message是如何进行编码的,调用了 MessageDecoder.encodeMessage(message) 方法,逻辑如下:

public static byte[] encodeMessage(Message message) {
    //only need flag, body, properties
    byte[] body = message.getBody();
    int bodyLen = body.length;
    String properties = messageProperties2String(message.getProperties());
    byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
    //note properties length must not more than Short.MAX
    short propertiesLength = (short) propertiesBytes.length;
    int sysFlag = message.getFlag();
    int storeSize = 4 // 1 TOTALSIZE
        + 4 // 2 MAGICCOD
        + 4 // 3 BODYCRC
        + 4 // 4 FLAG
        + 4 + bodyLen // 4 BODY
        + 2 + propertiesLength;
    ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
    // 1 TOTALSIZE
    byteBuffer.putInt(storeSize);

    // 2 MAGICCODE
    byteBuffer.putInt(0);

    // 3 BODYCRC
    byteBuffer.putInt(0);

    // 4 FLAG
    int flag = message.getFlag();
    byteBuffer.putInt(flag);

    // 5 BODY
    byteBuffer.putInt(bodyLen);
    byteBuffer.put(body);

    // 6 properties
    byteBuffer.putShort(propertiesLength);
    byteBuffer.put(propertiesBytes);

    return byteBuffer.array();
}

这里其实就是将消息按照RocektMQ的消息协议进行编码,格式为:

消息总长度          ---  4字节
魔数                --- 4字节
bodyCRC校验码       --- 4字节
flag标识            --- 4字节
body长度            --- 4字节
消息体              --- 消息体实际长度N字节
属性长度            --- 2字节
扩展属性            --- N字节

通过encodeMessage方法处理之后,消息便会被编码为固定格式,最终会被Broker端进行处理并持久化。

其他

到此便是批量消息发送的源码分析,实际上RocketMQ在处理批量消息的时候是将其解析为单个消息再发送的,这样就在底层统一了单条消息、批量消息发送的逻辑,让整个框架的设计更加健壮,也便于我们进行理解学习。

后续的发送流程这里就不再重复展开了,感兴趣的同学可以移步我们的上一篇文章查看

跟我学RocketMQ之消息发送源码解析

批量消息的源码分析就暂时告一段落,更多的源码分析随后奉上,感谢您的阅读。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. DefaultProducer.send
  2. 2. DefaultProducer.batch
  3. 3. MessageBatch.generateFromList
  4. 4. DefaultProducer.batch
  5. 5. 其他
Fork me on GitHub