Java内存阻塞队列实例
在之前的一篇文章 手写JDK组件之阻塞队列BlockedQueue 中 ,我们模仿Java的ArrayBlockingQueue实现了一个阻塞队列。并通过该案例对阻塞队列的实现机制有了一个初步的认识。
实际上,Java中的阻塞队列用处还是比较广泛的,尤其是当我们不需要使用复杂的分布式消息队列,只是想要基于生产者-消费者模型,解耦业务逻辑,那么我们就可以借助内存队列实现。
这类型业务场景往往具备以下特点:
- 消息发送量不多
- 消息的安全性不高,可以容忍丢失
- 不需要保证HA
- 不需要提供完备的failover机制
举个例子,比如说当订单下单成功后我们想发送一个站内信,通知商户或者用户下单成功,仅仅作为一个提醒。类似这种场景,我们就可以借助内存队列实现。
基于我们刚提出的这个场景,编写一个demo进行验证。
场景抽象
我们为该场景划分几个部分
消息发送端
封装了消息发送、消息合法性校验、消息体转化、内容序列化以及入队逻辑,提供友好的api被业务代码所使用。
阻塞队列
基于Java的BlockQueue实现,在外层进行一定封装。
消息消费端
负责读取、并对对列中消息进行消费,当没有消息时进行阻塞等待,遇到异常时会交给异常处理机制进行处理。
异常处理
负责在消费消息过程中,出现异常时的后续处理。
常见的处理方式如:
- 对消息经过处理后重新丢回队列
- 对消息进行持久化,后续进行重发等处理方式。
为了提升消息的处理效率,对消费任务通常会通过一个线程去监听队列并阻塞等待。并且这个线程一般都是随应用启动而启动,即:消费端的线程是随着应用初始化而创建,并且常驻内存。
代码讲解
简单了解了原理和场景,我们直接看代码实现。
核心队列逻辑MailSendQueue.java
public class MailSendQueue {
private Logger logger = LoggerFactory.getLogger(MailSendQueue.class);
/**队列大小*/
public static final int QUEUE_MAX_SIZE = 100;
private static MailSendQueue mailSendQueue = new MailSendQueue();
/**阻塞队列*/
private BlockingQueue<MailMessageVO> blockingQueue = new LinkedBlockingQueue<MailMessageVO>(QUEUE_MAX_SIZE);
public static MailSendQueue getInstance() {
return mailSendQueue;
}
/**
* 消息入队
*
* @param alarmMessageVO
* @return
*/
public boolean push(MailMessageVO alarmMessageVO) {
return this.blockingQueue.offer(alarmMessageVO);
}
/**
* 消息出队
*
* @return
*/
public MailMessageVO poll() {
MailMessageVO result = null;
try {
result = this.blockingQueue.take();
} catch (InterruptedException e) {
logger.error("message poll error.", e);
}
return result;
}
/**
* 获取队列大小
*
* @return
*/
public int size() {
return this.blockingQueue.size();
}
}
类MailSendQueue是我们的邮件发送阻塞队列核心逻辑,它的核心是LinkedBlockingQueue,接受的元素为业务定义的邮件发送对象MailMessageVO。
MailSendQueue提供方法 push(MailMessageVO alarmMessageVO) 供消息发送,提供方法 poll() 供消息消费。
其中push方法核心为调用BlockingQueue的offer方法,
offer方法会将指定元素插入此队列中(如果立即可行且不会违反容量限制),当插入成功时返回 true,如果当前没有可用的空间,则返回 false,不会抛异常:
出队方法poll()的核心逻辑为调用blockingQueue的take()方法,
take方法会从队列头部获取元素,获取后此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞
生产者逻辑
public class MessageProducer {
public void sendMessage(MailMessageVO mailMessageVO) {
MailSendQueue.getInstance().push(mailMessageVO);
}
}
生产者逻辑很简洁,就是获取MailSendQueue实例,将消息对象入队。
消费者逻辑
public class MessageConsumer implements Runnable {
private static final Logger LOGGER = Logger.getLogger("MessageConsumer");
private Thread thread;
public void start() {
Thread thread = new Thread(this);
thread.start();
}
@Override
public void run() {
while (true) {
try {
MailMessageVO mailMessageVO = MailSendQueue.getInstance().poll();
consume(mailMessageVO);
} catch (Exception e) {
LOGGER.warning("Poll AlarmMessageVO from AlarmMessageQueue error or send alarm mail error.");
}
}
}
private void consume(MailMessageVO mailMessageVO) {
Thread.currentThread().setName("MessageConsumer-thread");
System.out.println(Thread.currentThread().getName() + "-消费消息: " + JSON.toJSONString(mailMessageVO));
}
}
消费者逻辑通过线程触发,通过while,无限循环阻塞等待及消费消息。
此处的业务场景不需要对消费异常的消息进行重试,但在实际工作中,需要根据具体的业务场景去决定是否需要在catch里面进行异常处理流程。
根据经验,实际开发中,我们尽量考虑异常的重试机制,尤其是异步的消息处理场景,尽量对异常流程增加重试操作。比如,常见的措施就是对异常消息进行持久化操作。
调用逻辑
接着看一下如何进行调用。
public class Client {
public static void main(String[] args) throws InterruptedException {
MessageProducer producer = new MessageProducer();
MessageConsumer consumer = new MessageConsumer();
consumer.start();
for (int i = 0; i < 100; i++) {
MailMessageVO message = new MailMessageVO();
message.setId(i).setContent("消息发送,第" + i + "条");
producer.sendMessage(message);
Thread.sleep(1000);
}
}
}
我们的场景是,发送100条消息,观察消费者的消费情况,日志输出如下:
MessageConsumer-thread-消费消息: {"content":"消息发送,第0条","id":0}
MessageConsumer-thread-消费消息: {"content":"消息发送,第1条","id":1}
MessageConsumer-thread-消费消息: {"content":"消息发送,第2条","id":2}
MessageConsumer-thread-消费消息: {"content":"消息发送,第3条","id":3}
MessageConsumer-thread-消费消息: {"content":"消息发送,第4条","id":4}
......
可以看到输出符合预期。
此处要注意的是,之所以在消费者内部通过异步线程进行消费处理,主要原因在于消费端是阻塞的,如果在主线程中直接执行,效率较低。
MailMessageVO
最后贴一下消息体的代码,用于备份。
public class MailMessageVO {
private int id;
private String content;
...省略get set...
小结
本文我们通过一个案例,基于Java的阻塞队列实现了一个异步的生产-消费模型,提供了一种简单的内存队列的实现方式。
在后续的文章中,我将继续带领读者完善代码,对内存队列使用中的异常场景进行补充讲解,从而掌握到更加贴近生产的内存队列使用经验。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。