跟我学RocketMQ之开源客户端混合云实践与案例解析
- 开源rocketmq-java客户端sdk使用方法
- 开源rocketmq-java客户端sdk使用场景解读
- 混合云场景案例解析
- 下一站:测试/线上一体化
开源rocketmq-java客户端sdk使用方法
目前通过RocketMQ开源客户端可以访问阿里云RocketMQ的 普通消息、顺序消息、延时/定时消息、事务消息,基本覆盖了云上MQ的主流场景。
我们接着讲解一下如何通过开源SDK使用云上RocketMQ。
本部分的配置项,生产者、消费者应用都需要添加。
我们通过代码案例讲解一下如何使用RocketMQ开源客户端的访问云上的MQ,进行消息发送与消费。
使用4.5.1版本进行讲解,在项目中添加如下依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.5.1</version>
</dependency>
在springboot项目的application.properties中添加rocketmq配置
#nameServer地址
rocketmq.nameServer.offline=开源版本nameServer地址
rocketmq.nameServer.aliyun=阿里云接入点
rocketmq.acl.accesskey=云上accesskey
rocketmq.acl.accessSecret=云上accessSecret
ak sk 写上对开源版本代码运行没有影响。换到开源版本,这部分代码保留即可,
为了方便代码部署,编写一个配置读取类,根据环境变量设置的环境类型,选取不同的nameServer
@Component
public class MQNamesrvConfig {
// 线下开源rocketMQ
@Value("${rocketmq.nameServer.offline}")
String offlineNamesrv;
// 阿里云RocketMQ接入点
@Value("${rocketmq.nameServer.aliyun}")
String aliyunNamesrv;
/**
* 根据环境选择nameServer地址
* @return
*/
public String nameSrvAddr() {
String envType = System.getProperty("envType");
if (StringUtils.isBlank(envType)) {
throw new IllegalArgumentException("please insert envType");
}
switch (envType) {
case "offline" : {
return offlineNamesrv;
}
case "aliyun" : {
return aliyunNamesrv;
}
default : {
throw new IllegalArgumentException("please insert right envType, offline/aliyun");
}
}
}
}
生产者代码样例
为了支持代码访问云上MQ需要更改生产者构造方法,代码如下:
@Autowired
MQNamesrvConfig namesrvConfig;
@Value("${rocketmq.acl.accesskey}")
String aclAccessKey;
@Value("${rocketmq.acl.accessSecret}")
String aclAccessSecret;
private DefaultMQProducer defaultMQProducer;
@PostConstruct
public void init() {
defaultMQProducer =
new DefaultMQProducer("GID_SNOWALKE_TEST",
new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)));
defaultMQProducer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
// 发送失败重试次数
defaultMQProducer.setRetryTimesWhenSendFailed(3);
try {
defaultMQProducer.start();
} catch (MQClientException e) {
LOGGER.error("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
throw new RuntimeException("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!", e);
}
LOGGER.info("[秒杀订单生产者]--SecKillChargeOrderProducer加载完成!");
}
关键点在于构造初始化的过程中,设置AclClientRPCHook,并将accesskey与secretKey设置进去。官方sample代码如下
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
消费者代码样例
消费者初始化代码也需要稍作改造。
@Autowired
MQNamesrvConfig namesrvConfig;
@Value("${rocketmq.acl.accesskey}")
String aclAccessKey;
@Value("${rocketmq.acl.accessSecret}")
String aclAccessSecret;
private DefaultMQPushConsumer defaultMQPushConsumer;
@Resource(name = "secKillChargeOrderListenerImpl")
private MessageListenerConcurrently messageListener;
@PostConstruct
public void init() {
defaultMQPushConsumer =
new DefaultMQPushConsumer("GID_SNOWALKE_TEST",,
new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)),
// 平均分配队列算法,hash
new AllocateMessageQueueAveragely());
defaultMQPushConsumer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
// 从头开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式:集群模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
defaultMQPushConsumer.registerMessageListener(messageListener);
// 设置每次拉取的消息量,默认为1
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
// 订阅所有消息
try {
defaultMQPushConsumer.subscribe(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), "*");
// 启动消费者
defaultMQPushConsumer.start();
} catch (MQClientException e) {
LOGGER.error("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
throw new RuntimeException("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!", e);
}
LOGGER.info("[秒杀下单消费者]--SecKillChargeOrderConsumer加载完成!");
}
同生产者初始化过程类似,关键点在于构造初始化的过程中,设置AclClientRPCHook,并将accesskey与secret设置进去。官方sample代码如下
我们主要以PushConsumer为例,PullConsumer的使用可以自行前往官方sample目录下查看
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("ConsumerGroupName",
getAclRPCHook(),
new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
补充:其他改造方案简介
- 通过java -D方式将ak sk及nameserver参数设置为环境变量
- 通过docker -e 方式将参数设置为环境变量
- 如果使用了K8S部署应用,则使用values.yaml或者configMap则更为方便
这些方式几乎可以做到零改动升级。
开源rocketmq-java客户端sdk使用场景解读
使用开源RocketMQ的java客户端能够满足多种需求,如:
- 混合云场景
- 测试线上一体化
- 云迁移场景
混合云场景
如果您的企业需要同时访问阿里云和其他公有云或者本地数据中心,开源客户端也可以支持多云环境的无缝切换。很好的支持分布式系统多云环境的数据同步、异地容灾等功能。
我们常说,不能把鸡蛋放在一个篮子里。
这种思维恰恰就是多云环境的体现。对于某些核心业务,我们要满足HA,应用部署需要具备可迁移,多环境热备,抗不可抗力等的能力。
我们会选择打通多云环境,或者建立私有云平台,保证跨云数据同步。
本次分享主要就混合云场景做重要的分析,我们将采用一个电商秒杀业务场景的混合云场景改造进行案例分析。
测试/线上一体化
鉴于开源客户端既可以访问开源RocketMQ集群又可以访问阿里云MQ,您的应用可以在开发、测试阶段使用开源RocketMQ集群,线上阶段使用阿里云RocketMQ,这将很大程度为您节约成本。切换过程无需修改一行代码。
测试线上一体化的核心为 “一套代码,处处运行”,线上线下逻辑相同,不需要进行针对环境进行变更,使业务逻辑无状态化。
云上的MQ公网模式下按量收费,为了节省开发测试成本,因此在开发测试阶段选择使用线下的开源RocektMQ;正式上线再接入线上的云MQ。
客户端sdk支持一套代码访问开源、云上的MQ,从而降低开发成本。
示例图如下:
案例:我当前所在的业务线在生产环境主要使用了阿里云RocketMQ的铂金版,线下测试如果也使用铂金版成本较高,换成公网由于Topic数量有三十多个,测试开发两套环境每个月固定支出几千,因此对代码进行了升级。
测试和开发环境功能测试访问本地开源RocketMQ集群,线上使用阿里云铂金版RocketMQ,效果很稳定。
云迁移场景
如果您正想从开源RocketMQ迁移到阿里云MQ,开源客户端是最好的选择,升级到开源客户端后,您可以在应用不停机的情况下无缝迁移到阿里云RocketMQ。
商业版MQ具有完备的技术团队支撑,稳定性及可靠性表现优异,如果应用有上云需求,访问对应区域的阿里云RocketMQ是一个不错的选择。
尤其是之前已经基于开源版MQ部署过的应用,代码量可观,如果使用云上的客户端进行兼容,修改量比较大,因此通过ACL支持的开源版客户端,几乎零修改,便可以无缝上云。
推荐的迁移策略如下:
- 对现有应用的客户端进行升级,加入ACL支持
- 进行线下的兼容性测试,保证原先业务不受影响
- 将配置信息剥离到环境变量中,如:
- 通过java -D方式将参数设置为环境变量
- 通过docker -e 方式将参数设置为环境变量
- 如果使用了K8S部署应用,则使用values.yaml或者configMap则更为方便
- 将应用打包后部署到云上环境即可完成迁移
流程图如下:
混合云场景案例解析
混合云场景的案例分析基于一个简化的电商秒杀场景业务系统讲解,项目的代码地址为 https://github.com/TaXueWWL/seckill-rocketmq
项目简介:
用户访问秒杀网关进行秒杀订单下单,平台通过RocketMQ对秒杀流量进行削峰填谷。用户通过主动查询订单 获取下单结果的完整业务流程
本地化部署架构解析
上述场景为非混合云平台部署方式,该方式的部署架构图如下:
秒杀网关服务、秒杀订单服务、开源MQ集群、缓存集群、DB集群都在一个环境进行部署。
混合云平台部署解析
我们对上述部署方式进行混合云改造,将数据库、MQ等中间件更换为云上版本,订单收单业务模块部署到云上环境;将缓存、收单网关部署到本地环境(使用本地环境模拟IDC)。整个部署方式变成了混合云架构。
整个平台的部署架构图如下:
案例流程解析
案例的流程图如下;
demo演示及讲解
我们对demo项目进行演示,主要演示通过访问阿里云云上MQ实现消息收发。
消息发送控制台截图:
消息消费控制台截图:
改造流程总结
- 代码支持云上MQ,具体方法参考第一章节 《开源rocketmq-java客户端sdk使用方法》
- 进行应用迁移部署
- 功能验证
可以看到通过4.5.1以上的新版本RocketMQ客户端SDK,能够明显降低我们业务上云、混合云部署等的改造成本,提高业务部署的可靠性。
下一站:测试/线上一体化
进行混合云部署改造的过程其实就涉及到了另外的场景,即:测试/线上一体化。我们可以通过4.5.1以上的新版本RocketMQ客户端SDK达到该目的。
即:开发测试环境访问本地开源MQ集群,进行测试,节约成本。线上使用云上MQ,节约运维成本,更好的满足服务高可用。
上云指导文档
如果有以下场景需求可以访问下列链接,获取更多的上云指导
- 云迁移场景:想从开源RocketMQ迁移到阿里云MQ
- 混合云场景:应用需要同时访问私有云的开源RocketMQ和阿里云tMQ。
- 测试环境访问开源RocketMQ,线上环境访问阿里云MQ。
参考资料
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。