文章目录
  1. 1. RocketMQ重试时间窗
  2. 2. 死信的业务处理方式
  3. 3. 发送失败如何重试
  • 小结
  • 本文中,我将讲解RocketMQ使用过程中,如何进行消息重试。

    首先,我们需要明确,只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重试的。

    集群消费模式下,当消息消费失败,RocketMQ会通过消息重试机制重新投递消息,努力使该消息消费成功。

    当消费者消费该重试消息后,需要返回结果给broker,告知broker消费成功(ConsumeConcurrentlyStatus.CONSUME_SUCCESS)或者需要重新消费(ConsumeConcurrentlyStatus.RECONSUME_LATER)。

    这里有个问题,如果消费者业务本身故障导致某条消息一直无法消费成功,难道要一直重试下去吗?

    答案是显而易见的,并不会一直重试。

    事实上,对于一直无法消费成功的消息,RocketMQ会在达到最大重试次数之后,将该消息投递至死信队列。然后我们需要关注死信队列,并对该死信消息业务做人工的补偿操作。

    那如何返回消息消费失败呢?

    RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

    1. 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
    2. 业务消费方返回null
    3. 业务消费方主动/被动抛出异常

    前两种情况较容易理解,当返回ConsumeConcurrentlyStatus.RECONSUME_LATER或者null时,broker会知道消费失败,后续就会发起消息重试,重新投递该消息。

    注意 对于抛出异常的情况,只要我们在业务逻辑中显式抛出异常或者非显式抛出异常,broker也会重新投递消息,如果业务对异常做了捕获,那么该消息将不会发起重试。因此对于需要重试的业务,消费方在捕获异常的时候要注意返回ConsumeConcurrentlyStatus.RECONSUME_LATER或null并输出异常日志,打印当前重试次数。(推荐返回ConsumeConcurrentlyStatus.RECONSUME_LATER

    RocketMQ重试时间窗

    这里介绍一下Apache RocketMQ的重试时间窗,当消息需要重试时,会按照该规则进行重试。

    我们可以在RocketMQ的broker.conf配置文件中配置Consumer侧重试次数及时间间隔, 配置如下

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    

    次数与重试时间间隔对应关系表如下:

    重试次数 距离第一次发送的时间间隔
    1 1s
    2 5s
    3 10s
    4 30s
    5 1m
    6 2m
    7 3m
    8 4m
    9 5m
    10 6m
    11 7m
    12 8m
    13 9m
    14 10m
    15 20m
    16 30m
    17 1h
    18 2h

    可以看到,RocketMQ采用了“时间衰减策略”进行消息的重复投递,即重试次数越多,消息消费成功的可能性越小。

    默认的处理机制中,如果我们只对消息做重复消费,达到最大重试次数之后消息就进入死信队列了。

    死信的业务处理方式

    我们也可以根据业务的需要,定义消费的最大重试次数,每次消费的时候判断当前消费次数是否等于最大重试次数的阈值。

    如:重试三次就认为当前业务存在异常,继续重试下去也没有意义了,那么我们就可以将当前的这条消息进行提交,返回broker状态ConsumeConcurrentlyStatus.CONSUME_SUCCES,让消息不再重发,同时将该消息存入我们业务自定义的死信消息表,将业务参数入库,相关的运营通过查询死信表来进行对应的业务补偿操作。

    发送失败如何重试

    上文中,我们讲解了对于消费失败的重试策略,这个章节中我们来了解下消息发送失败如何进行重试。

    当发生网络抖动等异常情况,Producer生产者侧往broker发送消息失败,即:生产者侧没收到broker返回的ACK,导致Consumer无法进行消息消费,这时RocketMQ会进行发送重试。

    使用DefaultMQProducer进行普通消息发送时,我们可以设置消息发送失败后最大重试次数,并且能够灵活的配合超时时间进行业务重试逻辑的开发,使用的API如下:

    /**设置消息发送失败时最大重试次数*/
    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
    }
    
    /**同步发送消息,并指定超时时间*/
    public SendResult send(Message msg,
                        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, timeout);
    }
    

    通过API可以看出,生产者侧的重试是比较简单的,例如:设置生产者在3s内没有发送成功则重试3次的代码如下:

    /**同步发送消息,如果3秒内没有发送成功,则重试3次*/
    DefaultMQProducer producer = new DefaultMQProducer("DefaultProducerGroup");
    producer.setRetryTimesWhenSendFailed(3);
    producer.send(msg, 3000L);
    

    小结

    本文中,我们主要介绍了RocketMQ的消息重试机制,该机制能够最大限度的保证业务能够往我们期望的方向流转。

    这里还需要注意,业务重试的时候我们的消息消费端需要保证消费的 幂等性, 关于消息消费的幂等如何处理,我们在后续的文章会展开讲解。

    文章目录
    1. 1. RocketMQ重试时间窗
    2. 2. 死信的业务处理方式
    3. 3. 发送失败如何重试
  • 小结
  • Fork me on GitHub