文章目录
  1. 1. 场景抽象
    1. 1.1. 消息发送端
    2. 1.2. 阻塞队列
    3. 1.3. 消息消费端
    4. 1.4. 异常处理
  2. 2. 代码讲解
    1. 2.1. 核心队列逻辑MailSendQueue.java
    2. 2.2. 生产者逻辑
    3. 2.3. 消费者逻辑
    4. 2.4. 调用逻辑
    5. 2.5. MailMessageVO
  3. 3. 小结

在之前的一篇文章 手写JDK组件之阻塞队列BlockedQueue 中 ,我们模仿Java的ArrayBlockingQueue实现了一个阻塞队列。并通过该案例对阻塞队列的实现机制有了一个初步的认识。

实际上,Java中的阻塞队列用处还是比较广泛的,尤其是当我们不需要使用复杂的分布式消息队列,只是想要基于生产者-消费者模型,解耦业务逻辑,那么我们就可以借助内存队列实现。

这类型业务场景往往具备以下特点:

  • 消息发送量不多
  • 消息的安全性不高,可以容忍丢失
  • 不需要保证HA
  • 不需要提供完备的failover机制

举个例子,比如说当订单下单成功后我们想发送一个站内信,通知商户或者用户下单成功,仅仅作为一个提醒。类似这种场景,我们就可以借助内存队列实现。

基于我们刚提出的这个场景,编写一个demo进行验证。

场景抽象

我们为该场景划分几个部分

消息发送端

封装了消息发送、消息合法性校验、消息体转化、内容序列化以及入队逻辑,提供友好的api被业务代码所使用。

阻塞队列

基于Java的BlockQueue实现,在外层进行一定封装。

消息消费端

负责读取、并对对列中消息进行消费,当没有消息时进行阻塞等待,遇到异常时会交给异常处理机制进行处理。

异常处理

负责在消费消息过程中,出现异常时的后续处理。

常见的处理方式如:

  • 对消息经过处理后重新丢回队列
  • 对消息进行持久化,后续进行重发等处理方式。

为了提升消息的处理效率,对消费任务通常会通过一个线程去监听队列并阻塞等待。并且这个线程一般都是随应用启动而启动,即:消费端的线程是随着应用初始化而创建,并且常驻内存。

代码讲解

简单了解了原理和场景,我们直接看代码实现。

核心队列逻辑MailSendQueue.java

public class MailSendQueue {

    private Logger logger = LoggerFactory.getLogger(MailSendQueue.class);

    /**队列大小*/
    public static final int QUEUE_MAX_SIZE = 100;

    private static MailSendQueue mailSendQueue = new MailSendQueue();

    /**阻塞队列*/
    private BlockingQueue<MailMessageVO> blockingQueue = new LinkedBlockingQueue<MailMessageVO>(QUEUE_MAX_SIZE);

    public static MailSendQueue getInstance() {
        return mailSendQueue;
    }

    /**
    * 消息入队
    *
    * @param alarmMessageVO
    * @return
    */
    public boolean push(MailMessageVO alarmMessageVO) {
        return this.blockingQueue.offer(alarmMessageVO);
    }

    /**
    * 消息出队
    *
    * @return
    */
    public MailMessageVO poll() {
        MailMessageVO result = null;
        try {
            result = this.blockingQueue.take();
        } catch (InterruptedException e) {
            logger.error("message poll error.", e);
        }
        return result;
    }

    /**
    * 获取队列大小
    *
    * @return
    */
    public int size() {
        return this.blockingQueue.size();
    }
}

类MailSendQueue是我们的邮件发送阻塞队列核心逻辑,它的核心是LinkedBlockingQueue,接受的元素为业务定义的邮件发送对象MailMessageVO。

MailSendQueue提供方法 push(MailMessageVO alarmMessageVO) 供消息发送,提供方法 poll() 供消息消费。

其中push方法核心为调用BlockingQueue的offer方法,

offer方法会将指定元素插入此队列中(如果立即可行且不会违反容量限制),当插入成功时返回 true,如果当前没有可用的空间,则返回 false,不会抛异常:

出队方法poll()的核心逻辑为调用blockingQueue的take()方法,

take方法会从队列头部获取元素,获取后此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞

生产者逻辑

public class MessageProducer {
    public void sendMessage(MailMessageVO mailMessageVO) {
        MailSendQueue.getInstance().push(mailMessageVO);
    }
}

生产者逻辑很简洁,就是获取MailSendQueue实例,将消息对象入队。

消费者逻辑

public class MessageConsumer implements Runnable {

    private static final Logger LOGGER = Logger.getLogger("MessageConsumer");

    private Thread thread;

    public void start() {
        Thread thread = new Thread(this);
        thread.start();
    }

    @Override
    public void run() {
        while (true) {
            try {
                MailMessageVO mailMessageVO = MailSendQueue.getInstance().poll();
                consume(mailMessageVO);
            } catch (Exception e) {
                LOGGER.warning("Poll AlarmMessageVO from AlarmMessageQueue error or send alarm mail error.");
            }
        }
    }

    private void consume(MailMessageVO mailMessageVO) {
        Thread.currentThread().setName("MessageConsumer-thread");
        System.out.println(Thread.currentThread().getName() + "-消费消息: " + JSON.toJSONString(mailMessageVO));
    }
}

消费者逻辑通过线程触发,通过while,无限循环阻塞等待及消费消息。

此处的业务场景不需要对消费异常的消息进行重试,但在实际工作中,需要根据具体的业务场景去决定是否需要在catch里面进行异常处理流程。

根据经验,实际开发中,我们尽量考虑异常的重试机制,尤其是异步的消息处理场景,尽量对异常流程增加重试操作。比如,常见的措施就是对异常消息进行持久化操作。

调用逻辑

接着看一下如何进行调用。

public class Client {

    public static void main(String[] args) throws InterruptedException {
        MessageProducer producer = new MessageProducer();
        MessageConsumer consumer = new MessageConsumer();
        consumer.start();

        for (int i = 0; i < 100; i++) {
            MailMessageVO message = new MailMessageVO();
            message.setId(i).setContent("消息发送,第" + i + "条");
            producer.sendMessage(message);
            Thread.sleep(1000);
        }

    }
}

我们的场景是,发送100条消息,观察消费者的消费情况,日志输出如下:

MessageConsumer-thread-消费消息: {"content":"消息发送,第0条","id":0}
MessageConsumer-thread-消费消息: {"content":"消息发送,第1条","id":1}
MessageConsumer-thread-消费消息: {"content":"消息发送,第2条","id":2}
MessageConsumer-thread-消费消息: {"content":"消息发送,第3条","id":3}
MessageConsumer-thread-消费消息: {"content":"消息发送,第4条","id":4}
......

可以看到输出符合预期。

此处要注意的是,之所以在消费者内部通过异步线程进行消费处理,主要原因在于消费端是阻塞的,如果在主线程中直接执行,效率较低。

MailMessageVO

最后贴一下消息体的代码,用于备份。

public class MailMessageVO {

    private int id;
    private String content;
    ...省略get set...

小结

本文我们通过一个案例,基于Java的阻塞队列实现了一个异步的生产-消费模型,提供了一种简单的内存队列的实现方式。

在后续的文章中,我将继续带领读者完善代码,对内存队列使用中的异常场景进行补充讲解,从而掌握到更加贴近生产的内存队列使用经验。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 场景抽象
    1. 1.1. 消息发送端
    2. 1.2. 阻塞队列
    3. 1.3. 消息消费端
    4. 1.4. 异常处理
  2. 2. 代码讲解
    1. 2.1. 核心队列逻辑MailSendQueue.java
    2. 2.2. 生产者逻辑
    3. 2.3. 消费者逻辑
    4. 2.4. 调用逻辑
    5. 2.5. MailMessageVO
  3. 3. 小结
Fork me on GitHub