本文为ActiveMQ学习笔记的第二篇,主要介绍发布订阅方式进行消息的生产和消费。
首先介绍一下常用的对象,在ActiveMQ中,最为主要的两个对象分别为MessageConsumer(消息消费者),MessageProducer(消息生产者)。
在发布消息或者接收之前需要创建一些基本的对象:
private static final String username = ActiveMQConnection.DEFAULT_USER;
private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String brokerURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final int sendNum = 10; //发送的消息数量
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory; //连接工厂
Connection connection = null; //连接
Session session; //会话,发送接收消息的线程
Destination destination; //消息的目的地
MessageProducer messageProducer; //消息发送者/生产者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(Producer.username, Producer.password, Producer.brokerURL);
//获取连接
try {
connection = connectionFactory.createConnection();
connection.start(); //启动连接
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //开启事务,自动确认
//创建主题
destination = session.createTopic("topic1");
//创建消息生产者
messageProducer = session.createProducer(destination);
//发送消息
sendMessage(session, messageProducer);
session.commit(); //事务提交
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
connection.close();
}
}
}
/**
* 发送消息
* @param session
* @param messageProducer
* @throws Exception
public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
for (int i = 0; i < Producer.sendNum; i++) {
TextMessage textMessage = session.createTextMessage("ActiveMQ发送的消息" + i);
System.out.println("发布消息:" + "ActiveMQ发送的消息" + i);
messageProducer.send(textMessage);
}
}
该生产者每次运行向队列中添加十条文本信息供消费
private static final String username = ActiveMQConnection.DEFAULT_USER;
private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String brokerURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory; //连接工厂
Connection connection = null; //连接
Session session; //会话,发送接收消息的线程
Destination destination; //消息的目的地
MessageConsumer messageConsumer; //消息消费者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(Consumer1.username, Consumer1.password, Consumer1.brokerURL);
//获取连接
connection = connectionFactory.createConnection();
connection.start(); //启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //不加事务
destination = session.createTopic("topic1"); //创建连接的消息队列
//创建消息消费者
messageConsumer = session.createConsumer(destination);
//注册监听器方式二:监听器
messageConsumer.setMessageListener(new MListener1());
}
需要先定义一个消息监听器,只需要实现MessageListener接口,重写其中的onMessage()回调方法。
public class MListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("订阅者1接收到的消息" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
和点对点的不同之处主要就是创建消息目的地的方式不同了,这里是createTopic方式。然后在消费者中注册消息监听器。一旦队列中有消息就调用回调方法进行消息的消费。
本文仅仅是对ActiveMQ的基本使用的介绍,它在实际生产中运用广泛,尤其是在异步消息处理和订单管理及服务间通讯运用很广泛,更多的运用还应当在实践中自行体会。