上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送。本文中,我们就一起来集中分析一下批量消息的发送是怎样的逻辑。

DefaultProducer.send

RocketMQ提供了批量发送消息的API,同样在DefaultProducer.java中

@Override
public SendResult send(
    Collection<Message> msgs) throws MQClientException, RemotingException, 
        MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

它的参数为Message集合,也就是一批消息。它的另外一个重载方法提供了发送超时时间参数

@Override
public SendResult send(Collection<Message> msgs,
    long timeout) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs), timeout);
}

可以看到是将消息通过batch()方法打包为单条消息,我们看一下batch方法的逻辑

Read More

上一篇文章中,我们对事务提交部分的框架逻辑及代码实现做了较为详细的讲解;本文中,我们接着分析一下事务回滚阶段的机理及代码实现逻辑。

shieldTXC.PNG

这里主要看图的下半部分。

当事务下游应用达到最大消费次数,事务回滚被消息持久化之后,ShieldTXC的消息发送线程sendTxcMessageScheduler会扫描到待发送的回滚消息并投递到 [事务回滚队列]

第一阶段:实现回滚逻辑

事务上游应用在启动过程中初始化了ShieldTxcConsumerListenerAdapter消费适配器,并通过ShieldTxcRollbackListener实现了回滚逻辑。

@Service
public class TxConsumeService implements InitializingBean {

    @Value("${shield.event.rocketmq.nameSrvAddr}")
    String nameSerAddr;
    @Value("${shield.event.rocketmq.topicSource}")
    String topic;

    @Override
    public void afterPropertiesSet() throws Exception {
        new ShieldTxcConsumerListenerAdapter(nameSerAddr, topic, new ShieldTxcRollbackListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("测试消费【回滚】ShieldTxcRollbackListener");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }));
    }
}

这段代码在之前的事务提交阶段已经讲解过,事务发起端需要自行实现回滚逻辑,这样才能在异常发生时与事务下游保持数据一致性。更多的细节此处就不再赘述。

Read More

从本文开始,我们正式进入自己写分布式事务框架的部分。

笔者将这个本地消息表分布式事务框架命名为 shieldTXC,意思是 神盾事务框架,框架内核及demo案例的代码已经打包上传至github上,

地址为:shieldTXC源码地址。如果觉得这个喜欢可以点个star支持下。

题外话不多说,接下来我们一边看框架的原理图,在宏观上对框架的机理做一个了解,一边对相应的原理进行代码实现讲解,这样理论与实战相结合,相信会加深读者朋友的理解。

框架核心原理

首先看下框架的核心原理图。

shieldTXC.PNG

看起来还是比较整洁的,这也是笔者写代码的一个宗旨:

好的框架可以复杂,但架构一定是优雅的、清晰的。

Read More

本文我将带领读者朋友对RocketMQ生产者如何发送消息这一流程进行源码层面的解析。内容偏干,请自备白开水。

生产者初始化

进行消息发送的前提是先对生产者进行初始化,一段较为常规的生产者初始化示例代码如下

@Value("${rocketmq.nameServer}")
String nameSrvAddr;

@PostConstruct
public void init() {

    DefaultMQProducer defaultMQProducer =
            new DefaultMQProducer("PRODUCER_GROUP", true);
    defaultMQProducer.setNamesrvAddr(nameSrvAddr);
    // 发送失败重试次数
    defaultMQProducer.setRetryTimesWhenSendFailed(3);
    try {
        defaultMQProducer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("Producer加载异常!", e);
    }
}

Read More

本文是对RocketMQ事务消息的综合讲解,提供一个较为典型的落地案例供读者进行参考。

什么是RocketMQ事务消息

以下内容引用自RocketMQ开发者中心,http://rocketmq.cloud

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

事务消息-0

事务消息流程

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

  1. 发送事务半消息(half消息)。
  2. 服务端响应消息写入结果。
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
  4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2.补偿流程:

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态
  3. 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

Read More

本文是 “自己写分布式事务框架” 系列的首篇,我将从原理出发,提纲挈领地对框架的核心原理做一个概述,对后续实现细节分析的展开奠定基础。

分布式事务在当下分布式开发领域已经成为必须考虑的因素,随着微服务、SOA架构思想日益被开发者所熟知,分布式事务的解决思路也不断被提出并被开源社区实现。

在互联网开发中,我们常常会采用一揽子“柔性事务”解决方案来保证系统内模块之间数据的 最终一致性

业界知名的分布式事务解决方案有如下几种:

  1. TCC方案,它的开源实现有TCC-Transaction、ByteTCC,阿里开源的SEATA框架也加入了对TCC模式的支持;
  2. 可靠消息最终一致方案,代表的实现方式为RocketMQ事务消息;
  3. SAGA事务,代表实现方式为华为开源的serviceComb;
  4. 最大努力通知型解决方案,该方案与业务耦合较为严重,因此业界也没有一个较为抽象的开源实现;
  5. 消息溯源方案(或称为本地消息表方案),该方案实现较为简单,但也与业务耦合较为严重,据我调查暂时没有抽象的开源实现。

上述方案中,我挑选了第五种 消息溯源方案(即本地消息表方案,后文均称为本地消息表方案) 作为自己写分布式事务框架的核心机制,旨在实现一个与业务无关的、基于消息的、异步确保的、最终一致的柔性事务框架。

框架的实现主要基于RocketMQ的普通消息,我们都知道RocketMQ已经支持了事务消息,这里只是基于它对本地消息表方案进行实现,原则上,本地消息表方案支持任意具有发布订阅能力的消息中间件。

上述其他实现在笔者之前的文章中有过较为系统的讲解,感兴趣的同学可以移步 置顶贴-我说分布式事务系列 进行了解学习,本系列不再展开说明。

Read More

之前笔者已经写过关于分布式锁的内容,但囿于彼时对于分布式锁的研究还不算太深入,如今读来发现还是存在一些问题,故而写作本文,对Redis分布式锁的实现做一个更加全面、进阶的阐述和总结,帮助读者对Redis分布式锁有一个更加深入客观的了解。关于更多分布式锁的其他实现,在后续的文章中也会陆续展开。

我们还是通过经典的WWH(what why how)三段论方式进行行文。首先再次从宏观上了解什么是分布式锁以及分布式锁的约束条件和常见实现方式。

分布式锁

这部分主要对分布式锁再次做一次较为完整的回顾与总结。

什么是分布式锁

引用度娘的词条,对于分布式锁的解释如下:

什么是分布式锁-百度百科

这段话概括的还是不错的,根据概述以及对单机锁的了解,我们能够提炼并类比得出分布式锁的几个主要约束条件:

Read More

RocketMQ在4.4.0 Release版本中支持了消息轨迹特性。

这一特性的增加能够让我们对消息从生产到存储到消费这一整个流程有一个清晰的掌握,配合上console1.0.1以上版本的图形化界面,对于错误排查及日常运维是一个很有用处的feature。

话不多说,我们先通过实战对消息轨迹特性做一次直观的了解,再从源码角度深入分析一下RocketMQ是如何实现消息轨迹这一能力的。

消息轨迹实战

消息轨迹特性在4.4.0时已经支持,本次实战使用最新的4.5.1Release,读者可以自行选择喜欢的版本进行测试。

关于如何搭建并启动RocketMQ集群在本文中不作为重点讲解,感兴趣的同学可以移步 跟我学RocketMQ[1-1]之安装RocketMQ

消息轨迹–broker

RocketMQ对于消息轨迹特性是支持可配置的,如果我们要开启消息轨迹能力,需要编辑broker的配置文件broker.conf,在其中添加如下配置:

traceTopicEnable=true

我们也能够指定自定义的traceTopic名称,具体配置项如下:

msgTraceTopicName=SNOWALKER_TRACE_TOPIC

启动broker会调用相关逻辑自动创建轨迹Topic,如果msgTraceTopicName没有配置则会使用默认值:RMQ_SYS_TRACE_TOPIC

通过Topic实现各种特性是RocketMQ设计精妙之处,定时消息、事务消息、消息重试,包括我们今天接触到的消息轨迹都是这种思想的体现。至于它们具体是如何实现的,我们在文章的后半段的源码分析部分详细展开。

Read More

说罢秒杀网关相关的核心要点,我们接着聊聊秒杀收单相关的核心要点与代码实现。

本文重点说明以下几点:

  1. 业务场景概述
  2. 通过消息队列异步收单
  3. 实际库存扣减
  4. 实际下单操作

业务场景概述

首先对业务场景进行概述。

完整的业务流可参考 实战分布式之电商高并发秒杀场景总览

秒杀收单核心业务逻辑如下:

  1. 秒杀下单消费者从MQ中获取到下单消息,开始下单操作
  2. 首先进行下单前的消息幂等校验,对于已经存在的下单消息不予消费
  3. 接着进行真实的库存判断,如果库存不够扣减则不再消费,这里应当通过消息推送告知用户商品已售罄,提示用户下次再来
  4. 如果库存足够,则扣减库存并下单。这两者在同一个本地事务域中,保证扣减完库存一定能够下单成功
  5. 下单成功后,通过消息推送通知用户对秒杀订单进行付款,付款后进行后续的发货等操作

Read More

Fork me on GitHub