跟我学RocketMQ之拉模式消费的两种方式
今天我们聊聊RocketMQ基于拉模式的两种消费方式。
对于消费而言,RocketMQ提供了推拉两种方式,我们常用的是基于长轮询的DefaultPushConsumer,它具有实时性好,易开发等特点。
但同时由于是长轮询,因此在大量消息消费的场景下,可能导致broker端CPU负载较大等情况,因此我们会在这种情况下选择使用拉模式的
PullConsumer或者MQPullConsumerScheduleService+PullTaskCallback这两种方式进行更为灵活的消费。
PullConsumer消费示例代码
首先我们看下基于PullConsumer方式如何进行消费。这里引用官方的样例代码进行说明:
public class PullConsumer {
我们定义了一个Map,key为指定的队列,value为这个队列拉取数据的最后位置,保存每个对列的拉取进度(offset),这里只是用作示例。实际开发中,这里需要基于持久化到Redis或者MySQL等外部存储设施中。
//Map<key, value> key为指定的队列,value为这个队列拉取数据的最后位置
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
首先需要定义消费者组,实例化一个DefaultMQPullConsumer消费者对象,标记消费者组。
为消费者设置NameServer地址,保证消费者客户端能够从NameServer获取到broker地址,从而执行消息消费流程。
String group_name = "test_pull_consumer_name";
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.start();
System.err.println("consumer start");
通过consumer.fetchSubscribeMessageQueues(TOPIC)方法获取指定TOPIC下的所有队列,默认有4个。
// 从TopicTest这个主题去获取所有的队列(默认会有4个队列)
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
对获取到MessageQueue集合进行遍历,拉取数据并执行具体的消费过程。
// 遍历每一个队列,进行拉取数据
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
通过while(true) 不间断地从队列中拉取数据。默认情况下,每次拉取32条,这里需要显式地传入拉取开始offset,通过getMessageQueueOffset(mq)方法获取,从我们持久化的设施中得到对应MessageQueue的拉取进度(可以认为是消费进度)。
拉取结束后,在持久化设施(这里是一个Map)保存下次拉取的开始offset,也就是本次拉取结束的下一个offset。(通过pullResult.getNextBeginOffset()获取)
while (true) {
try {
// 从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(pullResult);
System.out.println(pullResult.getPullStatus());
System.out.println();
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
从pullResult拉取结果中获取拉取状态,如果是FOUND则表明消息拉取成功;获取消息列表,并循环进行消费。其余均认为未拉取到消息,不做处理。
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for(MessageExt msg : list){
System.out.println(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
System.out.println("没有新的数据啦...");
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
拉取结束之后,手动显式调用该方法,刷新对应队列MessageQueue的拉取进度;
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
获取对应MessageQueue的消息消费进度Offset
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}
小结PullConsumer消费方式
从上述代码样例中可以看出,PullConsumer方式需要我们显式地存储消费进度,并且在消费过程中要根据情况进行消费进度的更新与存储。
如果开发者稍有不慎,忘记保存offset,则每次都会从第一条进行拉取,这样很容易造成消息重复。如果是生产环境,则后果不忍想象。
另外,我们还需要通过额外的存储手段对offset进行保存,并且尽量保证该设施的稳定可靠,否则还是会引起重复消费的问题。
基于此,我建议使用MQPullConsumerScheduleService+PullTaskCallback这种消费方式,那它具体如何使用呢?
MQPullConsumerScheduleService+PullTaskCallback消费方式
基于上述分析的PullConsumer使用一些不便之处,我这里建议使用MQPullConsumerScheduleService+PullTaskCallback方式进行消费。我们还是按照习惯方式,直接上代码。
step1: 声明并实例化一个MQPullConsumerScheduleService对象,通过构造方法传递消费者组;
String group_name = "test_pull_consumer_name";
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);
为消费者设置NameServer地址,以便能够获取broker地址,开启消费过程。
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
设置消费方式为集群模式;
scheduleService.setMessageModel(MessageModel.CLUSTERING);
step2: 调用registerPullTaskCallback(topic, pullTaskCallback) ,将开发者实现的PullTaskCallback消息拉取实现类注册给MQPullConsumerScheduleService。并绑定到指定的topic下;
scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {
step3: 开发者需要实现PullTaskCallback的doPullTask消息拉取回调方法,这里使用匿名内部类的方式。如果是Spring项目,我们可以定义一个Bean实现PullTaskCallback接口,并将该Bean的引用设置到一个实例化好的MQPullConsumerScheduleService对象中。
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
通过 PullTaskContext上下文获取到消息拉取实例对象MQPullConsumer;
MQPullConsumer consumer = context.getPullConsumer();
System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------");
try {
获取当前的消费进度,即:从哪儿开始消费,如果offset小于0则指定从0开始。
// 获取从哪里拉取
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
从对应的offset拉取指定数量的消息,默认32条,返回结果为PullResult。
通过pullResult.getPullStatus()判断拉取结果,如果为FOUND,则开始消费流程;其他状态不做处理。
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for(MessageExt msg : list){
//消费数据...
ystem.out.println(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
step4: 重点来了,这里通过调用updateConsumeOffset,更新消费进度,将下次消费开始时的offset更新到broker。并不需要客户端本地保存消费进度。
设置下次拉取时间,定时进行拉取调度。
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
// 设置再过3000ms后重新拉取
context.setPullNextDelayTimeMillis(3000);
}
...省略catch块...
启动拉取过程。
scheduleService.start();
}
}
小结MQPullConsumerScheduleService消费方式
从代码中我们能够清晰的看出,MQPullConsumerScheduleService的优势:
- MQPullConsumerScheduleService基于定时任务,消费端能够灵活控制拉取频率
- MQPullConsumerScheduleService支持提交消费进度到broker,不需要消费端进行保存
- MQPullConsumerScheduleService本身基于PullConsumer,定制化程度高,使用起来不易出错
可以说,MQPullConsumerScheduleService既保留了PullConsumer的优势,还对其进行了一定程序的增强;通过直接提交消费offset到broker,降低了客户端的开发量,较少了消费重复的风险。
因此笔者提倡在实际开发中,使用MQPullConsumerScheduleService进行拉模式的消息消费。
附录
这里做个小预告,在后续的文章中,笔者将对MQPullConsumerScheduleService消费方式源码实现进行解析,请拭目以待。
为了方便读者和笔者后续开发,这里贴出两种方式的完整源码实现,以略去重读文章的繁琐:
PullConsumer样例
public class PullConsumer {
//Map<key, value> key为指定的队列,value为这个队列拉取数据的最后位置
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
String group_name = "test_pull_consumer_name";
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.start();
System.err.println("consumer start");
// 从TopicTest这个主题去获取所有的队列(默认会有4个队列)
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
// 遍历每一个队列,进行拉取数据
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ: while (true) {
try {
// 从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(pullResult);
System.out.println(pullResult.getPullStatus());
System.out.println();
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for(MessageExt msg : list){
System.out.println(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
System.out.println("没有新的数据啦...");
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}
MQPullConsumerScheduleService代码样例
public class PullScheduleService {
public static void main(String[] args) throws MQClientException {
String group_name = "test_pull_consumer_name";
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------");
try {
// 从哪里拉取
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for(MessageExt msg : list){
//消费数据
System.out.println(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
// 设置再过3000ms后重新拉取
context.setPullNextDelayTimeMillis(3000);
}
catch (Exception e) {
e.printStackTrace();
}
}
});
scheduleService.start();
}
}
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。