跟我学RocketMQ[1-3]之发送普通消息及封装DefaultMQProducer支持spring
本文是《跟我学RocketMQ系列》的第三篇,前面两篇中,我带领大家了解了如何搭建RocketMQ以及如何通过web端的console进行RocketMQ的运维。
从本文开始,我将从研发的角度,逐步深入RocketMQ。
本文先讲解如何利用RocketMQ的java客户端进行普通消息的发送以及对它进行薄封装,以便更好的适配spring框架。
对RocketMQ的封装版本的代码已经上传github,shield-rocketmq-client-spring 欢迎大家star及fork~
通过DefaultMQProducer发送普通消息
RocketMQ使用DefaultMQProducer实现普通消息的发送操作。
首先通过构造方法初始化一个生产者组为“PID-TEST”的普通消息生产者。
注意 RocketMQ官方建议,生产者组统一以 PID_ 开头,消费者组统一以 CID_ 开头。
DefaultMQProducer defaultMQProducer =
new DefaultMQProducer("PID_TEST");
设置Nameserver地址
defaultMQProducer.setNamesrvAddr("127.0.0.1:9876");
启动生产者,建立到broker的链接
defaultMQProducer.start();
使用Jackson进行消息实体序列化
ObjectMapper objectMapper = new ObjectMapper();
for (int i = 0; i < 10; i++) {
try {
MessageBean msg = new MessageBean("rocketmq-simple-msg-test",
"SNOWALKER_TEST",
"SNOWALKER_TEST-TAG",
"localhost.localdomain",
"测试消息简单发送------第" + i + "次",
"10",
"simple-msg-test-" + i);
构造消息协议并使用Jackson序列化为JSON字符串
String message = objectMapper.writeValueAsString(msg);
使用官方的Message实体,构造消息体,并设置消息发布的主题名,TAG名,同时需要将要发送的消息体转换为二进制形式。
Message sendMessage = new Message(
msg.getTopicName(),
msg.getTagName(),
message.getBytes());
通过调用defaultMQProducer的send(Message msg, SendCallback sendCallback)方法进行消息发送。
这里发送方需要实现SendCallback回调接口,实现其中的onSuccess,onException方法,分别对应发送结果的成功和异常两种情况。正式的业务场景中需要对这些情况做对应的业务操作。
defaultMQProducer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
LOGGER.info("消息id={}, 发送结果={}" ,
sendResult.getMsgId(),
sendResult.getSendStatus());
}
@Override
public void onException(Throwable throwable) {
LOGGER.info("消息主题={}, 消息体={}" ,
sendMessage.getTopic(),
new String(sendMessage.getBody()));
throwable.printStackTrace();
}
});
消息发送失败会抛出MQClientException,正式的业务中需要对异常进行捕获并处理。
} catch (Exception e) {
e.printStackTrace();
}
}
我们尝试运行一下,这里我已经有了对应的消费者,可以看下运行的日志:
2019-01-23 09:55:25.022 INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher :
消息id=AC1E5356496018B4AAC2736D06CF0002, 发送结果=SEND_OK
2019-01-23 09:55:27.519 INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer :
当前消费线程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息为={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"测试消息简单发送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}
可以看到消息成功提交到了Broker并且被消费者消息,识别的标志是msgId。
Spring框架整合DefaultMQProducer
目前业务系统大量使用了Springboot、spring框架,因此我们对DefaultMQProducer进行一层薄封装,话不多说直接上代码。
封装基于 Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE,请读者自行添加依赖。
声明为Spring的一个bean,同时声明为prototype,支持多例。
@Component
@Scope("prototype")
public class RocketMQSimpleProducerAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQSimpleProducerAgent.class);
声明DefaultMQProducer为成员变量,不进行初始化,初始化操作在后续的init()方法中进行。
private DefaultMQProducer defaultMQProducer;
初始化defaultMQProducer,构造方法传入生产者组id,并设置NameServer的地址,这里将配置统一封装到RocketMQProducerConfig配置类中(具体内容在附录中)。
public RocketMQSimpleProducerAgent init(RocketMQProducerConfig producerConfig) throws Exception {
defaultMQProducer =
new DefaultMQProducer(producerConfig.getProducerGroup());
defaultMQProducer.setNamesrvAddr(
producerConfig.getNameSrvAddr());
LOGGER.debug("com.shield.job.message.rocketmq.RocketMQProducerAgent[初始化完成]");
return this;
}
独立的生产者启动方法。
/**
* 启动消费者服务
*/
public void start() throws MQClientException {
this.defaultMQProducer.start();
}
独立的关闭方法。
public void destroy() {
this.defaultMQProducer.shutdown();
LOGGER.debug
("com.shield.job.message.rocketmq.RocketMQProducerAgent[已关闭]");
}
为方便外部对生产者进行进一步的自定义设置,提供外部获取defaultMQProducer的接口。
public DefaultMQProducer getProducer() {
return this.defaultMQProducer;
}
}
很简洁的薄封装,那么如何使用呢?
RocketMQSimpleProducerAgent使用案例
我们还是基于文章开始时候的例子,将其改造成为基于RocketMQSimpleProducerAgent的调用方式,代码如下。
@Component
public class DemoPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoPublisher.class);
引入RocketMQSimpleProducerAgent类,这里建议将封装后的客户端统一写为一个二方包,便于各个项目使用。
使用@Resource(name = “rocketMQSimpleProducerAgent”)或者直接@Autowired将自定义的普通消息生产者注入。
@Resource(name = "rocketMQSimpleProducerAgent")
RocketMQSimpleProducerAgent rocketMQProducerAgent;
调用方需要实现一个返回值为void的方法,并标记为@PostConstruct,在该方法中进行rocketMQProducerAgent的初始化。当spring在加载过程中,DemoPublisher初始化之前会调用该init()方法初始化rocketMQProducerAgent。通过start()链式调用,启动普通消息生产者,内部是调用的defaultMQProducer.start()方法。
@PostConstruct
void init() throws Exception {
rocketMQProducerAgent.init(new RocketMQProducerConfig(
"group-snowalker",
"172.30.83.100:9876"
)).start();
this.publish();
}
生产者逻辑和上文讲解的没有区别。
public void publish() {
ObjectMapper objectMapper = new ObjectMapper();
for (int i = 0; i < 10; i++) {
try {
MessageBean msg = new MessageBean("rocketmq-simple-msg-test",
"SNOWALKER_TEST",
"SNOWALKER_TEST-TAG",
"localhost.localdomain",
"测试消息简单发送------第" + i + "次",
"10",
"simple-msg-test-" + i);
String message = objectMapper.writeValueAsString(msg);
Message sendMessage = new Message(
msg.getTopicName(), msg.getTagName(), message.getBytes());
rocketMQProducerAgent.getProducer().send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
LOGGER.info("消息id={}, 发送结果={}" ,sendResult.getMsgId(), sendResult.getSendStatus());
}
@Override
public void onException(Throwable throwable) {
LOGGER.info("消息主题={}, 消息体={}" ,sendMessage.getTopic(), new String(sendMessage.getBody()));
throwable.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行之后的效果和一开始的直接调用RocketMQ的java开发包一致,但是这种方式可以让我们更加灵活的在不同的业务中使用消息发送,参数都是可以自定义的。可以使用@Value读取配置文件,能够让我们更关注业务逻辑而不需要关注消息发送的细节。
附录:RocketMQProducerConfig配置类
该配置类封装了生产者客户端初始化的必填参数,目的是收拢初始化参数,从而使初始化接口更加简洁,符合开闭原则。
/**
* @author snowalker
* @version 1.0
* @date 2019/1/21 10:38
* @className RocketMQProducerConfig
* @desc RocketMQ生产者配置
*/
public class RocketMQProducerConfig {
/**生产者组*/
private String producerGroup;
/**指定NameServer名称*/
private String nameSrvAddr;
public RocketMQProducerConfig(String producerGroup, String nameSrvAddr) {
Preconditions.checkNotNull(producerGroup);
Preconditions.checkNotNull(nameSrvAddr);
this.producerGroup = producerGroup;
this.nameSrvAddr = nameSrvAddr;
}
public String getProducerGroup() {
return producerGroup;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
}