Apache-ActiveMQ学习笔记-02发布订阅方式          返回主页

本文为ActiveMQ学习笔记的第二篇,主要介绍发布订阅方式进行消息的生产和消费。

首先介绍一下常用的对象,在ActiveMQ中,最为主要的两个对象分别为MessageConsumer(消息消费者),MessageProducer(消息生产者)。

在发布消息或者接收之前需要创建一些基本的对象:

  1. ConnectionFactory 连接工厂,用于建立连接
  2. Connection 连接,开启连接之后建立会话
  3. Session 会话,通过会话创建队列或者主题
  4. Destination 消息的目的地
  5. session.createTopic("topic1"); 这里和之前的点对点方式不同,是采用主题的方式进行session的创建其他方式基本相同

首先建立一个消息的生产者

        private static final String username = ActiveMQConnection.DEFAULT_USER;
        private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
        private static final String brokerURL = ActiveMQConnection.DEFAULT_BROKER_URL;
        private static final int sendNum = 10; //发送的消息数量

        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory;    //连接工厂
            Connection connection = null;                   //连接
            Session session;                        //会话,发送接收消息的线程
            Destination destination;                //消息的目的地
            MessageProducer messageProducer;        //消息发送者/生产者

            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(Producer.username, Producer.password, Producer.brokerURL);
            //获取连接
            try {
                connection = connectionFactory.createConnection();
                connection.start();                 //启动连接
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //开启事务,自动确认
                //创建主题
                destination = session.createTopic("topic1");
                //创建消息生产者
                messageProducer = session.createProducer(destination);
                //发送消息
                sendMessage(session, messageProducer);
                session.commit();                   //事务提交
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
        /**
         * 发送消息
         * @param session
         * @param messageProducer
         * @throws Exception

        public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
            for (int i = 0; i < Producer.sendNum; i++) {
                TextMessage textMessage = session.createTextMessage("ActiveMQ发送的消息" + i);
                System.out.println("发布消息:" + "ActiveMQ发送的消息" + i);
                messageProducer.send(textMessage);
            }
        }

该生产者每次运行向队列中添加十条文本信息供消费

建立消息的消费者

    private static final String username = ActiveMQConnection.DEFAULT_USER;
    private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String brokerURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory;    //连接工厂
        Connection connection = null;           //连接
        Session session;                        //会话,发送接收消息的线程
        Destination destination;                //消息的目的地
        MessageConsumer messageConsumer;        //消息消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(Consumer1.username, Consumer1.password, Consumer1.brokerURL);
        //获取连接
        connection = connectionFactory.createConnection();
        connection.start(); //启动连接
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);    //不加事务
        destination = session.createTopic("topic1");    //创建连接的消息队列
        //创建消息消费者
        messageConsumer = session.createConsumer(destination);
        //注册监听器方式二:监听器
        messageConsumer.setMessageListener(new MListener1());
    }

需要先定义一个消息监听器,只需要实现MessageListener接口,重写其中的onMessage()回调方法。

    public class MListener1 implements MessageListener {

            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println("订阅者1接收到的消息" + ((TextMessage) message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

    }

和点对点的不同之处主要就是创建消息目的地的方式不同了,这里是createTopic方式。然后在消费者中注册消息监听器。一旦队列中有消息就调用回调方法进行消息的消费。

小结

本文仅仅是对ActiveMQ的基本使用的介绍,它在实际生产中运用广泛,尤其是在异步消息处理和订单管理及服务间通讯运用很广泛,更多的运用还应当在实践中自行体会。