文章目录
  1. 1. PullConsumer消费示例代码
    1. 1.1. 小结PullConsumer消费方式
  2. 2. MQPullConsumerScheduleService+PullTaskCallback消费方式
    1. 2.1. 小结MQPullConsumerScheduleService消费方式
  3. 3. 附录
    1. 3.1. PullConsumer样例
    2. 3.2. MQPullConsumerScheduleService代码样例

今天我们聊聊RocketMQ基于拉模式的两种消费方式。

对于消费而言,RocketMQ提供了推拉两种方式,我们常用的是基于长轮询的DefaultPushConsumer,它具有实时性好,易开发等特点。

但同时由于是长轮询,因此在大量消息消费的场景下,可能导致broker端CPU负载较大等情况,因此我们会在这种情况下选择使用拉模式的

PullConsumer或者MQPullConsumerScheduleService+PullTaskCallback这两种方式进行更为灵活的消费。

PullConsumer消费示例代码

首先我们看下基于PullConsumer方式如何进行消费。这里引用官方的样例代码进行说明:

public class PullConsumer {

我们定义了一个Map,key为指定的队列,value为这个队列拉取数据的最后位置,保存每个对列的拉取进度(offset),这里只是用作示例。实际开发中,这里需要基于持久化到Redis或者MySQL等外部存储设施中。

//Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {

首先需要定义消费者组,实例化一个DefaultMQPullConsumer消费者对象,标记消费者组。

为消费者设置NameServer地址,保证消费者客户端能够从NameServer获取到broker地址,从而执行消息消费流程。

String group_name = "test_pull_consumer_name";
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.start();
System.err.println("consumer start");

通过consumer.fetchSubscribeMessageQueues(TOPIC)方法获取指定TOPIC下的所有队列,默认有4个。

//    从TopicTest这个主题去获取所有的队列(默认会有4个队列)
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");

对获取到MessageQueue集合进行遍历,拉取数据并执行具体的消费过程。

//    遍历每一个队列,进行拉取数据
for (MessageQueue mq : mqs) {
    System.out.println("Consume from the queue: " + mq);

通过while(true) 不间断地从队列中拉取数据。默认情况下,每次拉取32条,这里需要显式地传入拉取开始offset,通过getMessageQueueOffset(mq)方法获取,从我们持久化的设施中得到对应MessageQueue的拉取进度(可以认为是消费进度)。

拉取结束后,在持久化设施(这里是一个Map)保存下次拉取的开始offset,也就是本次拉取结束的下一个offset。(通过pullResult.getNextBeginOffset()获取)

while (true) {
    try {
        //    从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
        System.out.println(pullResult);
        System.out.println(pullResult.getPullStatus());
        System.out.println();
        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

从pullResult拉取结果中获取拉取状态,如果是FOUND则表明消息拉取成功;获取消息列表,并循环进行消费。其余均认为未拉取到消息,不做处理。

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> list = pullResult.getMsgFoundList();
                        for(MessageExt msg : list){
                            System.out.println(new String(msg.getBody()));
                        }
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        System.out.println("没有新的数据啦...");
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    consumer.shutdown();
}

拉取结束之后,手动显式调用该方法,刷新对应队列MessageQueue的拉取进度;

private static void putMessageQueueOffset(MessageQueue mq, long offset) {
    offseTable.put(mq, offset);
}

获取对应MessageQueue的消息消费进度Offset

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }

}

小结PullConsumer消费方式

从上述代码样例中可以看出,PullConsumer方式需要我们显式地存储消费进度,并且在消费过程中要根据情况进行消费进度的更新与存储。

如果开发者稍有不慎,忘记保存offset,则每次都会从第一条进行拉取,这样很容易造成消息重复。如果是生产环境,则后果不忍想象。

另外,我们还需要通过额外的存储手段对offset进行保存,并且尽量保证该设施的稳定可靠,否则还是会引起重复消费的问题。

基于此,我建议使用MQPullConsumerScheduleService+PullTaskCallback这种消费方式,那它具体如何使用呢?

MQPullConsumerScheduleService+PullTaskCallback消费方式

基于上述分析的PullConsumer使用一些不便之处,我这里建议使用MQPullConsumerScheduleService+PullTaskCallback方式进行消费。我们还是按照习惯方式,直接上代码。

step1: 声明并实例化一个MQPullConsumerScheduleService对象,通过构造方法传递消费者组;

String group_name = "test_pull_consumer_name";

final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);

为消费者设置NameServer地址,以便能够获取broker地址,开启消费过程。

scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);

设置消费方式为集群模式;

scheduleService.setMessageModel(MessageModel.CLUSTERING);

step2: 调用registerPullTaskCallback(topic, pullTaskCallback) ,将开发者实现的PullTaskCallback消息拉取实现类注册给MQPullConsumerScheduleService。并绑定到指定的topic下;

scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {

step3: 开发者需要实现PullTaskCallback的doPullTask消息拉取回调方法,这里使用匿名内部类的方式。如果是Spring项目,我们可以定义一个Bean实现PullTaskCallback接口,并将该Bean的引用设置到一个实例化好的MQPullConsumerScheduleService对象中。

@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {

通过 PullTaskContext上下文获取到消息拉取实例对象MQPullConsumer;

MQPullConsumer consumer = context.getPullConsumer();
System.err.println("-------------- queueId: " + mq.getQueueId()  + "-------------");
try {

获取当前的消费进度,即:从哪儿开始消费,如果offset小于0则指定从0开始。

// 获取从哪里拉取
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
    offset = 0;

从对应的offset拉取指定数量的消息,默认32条,返回结果为PullResult。

通过pullResult.getPullStatus()判断拉取结果,如果为FOUND,则开始消费流程;其他状态不做处理。

PullResult pullResult = consumer.pull(mq, "*", offset, 32);
switch (pullResult.getPullStatus()) {
    case FOUND:
        List<MessageExt> list = pullResult.getMsgFoundList();
        for(MessageExt msg : list){
            //消费数据...
            ystem.out.println(new String(msg.getBody()));
        }
        break;
    case NO_MATCHED_MSG:
        break;
    case NO_NEW_MSG:
    case OFFSET_ILLEGAL:
        break;
    default:
        break;
}

step4: 重点来了,这里通过调用updateConsumeOffset,更新消费进度,将下次消费开始时的offset更新到broker。并不需要客户端本地保存消费进度。

设置下次拉取时间,定时进行拉取调度。

    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
    // 设置再过3000ms后重新拉取
    context.setPullNextDelayTimeMillis(3000);

}
...省略catch块...

启动拉取过程。

        scheduleService.start();
    }
}

小结MQPullConsumerScheduleService消费方式

从代码中我们能够清晰的看出,MQPullConsumerScheduleService的优势:

  1. MQPullConsumerScheduleService基于定时任务,消费端能够灵活控制拉取频率
  2. MQPullConsumerScheduleService支持提交消费进度到broker,不需要消费端进行保存
  3. MQPullConsumerScheduleService本身基于PullConsumer,定制化程度高,使用起来不易出错

可以说,MQPullConsumerScheduleService既保留了PullConsumer的优势,还对其进行了一定程序的增强;通过直接提交消费offset到broker,降低了客户端的开发量,较少了消费重复的风险。

因此笔者提倡在实际开发中,使用MQPullConsumerScheduleService进行拉模式的消息消费。

附录

这里做个小预告,在后续的文章中,笔者将对MQPullConsumerScheduleService消费方式源码实现进行解析,请拭目以待。

为了方便读者和笔者后续开发,这里贴出两种方式的完整源码实现,以略去重读文章的繁琐:

PullConsumer样例

public class PullConsumer {
    //Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {

        String group_name = "test_pull_consumer_name";
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.start();
        System.err.println("consumer start");
        //    从TopicTest这个主题去获取所有的队列(默认会有4个队列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
        //    遍历每一个队列,进行拉取数据
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);

            SINGLE_MQ: while (true) {
                try {
                    //    从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    System.out.println(pullResult.getPullStatus());
                    System.out.println();
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> list = pullResult.getMsgFoundList();
                            for(MessageExt msg : list){
                                System.out.println(new String(msg.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            System.out.println("没有新的数据啦...");
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }


    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }


    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }

}

MQPullConsumerScheduleService代码样例

public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {

        String group_name = "test_pull_consumer_name";

        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);

        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);

        scheduleService.setMessageModel(MessageModel.CLUSTERING);

        scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                System.err.println("-------------- queueId: " + mq.getQueueId()  + "-------------");
                try {
                    // 从哪里拉取
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;

                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> list = pullResult.getMsgFoundList();
                        for(MessageExt msg : list){
                            //消费数据
                            System.out.println(new String(msg.getBody()));
                        }
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    // 设置再过3000ms后重新拉取
                    context.setPullNextDelayTimeMillis(3000);

                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }
}



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. PullConsumer消费示例代码
    1. 1.1. 小结PullConsumer消费方式
  2. 2. MQPullConsumerScheduleService+PullTaskCallback消费方式
    1. 2.1. 小结MQPullConsumerScheduleService消费方式
  3. 3. 附录
    1. 3.1. PullConsumer样例
    2. 3.2. MQPullConsumerScheduleService代码样例
Fork me on GitHub