文章目录
  1. 1. 开源rocketmq-java客户端sdk使用方法
    1. 1.1. 生产者代码样例
    2. 1.2. 消费者代码样例
    3. 1.3. 补充:其他改造方案简介
  2. 2. 开源rocketmq-java客户端sdk使用场景解读
  3. 3. 混合云场景
    1. 3.1. 测试/线上一体化
    2. 3.2. 云迁移场景
  4. 4. 混合云场景案例解析
    1. 4.1. 项目简介:
    2. 4.2. 本地化部署架构解析
    3. 4.3. 混合云平台部署解析
    4. 4.4. 案例流程解析
    5. 4.5. demo演示及讲解
    6. 4.6. 改造流程总结
  5. 5. 下一站:测试/线上一体化
  6. 6. 上云指导文档
  7. 7. 参考资料
  • 开源rocketmq-java客户端sdk使用方法
  • 开源rocketmq-java客户端sdk使用场景解读
  • 混合云场景案例解析
  • 下一站:测试/线上一体化

开源rocketmq-java客户端sdk使用方法

目前通过RocketMQ开源客户端可以访问阿里云RocketMQ的 普通消息、顺序消息、延时/定时消息、事务消息,基本覆盖了云上MQ的主流场景。

我们接着讲解一下如何通过开源SDK使用云上RocketMQ。

本部分的配置项,生产者、消费者应用都需要添加。

我们通过代码案例讲解一下如何使用RocketMQ开源客户端的访问云上的MQ,进行消息发送与消费。

使用4.5.1版本进行讲解,在项目中添加如下依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.5.1</version>
</dependency>

在springboot项目的application.properties中添加rocketmq配置

#nameServer地址
rocketmq.nameServer.offline=开源版本nameServer地址
rocketmq.nameServer.aliyun=阿里云接入点
rocketmq.acl.accesskey=云上accesskey
rocketmq.acl.accessSecret=云上accessSecret

ak sk 写上对开源版本代码运行没有影响。换到开源版本,这部分代码保留即可,

为了方便代码部署,编写一个配置读取类,根据环境变量设置的环境类型,选取不同的nameServer

@Component
public class MQNamesrvConfig {

    // 线下开源rocketMQ
    @Value("${rocketmq.nameServer.offline}")
    String offlineNamesrv;

    // 阿里云RocketMQ接入点
    @Value("${rocketmq.nameServer.aliyun}")
    String aliyunNamesrv;

    /**
    * 根据环境选择nameServer地址
    * @return
    */
    public String nameSrvAddr() {
        String envType = System.getProperty("envType");
        if (StringUtils.isBlank(envType)) {
            throw new IllegalArgumentException("please insert envType");
        }
        switch (envType) {
            case "offline" : {
                return offlineNamesrv;
            }
            case "aliyun" : {
                return aliyunNamesrv;
            }
            default : {
                throw new IllegalArgumentException("please insert right envType, offline/aliyun");
            }
        }
    }
}

生产者代码样例

为了支持代码访问云上MQ需要更改生产者构造方法,代码如下:

@Autowired
MQNamesrvConfig namesrvConfig;

@Value("${rocketmq.acl.accesskey}")
String aclAccessKey;

@Value("${rocketmq.acl.accessSecret}")
String aclAccessSecret;

private DefaultMQProducer defaultMQProducer;

@PostConstruct
public void init() {
    defaultMQProducer =
            new DefaultMQProducer("GID_SNOWALKE_TEST",
            new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)));
    defaultMQProducer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
    // 发送失败重试次数
    defaultMQProducer.setRetryTimesWhenSendFailed(3);
    try {
        defaultMQProducer.start();
    } catch (MQClientException e) {
        LOGGER.error("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
        throw new RuntimeException("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!", e);
    }
    LOGGER.info("[秒杀订单生产者]--SecKillChargeOrderProducer加载完成!");
}

关键点在于构造初始化的过程中,设置AclClientRPCHook,并将accesskey与secretKey设置进去。官方sample代码如下

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}

消费者代码样例

消费者初始化代码也需要稍作改造。

@Autowired
MQNamesrvConfig namesrvConfig;

@Value("${rocketmq.acl.accesskey}")
String aclAccessKey;

@Value("${rocketmq.acl.accessSecret}")
String aclAccessSecret;

private DefaultMQPushConsumer defaultMQPushConsumer;

@Resource(name = "secKillChargeOrderListenerImpl")
private MessageListenerConcurrently messageListener;

@PostConstruct
public void init() {
    defaultMQPushConsumer =
            new DefaultMQPushConsumer("GID_SNOWALKE_TEST",,
                    new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)),
                    // 平均分配队列算法,hash
                    new AllocateMessageQueueAveragely());
    defaultMQPushConsumer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
    // 从头开始消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式:集群模式
    defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册监听器
    defaultMQPushConsumer.registerMessageListener(messageListener);
    // 设置每次拉取的消息量,默认为1
    defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
    // 订阅所有消息
    try {
        defaultMQPushConsumer.subscribe(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), "*");
        // 启动消费者
        defaultMQPushConsumer.start();
    } catch (MQClientException e) {
        LOGGER.error("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
        throw new RuntimeException("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!", e);
    }
    LOGGER.info("[秒杀下单消费者]--SecKillChargeOrderConsumer加载完成!");
}

同生产者初始化过程类似,关键点在于构造初始化的过程中,设置AclClientRPCHook,并将accesskey与secret设置进去。官方sample代码如下

我们主要以PushConsumer为例,PullConsumer的使用可以自行前往官方sample目录下查看

DefaultMQPushConsumer consumer = 
new DefaultMQPushConsumer("ConsumerGroupName", 
                            getAclRPCHook(), 
                            new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");


static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
} 

补充:其他改造方案简介

  1. 通过java -D方式将ak sk及nameserver参数设置为环境变量
  2. 通过docker -e 方式将参数设置为环境变量
  3. 如果使用了K8S部署应用,则使用values.yaml或者configMap则更为方便

这些方式几乎可以做到零改动升级。

开源rocketmq-java客户端sdk使用场景解读

使用开源RocketMQ的java客户端能够满足多种需求,如:

  1. 混合云场景
  2. 测试线上一体化
  3. 云迁移场景

混合云场景

如果您的企业需要同时访问阿里云和其他公有云或者本地数据中心,开源客户端也可以支持多云环境的无缝切换。很好的支持分布式系统多云环境的数据同步、异地容灾等功能。

我们常说,不能把鸡蛋放在一个篮子里。

这种思维恰恰就是多云环境的体现。对于某些核心业务,我们要满足HA,应用部署需要具备可迁移,多环境热备,抗不可抗力等的能力。

我们会选择打通多云环境,或者建立私有云平台,保证跨云数据同步。

本次分享主要就混合云场景做重要的分析,我们将采用一个电商秒杀业务场景的混合云场景改造进行案例分析。

测试/线上一体化

鉴于开源客户端既可以访问开源RocketMQ集群又可以访问阿里云MQ,您的应用可以在开发、测试阶段使用开源RocketMQ集群,线上阶段使用阿里云RocketMQ,这将很大程度为您节约成本。切换过程无需修改一行代码。

测试线上一体化的核心为 “一套代码,处处运行”,线上线下逻辑相同,不需要进行针对环境进行变更,使业务逻辑无状态化。

云上的MQ公网模式下按量收费,为了节省开发测试成本,因此在开发测试阶段选择使用线下的开源RocektMQ;正式上线再接入线上的云MQ。

客户端sdk支持一套代码访问开源、云上的MQ,从而降低开发成本。

示例图如下:

测试线上一体化

案例:我当前所在的业务线在生产环境主要使用了阿里云RocketMQ的铂金版,线下测试如果也使用铂金版成本较高,换成公网由于Topic数量有三十多个,测试开发两套环境每个月固定支出几千,因此对代码进行了升级。

测试和开发环境功能测试访问本地开源RocketMQ集群,线上使用阿里云铂金版RocketMQ,效果很稳定。

云迁移场景

如果您正想从开源RocketMQ迁移到阿里云MQ,开源客户端是最好的选择,升级到开源客户端后,您可以在应用不停机的情况下无缝迁移到阿里云RocketMQ。

商业版MQ具有完备的技术团队支撑,稳定性及可靠性表现优异,如果应用有上云需求,访问对应区域的阿里云RocketMQ是一个不错的选择。

尤其是之前已经基于开源版MQ部署过的应用,代码量可观,如果使用云上的客户端进行兼容,修改量比较大,因此通过ACL支持的开源版客户端,几乎零修改,便可以无缝上云。

推荐的迁移策略如下:

  1. 对现有应用的客户端进行升级,加入ACL支持
  2. 进行线下的兼容性测试,保证原先业务不受影响
  3. 将配置信息剥离到环境变量中,如:
    1. 通过java -D方式将参数设置为环境变量
    2. 通过docker -e 方式将参数设置为环境变量
    3. 如果使用了K8S部署应用,则使用values.yaml或者configMap则更为方便
  4. 将应用打包后部署到云上环境即可完成迁移

流程图如下:

云迁移流程

混合云场景案例解析

混合云场景的案例分析基于一个简化的电商秒杀场景业务系统讲解,项目的代码地址为 https://github.com/TaXueWWL/seckill-rocketmq

项目简介:

用户访问秒杀网关进行秒杀订单下单,平台通过RocketMQ对秒杀流量进行削峰填谷。用户通过主动查询订单 获取下单结果的完整业务流程

本地化部署架构解析

上述场景为非混合云平台部署方式,该方式的部署架构图如下:

非混合云部署方式

秒杀网关服务、秒杀订单服务、开源MQ集群、缓存集群、DB集群都在一个环境进行部署。

混合云平台部署解析

我们对上述部署方式进行混合云改造,将数据库、MQ等中间件更换为云上版本,订单收单业务模块部署到云上环境;将缓存、收单网关部署到本地环境(使用本地环境模拟IDC)。整个部署方式变成了混合云架构。

整个平台的部署架构图如下:

混合云部署方式

案例流程解析

案例的流程图如下;

案例流程图

demo演示及讲解

我们对demo项目进行演示,主要演示通过访问阿里云云上MQ实现消息收发。

消息发送控制台截图:

消息发送

消息消费控制台截图:

消息发送

改造流程总结

  1. 代码支持云上MQ,具体方法参考第一章节 《开源rocketmq-java客户端sdk使用方法》
  2. 进行应用迁移部署
  3. 功能验证

可以看到通过4.5.1以上的新版本RocketMQ客户端SDK,能够明显降低我们业务上云、混合云部署等的改造成本,提高业务部署的可靠性。

下一站:测试/线上一体化

进行混合云部署改造的过程其实就涉及到了另外的场景,即:测试/线上一体化。我们可以通过4.5.1以上的新版本RocketMQ客户端SDK达到该目的。

即:开发测试环境访问本地开源MQ集群,进行测试,节约成本。线上使用云上MQ,节约运维成本,更好的满足服务高可用。

上云指导文档

如果有以下场景需求可以访问下列链接,获取更多的上云指导

  • 云迁移场景:想从开源RocketMQ迁移到阿里云MQ
  • 混合云场景:应用需要同时访问私有云的开源RocketMQ和阿里云tMQ。
  • 测试环境访问开源RocketMQ,线上环境访问阿里云MQ。

RocketMQ开源客户端访问阿里云MQ

参考资料

RocketMQ 官方Sample



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 开源rocketmq-java客户端sdk使用方法
    1. 1.1. 生产者代码样例
    2. 1.2. 消费者代码样例
    3. 1.3. 补充:其他改造方案简介
  2. 2. 开源rocketmq-java客户端sdk使用场景解读
  3. 3. 混合云场景
    1. 3.1. 测试/线上一体化
    2. 3.2. 云迁移场景
  4. 4. 混合云场景案例解析
    1. 4.1. 项目简介:
    2. 4.2. 本地化部署架构解析
    3. 4.3. 混合云平台部署解析
    4. 4.4. 案例流程解析
    5. 4.5. demo演示及讲解
    6. 4.6. 改造流程总结
  5. 5. 下一站:测试/线上一体化
  6. 6. 上云指导文档
  7. 7. 参考资料
Fork me on GitHub