跟我学RocketMQ[1-4]之消费消息及封装DefaultMQPushConsumer支持spring
本文我将继续讲解如何使用DefaultMQPushConsumer对RocketMQ中的消息进行消费,同时在文章的第二部分将继续带领读者朋友对DefaultMQPushConsumer进行薄封装,让我们在Spring中更容易对消息进行消费。
DefaultMQPushConsumer不区分普通消息和事务消息,即我们能够利用DefaultMQPushConsumer实现对普通消息和事务消息的消费。
通过DefaultMQPushConsumer消费消息
首先,声明一个DefaultMQPushConsumer客户端,并通过构造器初始化,构造参数为消费者组。官方建议消费者组以“CID_”开头。
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("CID_SNOWALKER");
设置NameServer地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
设置Consumer第一次启动从队列头部开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
设置消费模式为集群方式,CLUSTERING模式下每条消息只会被一个Consumer消费一次,如果设置为BROADCASTING则为广播模式,每个消费者都会将消息消费至少一次。一般我们使用的均为CLUSTERING模式。
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
注册消息监听器,这里需要实现MessageListenerConcurrently接口,并实现consumeMessage(List
defaultMQPushConsumer.registerMessageListener(
(msgs, context) -> {
for (MessageExt msg : msgs) {
String realMessage = new String(msg.getBody());
LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",
Thread.currentThread().getName(),
msg.getMsgId(),
realMessage);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
)
这里注意,当消费逻辑执行成功,则返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,后续将不再对该消息进行消费。如果消费逻辑失败,则需要设置为ConsumeConcurrentlyStatus.RECONSUME_LATER, RocketMQ会对消息进行重新推送,默认推送16次,目的是尽量保证消息消费成功,如果达到最大重试次数,还是失败则进入死信队列,等待人工干预。
调用start()方法,启动对队列的监听,开始进行消息的消费。
defaultMQPushConsumer.start();
我们尝试运行一下,这里我已经有了对应的消费者,可以看下运行的日志:
2019-01-23 09:55:25.022 INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher :
消息id=AC1E5356496018B4AAC2736D06CF0002, 发送结果=SEND_OK
2019-01-23 09:55:27.519 INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer :
当前消费线程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息为={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"测试消息简单发送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}
可以看到broker推送消息至消费端,并且被成功消费。
Spring框架整合DefaultMQPushConsumer
我们仍然基于Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE 对DefaultMQPushConsumer进行整合,相关代码已经上传至github
RocketMQPushConsumerAgent.java
这里对核心代码进行讲解。
首先定义RocketMQPushConsumerAgent.java并将其声明为spring的bean,作用域为prototype,即多例形式。
@Scope("prototype")
@Component
public class RocketMQPushConsumerAgent {
声明消息监听器及消息消费者
private MessageListenerConcurrently messageListener;
private DefaultMQPushConsumer defaultMQPushConsumer;
init()方法为核心的初始化逻辑,在该方法中,初始化了DefaultMQPushConsumer,并设置NameServer地址、消费模式以及将外部实现的监听器设置给内部的messageListener引用。
接着对消息主题进行订阅,对该主题下所有的消息进行监听,这里有待优化,后续将把消息的过滤表达式也暴露给调用者。
所有的配置参数均通过RocketMQConsumerConfig进行设置,保证接口的整洁性,RocketMQConsumerConfig将在附录中进行简单讲解。
public RocketMQPushConsumerAgent init(RocketMQConsumerConfig consumerConfig, MessageListenerConcurrently messageListener) throws MQClientException {
defaultMQPushConsumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());
defaultMQPushConsumer.setNamesrvAddr(consumerConfig.getNameSrvAddr());
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式
if (consumerConfig.getMessageModel() != null) {
defaultMQPushConsumer.setMessageModel(consumerConfig.getMessageModel());
}
// 注册监听器
this.messageListener = messageListener;
defaultMQPushConsumer.registerMessageListener(this.messageListener);
defaultMQPushConsumer.subscribe(consumerConfig.getTopic(), "*");
LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端组装完成");
return this;
}
独立的启动方法
public void start() throws MQClientException {
this.defaultMQPushConsumer.start();
}
独立的关闭方法
public void destroy() {
defaultMQPushConsumer.shutdown();
LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端[已关闭]");
}
为方便外部对消费者进行进一步的自定义设置,提供外部获取defaultMQPushConsumer的接口。
public DefaultMQPushConsumer getConsumer() {
return defaultMQPushConsumer;
}
RocketMQPushConsumerAgent使用案例
仍然依据开头的示例进行改造。
@Component
public class DemoConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoConsumer.class);
使用@Resource(name = “rocketMQPushConsumerAgent”)或者直接@Autowired将自定义的消息消费者注入。
@Resource(name = "rocketMQPushConsumerAgent")
RocketMQPushConsumerAgent rocketMQConsumerAgent;
调用方需要实现一个返回值为void的方法,并标记为@PostConstruct,在该方法中进行rocketMQConsumerAgent的初始化。当spring在加载过程中,DemoConsumer初始化之前会调用该init()方法初始化rocketMQConsumerAgent。通过start()链式调用,启动消息消费者,内部是调用的defaultMQPushConsumer.start()方法。
@PostConstruct
void init() {
try {
rocketMQConsumerAgent.init(
new RocketMQConsumerConfig(
"snowalker-consumer-group",
"172.30.83.100:9876",
"SNOWALKER_TEST",
MessageModel.CLUSTERING),
(msgs, context) -> {
for (MessageExt msg : msgs) {
String realMessage = new String(msg.getBody());
LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",
Thread.currentThread().getName(),
msg.getMsgId(),
realMessage);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
).start();
LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者完成");
} catch (MQClientException e) {
e.printStackTrace();
LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者失败");
}
}
}
在init()方法中同时将消息监听器的实现逻辑注入,消费者会加载该接口的实现。
附录:RocketMQConsumerConfig配置类
public class RocketMQConsumerConfig {
/**消费者组*/
private String consumerGroup;
/**nameServer地址*/
private String nameSrvAddr;
/**消息消费主题*/
private String topic;
private MessageModel messageModel;
public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic) {
Preconditions.checkNotNull(consumerGroup);
Preconditions.checkNotNull(nameSrvAddr);
Preconditions.checkNotNull(topic);
this.consumerGroup = consumerGroup;
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
}
public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic, MessageModel messageModel) {
Preconditions.checkNotNull(consumerGroup);
Preconditions.checkNotNull(nameSrvAddr);
Preconditions.checkNotNull(topic);
Preconditions.checkNotNull(messageModel);
this.consumerGroup = consumerGroup;
this.nameSrvAddr = nameSrvAddr;
this.topic = topic;
this.messageModel = messageModel;
}
public String getConsumerGroup() {
return consumerGroup;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
public String getTopic() {
return topic;
}
public MessageModel getMessageModel() {
return messageModel;
}
}
该配置类封装了消费者客户端初始化的必填参数,目的是收拢初始化参数,从而使初始化接口更加简洁,符合开闭原则。我的习惯是超过三个参数后就考虑是否封装为一个javabean,封装的前提是在同一个上下文中,希望能够对你有所帮助。