• 开源rocketmq-java客户端sdk使用方法
  • 开源rocketmq-java客户端sdk使用场景解读
  • 混合云场景案例解析
  • 下一站:测试/线上一体化

开源rocketmq-java客户端sdk使用方法

目前通过RocketMQ开源客户端可以访问阿里云RocketMQ的 普通消息、顺序消息、延时/定时消息、事务消息,基本覆盖了云上MQ的主流场景。

我们接着讲解一下如何通过开源SDK使用云上RocketMQ。

本部分的配置项,生产者、消费者应用都需要添加。

我们通过代码案例讲解一下如何使用RocketMQ开源客户端的访问云上的MQ,进行消息发送与消费。

Read More

我们继续对消息消费流程的源码进行解析。

本文主要针对push模式下的消息拉取流程进行解析。我们重点分析集群消费模式,对于广播模式其实很好理解,每个消费者都需要拉取主题下面的所有消费队列的消息。

在集群消费模式下,同一个消费者组内包含了多个消费者实例,同一个topic下存在多个消费队列。对于单个消费者组,其内部维护了一个线程池进行消息消费,这部分内容可以移步 跟我学RocketMQ之消息消费源码解析(2)

之前我们已经研究了消费者的初始化流程,在启动MQClientInstance过程中,启动了一个消息拉取线程PullMessageService进行消息拉取工作。

Read More

RocketMQ消费者在进行消费时,需要遵循 “订阅关系一致” 原则,关于订阅关系一致,我引用阿里云RocketMQ页面的解释,如下图:

rmq0.png

从图中可以提炼出关键词,即:

同一个消费者组订阅的topic、topic中的tag必须保持一致,否则会出现消费不到消息的情况。

举个例子:比如我们有个topic名为DEMO_TOPIC,它有两个tag,分别为tagA、tagB。用一个消费者组demo_group分别订阅tagA、tagB,这时就会出现某个tag对应的消费者消费不到消息的情况。

解决方法就是:针对不同的tag使用不同的消费者组,在上面的案例中的解决方法为:使用demo_group_A 订阅tagA,使用demo_group_B订阅tagB。

提供了解决方案,还是有些意犹未尽,那么我们就深入RocketMQ的源码,感受一下订阅关系一致的机理。

Read More

研究了一段时间框架,有点审美疲劳,今天讲点轻松的,手写一个阻塞队列,实践一把lock+condition。

“等待通知”机制

首先复习一下经典的 “等待通知”机制。

线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁 –《极客时间-Java并发编程实战》

在Java中实现 “等待通知” 机制一般有两种方式,synchronized/Lock+Condition。

Read More

本文我们就Netty使用中常见的半包/粘包问题进行分析和处理。

解决粘包/半包问题本质方式为:有明确的业务应用数据区分标志,能够按照边界完整的接受Netty传输的数据。

字符串类型消息解决粘包/半包方式汇总

对于字符串类型的消息,Netty提供了多种现成的编解码工具解决粘包/半包问题,具体的工具类组合如下:

  • DelimiterBasedFrameDecoder+StringDecoder,通过特殊分隔符作为消息的结束标志
  • LineBasedFrameDecoder+StringDecoder,通过换行符作为消息的结束标志
  • FixedLengthFrameDecoder+StringDecoder,按照固定长度方式获取消息并解析

本文我们先介绍第一种方式,即:

DelimiterBasedFrameDecoder+StringDecoder

DelimiterBasedFrameDecoder的原理很好理解:通过利用特殊字符作为数据包的结束标志。发送方与接收方通过该标记对数据包进行分割解析即可。

Read More

本文我们接着分析RocketMQ消息消费的逻辑。

接上文,DefaultMQPushConsumerImpl启动过程中,启动了consumeMessageService消息消费线程。

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();

可以看到,是根据MessageListener的具体实现选择具体的consumeMessageService实现,我们重点讲解并行消费服务ConsumeMessageConcurrentlyService。

Read More

本文我们接着分析一下RocektMQ实现消息消费的源码细节,这部分的内容较多,因此拆分为几个章节分别进行讲解。

本章节重点讲解DefaultMQPushConsumer的代码逻辑。

DefaultMQPushConsumer使用样例

按照惯例还是先看一下DefaultMQPushConsumer的使用样例。

@PostConstruct
public void init() {
    defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
    defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
    // 从头开始消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式:集群模式
    defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册监听器
    defaultMQPushConsumer.registerMessageListener(messageListener);
    // 订阅所有消息
    try {
        defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
        defaultMQPushConsumer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e);
    }
    LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!");
}

初始化过程中需要调用registerMessageListener将具体的消费实现Listener注入。

@Component(value = "notifySendListenerImpl")
public class NotifySendListenerImpl implements MessageListenerConcurrently {

Read More

从零学Netty系列旨在记录笔者学习Netty的过程,从入门到熟练,尽量全面的对Netty做一次深入的探秘。

本文是 “从零学Netty” 系列的第一篇,我将介绍Netty的基本概念,线程模型,以及入门案例。

Netty基本概念

Netty是一个高性能网络通信框架,它提供了异步的、事件驱动的能力,能够帮助开发者快速开发高性能、高可靠的网络服务器和客户端程序。

Netty封装了Java NIO,屏蔽了复杂的底层实现,它具有以下优势:

  1. Netty具有一个活跃的社区
  2. Netty实现了各种协议,基本上不需要开发者对主流的协议进行二次开发
  3. Netty对线程,selector进行了优化,它的reactor线程模型有着良好的并发表现
  4. Netty具备完备的拆包解包,异常检测机制,能够让开发者专注于业务逻辑而不需要关心NIO的繁重细节
  5. Netty解决了JDK的原生NIO的很多包括空轮询在内的bug

Read More

本文是 “自己写分布式事务框架” 系列的汇总篇,我将从原理出发,提纲挈领地对本地消息表机制进行分析,并逐步带领读者实现基于本地消息表的分布式事务框架。

分布式事务在当下分布式开发领域已经成为必须考虑的因素,随着微服务、SOA架构思想日益被开发者所熟知,分布式事务的解决思路也不断被提出并被开源社区实现。

在互联网开发中,我们常常会采用一揽子“柔性事务”解决方案来保证系统内模块之间数据的 最终一致性

业界知名的分布式事务解决方案有如下几种:

  1. TCC方案,它的开源实现有TCC-Transaction、ByteTCC,阿里开源的SEATA框架也加入了对TCC模式的支持;
  2. 可靠消息最终一致方案,代表的实现方式为RocketMQ事务消息;
  3. SAGA事务,代表实现方式为华为开源的serviceComb;
  4. 最大努力通知型解决方案,该方案与业务耦合较为严重,因此业界也没有一个较为抽象的开源实现;
  5. 消息溯源方案(或称为本地消息表方案),该方案实现较为简单,但也与业务耦合较为严重,据我调查暂时没有抽象的开源实现。

上述方案中,我挑选了第五种 消息溯源方案(即本地消息表方案,后文均称为本地消息表方案) 作为自己写分布式事务框架的核心机制,旨在实现一个与业务无关的、基于消息的、异步确保的、最终一致的柔性事务框架。

框架的实现主要基于RocketMQ的普通消息,我们都知道RocketMQ已经支持了事务消息,这里只是基于它对本地消息表方案进行实现,原则上,本地消息表方案支持任意具有发布订阅能力的消息中间件。

上述其他实现在笔者之前的文章中有过较为系统的讲解,感兴趣的同学可以移步 置顶贴-我说分布式事务系列 进行了解学习,本系列不再展开说明。

Read More

之前的两篇文章中,我讲解了ShieldTXC的框架内核的代码实现及其原理。前前后后设计开发总共耗时一周,目前的版本基本可用,那么本文我们就使用这个版本进行一次实战演练,让读者直观的感受一下ShieldTXC实现的本地消息表分布式事务方案的魅力。

进入shieldTXC项目,可以看到项目整体的结构如下:

shieldTxc
    |-txc-core        ShieldTXC内核
    |-txc-demo-up     分布式事务上游应用
    |-txc-demo-down   分布式事务下游应用

首先对项目进行整体的编译,接着对应用txc-demo-up、txc-demo-down,修改配置文件application.properties

Read More

Fork me on GitHub