Apache-ActiveMQ学习笔记-01入门及点对点消息发布          返回主页

背景介绍

  1. JMS:

    JMS即Java消息服务(Java Message Service)应用程序接口,
    是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,
    或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,
    绝大多数MOM提供商都对JMS提供支持。
    

JMS就是一个标准,类似于JDBC,具体的实现由第三方的中间件厂商实现。

  1. ApacheMQ

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。
    ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,
    尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
    

ActiveMQ入门

使用ActiveMQ开发之前首先需要下载其二进制安装包,解压之后配置环境变量,将其bin所在的位置添加到path中,确保已经安装了JDK

进入bin下选择对应的版本运行activemq.bat脚本,命令行下显示如下内容

    wrapper  | --> Wrapper Started as Console
    wrapper  | Launching a JVM...
    jvm 1    | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
    jvm 1    |   Copyright 1999-2006 Tanuki Software, Inc.  All Rights Reserved.
    jvm 1    |
    jvm 1    | Java Runtime: Oracle Corporation 1.8.0_91 C:\Program Files\Java\jre1.8.0_91
    jvm 1    |   Heap sizes: current=186880k  free=176065k  max=932352k
    jvm 1    |     JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=tGn9iDevuzr9leeO -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=6352 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
    jvm 1    | Extensions classpath:
    jvm 1    |   [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]
    jvm 1    | ACTIVEMQ_HOME: ..\..
    jvm 1    | ACTIVEMQ_BASE: ..\..
    ...

表明安装及运行正常,访问url:http://127.0.0.1:8161/admin/,这里需要输入账号密码默认都是admin。

开发阶段

到这里就可以进行程序的开发,有两种基本的开发方式,分别为

  1. 点对点方式
  2. 发布-订阅方式

下文对这两种开发方式分别进行讲解

点对点方式

首先介绍一下常用的对象,在ActiveMQ中,最为主要的两个对象分别为MessageConsumer(消息消费者),MessageProducer(消息生产者)。

在发布消息或者接收之前需要创建一些基本的对象:

  1. ConnectionFactory 连接工厂,用于建立连接
  2. Connection 连接,开启连接之后建立会话
  3. Session 会话,通过会话创建队列或者主题
  4. Destination 消息的目的地
  5. Queue 消息队列,可以转为目的地Destination

首先建立一个消息的生产者

public class JMSProducer {

    private static final String username = ActiveMQConnection.DEFAULT_USER;
    private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String brokerURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final int sendNum = 10; //发送的消息数量

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory;    //连接工厂
        Connection connection = null;                   //连接
        Session session;                        //会话,发送接收消息的线程
        Destination destination;                //消息的目的地
        MessageProducer messageProducer;        //消息发送者/生产者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.username, JMSProducer.password, JMSProducer.brokerURL);
        //获取连接
        try {
            connection = connectionFactory.createConnection();
            connection.start();                 //启动连接
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //开启事务,自动确认
            //创建消息队列
            Queue queue = session.createQueue("firstQueue");
            destination = queue;
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);
            session.commit();                   //事务提交
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    /**
     * 发送消息
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < JMSProducer.sendNum; i++) {
            TextMessage textMessage = session.createTextMessage("ActiveMQ发送的消息" + i);
            System.out.println("发送消息:" + "ActiveMQ发送的消息" + i);
            messageProducer.send(textMessage);
        }
    }
}

该生产者每次运行向队列中添加十条文本信息供消费

建立消息的消费者

    private static final String username = ActiveMQConnection.DEFAULT_USER;
    private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String brokerURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory;    //连接工厂
        Connection connection = null;           //连接
        Session session;                        //会话,发送接收消息的线程
        Destination destination;                //消息的目的地
        MessageConsumer messageConsumer;        //消息消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.username, JMSConsumer.password, JMSConsumer.brokerURL);
        //获取连接
        connection = connectionFactory.createConnection();
        connection.start(); //启动连接
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);    //不加事务
        destination = session.createQueue("firstQueue");        //创建连接的消息队列
        //创建消息消费者
        messageConsumer = session.createConsumer(destination);
        /**
         * 方式1轮询
         */
        while(true) {

            TextMessage textMessage = (TextMessage)messageConsumer.receive(100000); //100s一次
            if (textMessage != null) {
                System.out.println("收到的消息:" + textMessage.getText());
            } else {
                break;
            }
        }
    }

可以看出,消费者的建立和生产者基本类似,但是也有区别,在消费者中的session 是不加事务的,而生产者中的session是有事务的,在发送消息之后需要通过commit进行提交。

消费者通过轮询的方式进行消息的接收,只要队列非空就从其中拿消息进行显示。

消费者第二种方式,通过监听方式进行消息的收取

轮询的方式效率不高,需要时刻保持连接,实际生产中多使用监听方式对消息进行收取。

需要先定义一个消息监听器,只需要实现MessageListener接口,重写其中的onMessage()回调方法。

public class MListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("接收到的消息" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

在消费者中作如下改写,删掉轮询的部分,改为注册监听器。

    //实例化连接工厂
    connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.username, JMSConsumer2.password, JMSConsumer2.brokerURL);
    //获取连接
    connection = connectionFactory.createConnection();
    connection.start(); //启动连接
    session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);    //不加事务
    destination = session.createQueue("firstQueue");        //创建连接的消息队列
    //创建消息消费者
    messageConsumer = session.createConsumer(destination);
    //注册监听器方式二:监听器
    messageConsumer.setMessageListener(new MListener());

这样,只要监听到改变就对消息进行处理,减少了连接的消耗,提高了效率,通过回调的方式异步的处理队列中的消息。