文章目录
  1. 1. 通过DefaultMQProducer发送普通消息
  2. 2. Spring框架整合DefaultMQProducer
  3. 3. RocketMQSimpleProducerAgent使用案例
  4. 4. 附录:RocketMQProducerConfig配置类

本文是《跟我学RocketMQ系列》的第三篇,前面两篇中,我带领大家了解了如何搭建RocketMQ以及如何通过web端的console进行RocketMQ的运维。

从本文开始,我将从研发的角度,逐步深入RocketMQ。

本文先讲解如何利用RocketMQ的java客户端进行普通消息的发送以及对它进行薄封装,以便更好的适配spring框架。

对RocketMQ的封装版本的代码已经上传github,shield-rocketmq-client-spring 欢迎大家star及fork~

通过DefaultMQProducer发送普通消息

RocketMQ使用DefaultMQProducer实现普通消息的发送操作。

首先通过构造方法初始化一个生产者组为“PID-TEST”的普通消息生产者。

注意 RocketMQ官方建议,生产者组统一以 PID_ 开头,消费者组统一以 CID_ 开头。

DefaultMQProducer defaultMQProducer =
     new DefaultMQProducer("PID_TEST");

设置Nameserver地址

defaultMQProducer.setNamesrvAddr("127.0.0.1:9876");

启动生产者,建立到broker的链接

defaultMQProducer.start();

使用Jackson进行消息实体序列化

ObjectMapper objectMapper = new ObjectMapper();
for (int i = 0; i < 10; i++) {
    try {
        MessageBean msg = new MessageBean("rocketmq-simple-msg-test",
                "SNOWALKER_TEST",
                "SNOWALKER_TEST-TAG",
                "localhost.localdomain",
                "测试消息简单发送------第" + i + "次",
                "10",
                "simple-msg-test-" + i);

构造消息协议并使用Jackson序列化为JSON字符串

String message  = objectMapper.writeValueAsString(msg);

使用官方的Message实体,构造消息体,并设置消息发布的主题名,TAG名,同时需要将要发送的消息体转换为二进制形式。

Message sendMessage = new Message(
        msg.getTopicName(),
        msg.getTagName(),
        message.getBytes());

通过调用defaultMQProducer的send(Message msg, SendCallback sendCallback)方法进行消息发送。

这里发送方需要实现SendCallback回调接口,实现其中的onSuccess,onException方法,分别对应发送结果的成功和异常两种情况。正式的业务场景中需要对这些情况做对应的业务操作。

defaultMQProducer.send(sendMessage, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        LOGGER.info("消息id={}, 发送结果={}" ,
        sendResult.getMsgId(), 
        sendResult.getSendStatus());
    }

    @Override
    public void onException(Throwable throwable) {
        LOGGER.info("消息主题={}, 消息体={}" ,
        sendMessage.getTopic(),
        new String(sendMessage.getBody()));
        throwable.printStackTrace();
    }
});

消息发送失败会抛出MQClientException,正式的业务中需要对异常进行捕获并处理。

    } catch (Exception e) {
        e.printStackTrace();
    }
}

我们尝试运行一下,这里我已经有了对应的消费者,可以看下运行的日志:

2019-01-23 09:55:25.022  INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher   : 
消息id=AC1E5356496018B4AAC2736D06CF0002, 发送结果=SEND_OK

2019-01-23 09:55:27.519  INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer     : 
当前消费线程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息为={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"测试消息简单发送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}

可以看到消息成功提交到了Broker并且被消费者消息,识别的标志是msgId。

Spring框架整合DefaultMQProducer

目前业务系统大量使用了Springboot、spring框架,因此我们对DefaultMQProducer进行一层薄封装,话不多说直接上代码。

封装基于 Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE,请读者自行添加依赖。

声明为Spring的一个bean,同时声明为prototype,支持多例。

@Component
@Scope("prototype")
public class RocketMQSimpleProducerAgent {

    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQSimpleProducerAgent.class);

声明DefaultMQProducer为成员变量,不进行初始化,初始化操作在后续的init()方法中进行。

private DefaultMQProducer defaultMQProducer;

初始化defaultMQProducer,构造方法传入生产者组id,并设置NameServer的地址,这里将配置统一封装到RocketMQProducerConfig配置类中(具体内容在附录中)。

public RocketMQSimpleProducerAgent init(RocketMQProducerConfig producerConfig) throws Exception {
    defaultMQProducer = 
        new DefaultMQProducer(producerConfig.getProducerGroup());
    defaultMQProducer.setNamesrvAddr(
        producerConfig.getNameSrvAddr());
    LOGGER.debug("com.shield.job.message.rocketmq.RocketMQProducerAgent[初始化完成]");
    return this;
}

独立的生产者启动方法。

/**
* 启动消费者服务
*/
public void start() throws MQClientException {
    this.defaultMQProducer.start();
}

独立的关闭方法。

public void destroy() {
    this.defaultMQProducer.shutdown();
    LOGGER.debug
        ("com.shield.job.message.rocketmq.RocketMQProducerAgent[已关闭]");
}

为方便外部对生产者进行进一步的自定义设置,提供外部获取defaultMQProducer的接口。

    public DefaultMQProducer getProducer() {
        return this.defaultMQProducer;
    }

}

很简洁的薄封装,那么如何使用呢?

RocketMQSimpleProducerAgent使用案例

我们还是基于文章开始时候的例子,将其改造成为基于RocketMQSimpleProducerAgent的调用方式,代码如下。

@Component
public class DemoPublisher {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoPublisher.class);

引入RocketMQSimpleProducerAgent类,这里建议将封装后的客户端统一写为一个二方包,便于各个项目使用。

使用@Resource(name = “rocketMQSimpleProducerAgent”)或者直接@Autowired将自定义的普通消息生产者注入。

@Resource(name = "rocketMQSimpleProducerAgent")
RocketMQSimpleProducerAgent rocketMQProducerAgent;

调用方需要实现一个返回值为void的方法,并标记为@PostConstruct,在该方法中进行rocketMQProducerAgent的初始化。当spring在加载过程中,DemoPublisher初始化之前会调用该init()方法初始化rocketMQProducerAgent。通过start()链式调用,启动普通消息生产者,内部是调用的defaultMQProducer.start()方法。

@PostConstruct
void init() throws Exception {
    rocketMQProducerAgent.init(new RocketMQProducerConfig(
            "group-snowalker",
            "172.30.83.100:9876"
    )).start();
    this.publish();
}

生产者逻辑和上文讲解的没有区别。

    public void publish() {
        ObjectMapper objectMapper = new ObjectMapper();
        for (int i = 0; i < 10; i++) {
            try {
                MessageBean msg = new MessageBean("rocketmq-simple-msg-test",
                        "SNOWALKER_TEST",
                        "SNOWALKER_TEST-TAG",
                        "localhost.localdomain",
                        "测试消息简单发送------第" + i + "次",
                        "10",
                        "simple-msg-test-" + i);
                String message  = objectMapper.writeValueAsString(msg);

                Message sendMessage = new Message(
                        msg.getTopicName(), msg.getTagName(), message.getBytes());

                rocketMQProducerAgent.getProducer().send(sendMessage, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        LOGGER.info("消息id={}, 发送结果={}" ,sendResult.getMsgId(), sendResult.getSendStatus());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        LOGGER.info("消息主题={}, 消息体={}" ,sendMessage.getTopic(), new String(sendMessage.getBody()));
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

运行之后的效果和一开始的直接调用RocketMQ的java开发包一致,但是这种方式可以让我们更加灵活的在不同的业务中使用消息发送,参数都是可以自定义的。可以使用@Value读取配置文件,能够让我们更关注业务逻辑而不需要关注消息发送的细节。

附录:RocketMQProducerConfig配置类

该配置类封装了生产者客户端初始化的必填参数,目的是收拢初始化参数,从而使初始化接口更加简洁,符合开闭原则。

/**
* @author snowalker
* @version 1.0
* @date 2019/1/21 10:38
* @className RocketMQProducerConfig
* @desc RocketMQ生产者配置
*/
public class RocketMQProducerConfig {

    /**生产者组*/
    private String producerGroup;

    /**指定NameServer名称*/
    private String nameSrvAddr;

    public RocketMQProducerConfig(String producerGroup, String nameSrvAddr) {
        Preconditions.checkNotNull(producerGroup);
        Preconditions.checkNotNull(nameSrvAddr);
        this.producerGroup = producerGroup;
        this.nameSrvAddr = nameSrvAddr;
    }

    public String getProducerGroup() {
        return producerGroup;
    }

    public String getNameSrvAddr() {
        return nameSrvAddr;
    }
}
文章目录
  1. 1. 通过DefaultMQProducer发送普通消息
  2. 2. Spring框架整合DefaultMQProducer
  3. 3. RocketMQSimpleProducerAgent使用案例
  4. 4. 附录:RocketMQProducerConfig配置类
Fork me on GitHub