文章目录
  1. 1. 通过DefaultMQProducer消费消息
  2. 2. Spring框架整合DefaultMQPushConsumer
  3. 3. RocketMQPushConsumerAgent使用案例
  4. 4. 附录:RocketMQConsumerConfig配置类

本文我将继续讲解如何使用DefaultMQPushConsumer对RocketMQ中的消息进行消费,同时在文章的第二部分将继续带领读者朋友对DefaultMQPushConsumer进行薄封装,让我们在Spring中更容易对消息进行消费。

DefaultMQPushConsumer不区分普通消息和事务消息,即我们能够利用DefaultMQPushConsumer实现对普通消息和事务消息的消费。

通过DefaultMQProducer消费消息

首先,声明一个DefaultMQPushConsumer客户端,并通过构造器初始化,构造参数为消费者组。官方建议消费者组以“CID_”开头。

DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer("CID_SNOWALKER");

设置NameServer地址

defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

设置Consumer第一次启动从队列头部开始消费

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

设置消费模式为集群方式,CLUSTERING模式下每条消息只会被一个Consumer消费一次,如果设置为BROADCASTING则为广播模式,每个消费者都会将消息消费至少一次。一般我们使用的均为CLUSTERING模式。

defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);

注册消息监听器,这里需要实现MessageListenerConcurrently接口,并实现consumeMessage(List msgs, ConsumeConcurrentlyContext context) 方法,我这里的demo是lambda形式,实际上是一样的。如果你不喜欢lambda形式,可以继续使用匿名内部类或者自行定义一个类实现该接口。

defaultMQPushConsumer.registerMessageListener(
    (msgs, context) -> {
        for (MessageExt msg : msgs) {
            String realMessage = new String(msg.getBody());
            LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",
                Thread.currentThread().getName(),
                msg.getMsgId(),
                realMessage);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
)

这里注意,当消费逻辑执行成功,则返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,后续将不再对该消息进行消费。如果消费逻辑失败,则需要设置为ConsumeConcurrentlyStatus.RECONSUME_LATER, RocketMQ会对消息进行重新推送,默认推送16次,目的是尽量保证消息消费成功,如果达到最大重试次数,还是失败则进入死信队列,等待人工干预。

调用start()方法,启动对队列的监听,开始进行消息的消费。

defaultMQPushConsumer.start();

我们尝试运行一下,这里我已经有了对应的消费者,可以看下运行的日志:

2019-01-23 09:55:25.022  INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher   : 
消息id=AC1E5356496018B4AAC2736D06CF0002, 发送结果=SEND_OK

2019-01-23 09:55:27.519  INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer     : 
当前消费线程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息为={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"测试消息简单发送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}

可以看到broker推送消息至消费端,并且被成功消费。

Spring框架整合DefaultMQPushConsumer

我们仍然基于Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE 对DefaultMQPushConsumer进行整合,相关代码已经上传至github

RocketMQPushConsumerAgent.java

这里对核心代码进行讲解。

首先定义RocketMQPushConsumerAgent.java并将其声明为spring的bean,作用域为prototype,即多例形式。

@Scope("prototype")
@Component
public class RocketMQPushConsumerAgent {

声明消息监听器及消息消费者

private MessageListenerConcurrently messageListener;

private DefaultMQPushConsumer defaultMQPushConsumer;

init()方法为核心的初始化逻辑,在该方法中,初始化了DefaultMQPushConsumer,并设置NameServer地址、消费模式以及将外部实现的监听器设置给内部的messageListener引用。

接着对消息主题进行订阅,对该主题下所有的消息进行监听,这里有待优化,后续将把消息的过滤表达式也暴露给调用者。

所有的配置参数均通过RocketMQConsumerConfig进行设置,保证接口的整洁性,RocketMQConsumerConfig将在附录中进行简单讲解。

public RocketMQPushConsumerAgent init(RocketMQConsumerConfig consumerConfig, MessageListenerConcurrently messageListener)  throws MQClientException  {
    defaultMQPushConsumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());
    defaultMQPushConsumer.setNamesrvAddr(consumerConfig.getNameSrvAddr());
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式
    if (consumerConfig.getMessageModel() != null) {
        defaultMQPushConsumer.setMessageModel(consumerConfig.getMessageModel());
    }
    // 注册监听器
    this.messageListener = messageListener;
    defaultMQPushConsumer.registerMessageListener(this.messageListener);
    defaultMQPushConsumer.subscribe(consumerConfig.getTopic(), "*");
    LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端组装完成");
    return this;
}

独立的启动方法

public void start() throws MQClientException {
    this.defaultMQPushConsumer.start();
}

独立的关闭方法

public void destroy() {
    defaultMQPushConsumer.shutdown();
    LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端[已关闭]");
}

为方便外部对消费者进行进一步的自定义设置,提供外部获取defaultMQPushConsumer的接口。

public DefaultMQPushConsumer getConsumer() {
    return defaultMQPushConsumer;
}

RocketMQPushConsumerAgent使用案例

仍然依据开头的示例进行改造。

@Component
public class DemoConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoConsumer.class);

使用@Resource(name = “rocketMQPushConsumerAgent”)或者直接@Autowired将自定义的消息消费者注入。

@Resource(name = "rocketMQPushConsumerAgent")
RocketMQPushConsumerAgent rocketMQConsumerAgent;

调用方需要实现一个返回值为void的方法,并标记为@PostConstruct,在该方法中进行rocketMQConsumerAgent的初始化。当spring在加载过程中,DemoConsumer初始化之前会调用该init()方法初始化rocketMQConsumerAgent。通过start()链式调用,启动消息消费者,内部是调用的defaultMQPushConsumer.start()方法。

    @PostConstruct
    void init() {
        try {
            rocketMQConsumerAgent.init(
                    new RocketMQConsumerConfig(
                            "snowalker-consumer-group",
                            "172.30.83.100:9876",
                            "SNOWALKER_TEST",
                            MessageModel.CLUSTERING),
                    (msgs, context) -> {
                        for (MessageExt msg : msgs) {
                            String realMessage = new String(msg.getBody());
                            LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",
                                    Thread.currentThread().getName(),
                                    msg.getMsgId(),
                                    realMessage);
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
            ).start();
            LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者完成");
        } catch (MQClientException e) {
            e.printStackTrace();
            LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者失败");
        }

    }
}

在init()方法中同时将消息监听器的实现逻辑注入,消费者会加载该接口的实现。

附录:RocketMQConsumerConfig配置类

public class RocketMQConsumerConfig {

    /**消费者组*/
    private String consumerGroup;

    /**nameServer地址*/
    private String nameSrvAddr;

    /**消息消费主题*/
    private String topic;

    private MessageModel messageModel;

    public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic) {
        Preconditions.checkNotNull(consumerGroup);
        Preconditions.checkNotNull(nameSrvAddr);
        Preconditions.checkNotNull(topic);
        this.consumerGroup = consumerGroup;
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
    }

    public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic, MessageModel messageModel) {
        Preconditions.checkNotNull(consumerGroup);
        Preconditions.checkNotNull(nameSrvAddr);
        Preconditions.checkNotNull(topic);
        Preconditions.checkNotNull(messageModel);
        this.consumerGroup = consumerGroup;
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
        this.messageModel = messageModel;
    }

    public String getConsumerGroup() {
        return consumerGroup;
    }

    public String getNameSrvAddr() {
        return nameSrvAddr;
    }

    public String getTopic() {
        return topic;
    }

    public MessageModel getMessageModel() {
        return messageModel;
    }
}

该配置类封装了消费者客户端初始化的必填参数,目的是收拢初始化参数,从而使初始化接口更加简洁,符合开闭原则。我的习惯是超过三个参数后就考虑是否封装为一个javabean,封装的前提是在同一个上下文中,希望能够对你有所帮助。

文章目录
  1. 1. 通过DefaultMQProducer消费消息
  2. 2. Spring框架整合DefaultMQPushConsumer
  3. 3. RocketMQPushConsumerAgent使用案例
  4. 4. 附录:RocketMQConsumerConfig配置类
Fork me on GitHub