本文为分布式系列文章的集锦汇总,长期保持置顶及更新,读者可以在本文中更好的学习到某个具体的系列。

注: 转载本博客文章请注明出处,原创不易,洗文可耻。

我说分布式事务系列

文章链接
我说分布式事务之TCC
我说分布式事务之最大努力通知型事务
我说分布式事务之可靠消息最终一致性事务1-原理及实现
我说分布式事务之消息一致性事务2-rocketmq的实现
【汇总】我说分布式事务系列
分布式事务之聊聊TCC
分布式事务最终一致性常用方案

跟我学RocketMQ

文章链接
[1-1]安装RocketMQ
[1-2]安装RocketMQ-Console管理平台
[1-3]发送普通消息及封装DefaultMQProducer支持spring
[1-4]消费消息及封装DefaultMQPushConsumer支持spring
[1-5]发送事务消息实现分布式事务及封装TransactionMQProducer支持spring
[2-0]跟我学RocketMQ之消息重试
[2-1]跟我学RocketMQ之消息幂等
[2-2]跟我学RocketMQ之消息轨迹实战与源码分析
[2-3]跟我学RocketMQ之消息发送源码解析
[2-4]跟我学RocketMQ之批量消息发送源码解析
[2-5]跟我学RocketMQ之消息消费源码解析-p1
[2-6]跟我学RocketMQ之消息消费源码解析-p2
[2-7]跟我学RocketMQ之订阅关系一致性源码讨论
[2-8]跟我学RocketMQ之消息拉取源码解析

分库分表

文章链接
我说分布式之分库分表
跟我学shardingjdbc之shardingjdbc入门
跟我学shardingjdbc之使用jasypt加密数据库连接密码
跟我学shardingjdbc之分布式主键及其自定义
跟我学shardingjdbc之自定义分库分表策略-复合分片算法自定义实现

Read More

我们继续对消息消费流程的源码进行解析。

本文主要针对push模式下的消息拉取流程进行解析。我们重点分析集群消费模式,对于广播模式其实很好理解,每个消费者都需要拉取主题下面的所有消费队列的消息。

在集群消费模式下,同一个消费者组内包含了多个消费者实例,同一个topic下存在多个消费队列。对于单个消费者组,其内部维护了一个线程池进行消息消费,这部分内容可以移步 跟我学RocketMQ之消息消费源码解析(2)

之前我们已经研究了消费者的初始化流程,在启动MQClientInstance过程中,启动了一个消息拉取线程PullMessageService进行消息拉取工作。

Read More

RocketMQ消费者在进行消费时,需要遵循 “订阅关系一致” 原则,关于订阅关系一致,我引用阿里云RocketMQ页面的解释,如下图:

rmq0.png

从图中可以提炼出关键词,即:

同一个消费者组订阅的topic、topic中的tag必须保持一致,否则会出现消费不到消息的情况。

举个例子:比如我们有个topic名为DEMO_TOPIC,它有两个tag,分别为tagA、tagB。用一个消费者组demo_group分别订阅tagA、tagB,这时就会出现某个tag对应的消费者消费不到消息的情况。

解决方法就是:针对不同的tag使用不同的消费者组,在上面的案例中的解决方法为:使用demo_group_A 订阅tagA,使用demo_group_B订阅tagB。

提供了解决方案,还是有些意犹未尽,那么我们就深入RocketMQ的源码,感受一下订阅关系一致的机理。

Read More

研究了一段时间框架,有点审美疲劳,今天讲点轻松的,手写一个阻塞队列,实践一把lock+condition。

“等待通知”机制

首先复习一下经典的 “等待通知”机制。

线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁 –《极客时间-Java并发编程实战》

在Java中实现 “等待通知” 机制一般有两种方式,synchronized/Lock+Condition。

Read More

本文我们就Netty使用中常见的半包/粘包问题进行分析和处理。

解决粘包/半包问题本质方式为:有明确的业务应用数据区分标志,能够按照边界完整的接受Netty传输的数据。

字符串类型消息解决粘包/半包方式汇总

对于字符串类型的消息,Netty提供了多种现成的编解码工具解决粘包/半包问题,具体的工具类组合如下:

  • DelimiterBasedFrameDecoder+StringDecoder,通过特殊分隔符作为消息的结束标志
  • LineBasedFrameDecoder+StringDecoder,通过换行符作为消息的结束标志
  • FixedLengthFrameDecoder+StringDecoder,按照固定长度方式获取消息并解析

本文我们先介绍第一种方式,即:

DelimiterBasedFrameDecoder+StringDecoder

DelimiterBasedFrameDecoder的原理很好理解:通过利用特殊字符作为数据包的结束标志。发送方与接收方通过该标记对数据包进行分割解析即可。

Read More

本文我们接着分析RocketMQ消息消费的逻辑。

接上文,DefaultMQPushConsumerImpl启动过程中,启动了consumeMessageService消息消费线程。

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();

可以看到,是根据MessageListener的具体实现选择具体的consumeMessageService实现,我们重点讲解并行消费服务ConsumeMessageConcurrentlyService。

Read More

本文我们接着分析一下RocektMQ实现消息消费的源码细节,这部分的内容较多,因此拆分为几个章节分别进行讲解。

本章节重点讲解DefaultMQPushConsumer的代码逻辑。

DefaultMQPushConsumer使用样例

按照惯例还是先看一下DefaultMQPushConsumer的使用样例。

@PostConstruct
public void init() {
    defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
    defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
    // 从头开始消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式:集群模式
    defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册监听器
    defaultMQPushConsumer.registerMessageListener(messageListener);
    // 订阅所有消息
    try {
        defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
        defaultMQPushConsumer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e);
    }
    LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!");
}

初始化过程中需要调用registerMessageListener将具体的消费实现Listener注入。

@Component(value = "notifySendListenerImpl")
public class NotifySendListenerImpl implements MessageListenerConcurrently {

Read More

从零学Netty系列旨在记录笔者学习Netty的过程,从入门到熟练,尽量全面的对Netty做一次深入的探秘。

本文是 “从零学Netty” 系列的第一篇,我将介绍Netty的基本概念,线程模型,以及入门案例。

Netty基本概念

Netty是一个高性能网络通信框架,它提供了异步的、事件驱动的能力,能够帮助开发者快速开发高性能、高可靠的网络服务器和客户端程序。

Netty封装了Java NIO,屏蔽了复杂的底层实现,它具有以下优势:

  1. Netty具有一个活跃的社区
  2. Netty实现了各种协议,基本上不需要开发者对主流的协议进行二次开发
  3. Netty对线程,selector进行了优化,它的reactor线程模型有着良好的并发表现
  4. Netty具备完备的拆包解包,异常检测机制,能够让开发者专注于业务逻辑而不需要关心NIO的繁重细节
  5. Netty解决了JDK的原生NIO的很多包括空轮询在内的bug

Read More

之前的两篇文章中,我讲解了ShieldTXC的框架内核的代码实现及其原理。前前后后设计开发总共耗时一周,目前的版本基本可用,那么本文我们就使用这个版本进行一次实战演练,让读者直观的感受一下ShieldTXC实现的本地消息表分布式事务方案的魅力。

进入shieldTXC项目,可以看到项目整体的结构如下:

shieldTxc
    |-txc-core        ShieldTXC内核
    |-txc-demo-up     分布式事务上游应用
    |-txc-demo-down   分布式事务下游应用

首先对项目进行整体的编译,接着对应用txc-demo-up、txc-demo-down,修改配置文件application.properties

Read More

上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送。本文中,我们就一起来集中分析一下批量消息的发送是怎样的逻辑。

DefaultProducer.send

RocketMQ提供了批量发送消息的API,同样在DefaultProducer.java中

@Override
public SendResult send(
    Collection<Message> msgs) throws MQClientException, RemotingException, 
        MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

它的参数为Message集合,也就是一批消息。它的另外一个重载方法提供了发送超时时间参数

@Override
public SendResult send(Collection<Message> msgs,
    long timeout) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs), timeout);
}

可以看到是将消息通过batch()方法打包为单条消息,我们看一下batch方法的逻辑

Read More

Fork me on GitHub