跟我学RocketMQ之批量消息发送源码解析
上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送。本文中,我们就一起来集中分析一下批量消息的发送是怎样的逻辑。
DefaultProducer.send
RocketMQ提供了批量发送消息的API,同样在DefaultProducer.java中
@Override
public SendResult send(
Collection<Message> msgs) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}
它的参数为Message集合,也就是一批消息。它的另外一个重载方法提供了发送超时时间参数
@Override
public SendResult send(Collection<Message> msgs,
long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), timeout);
}
可以看到是将消息通过batch()方法打包为单条消息,我们看一下batch方法的逻辑