实战分布式之电商高并发秒杀网关核心要点及代码实现
从本文开始就进入到了秒杀场景的实现环节,我将主要分两大块进行逐一解析,分别为:
- 高并发秒杀网关核心要点及代码实现
- 高并发秒杀收单平台核心要点及代码实现
本篇主要讲一下秒杀网关的要点及编码。
秒杀网关业务场景概述
首先对秒杀网关的主要业务逻辑进行详细的说明。
- 秒杀服务网关在启动时对商品、库存、其他配置进行预加载
- 用户通过H5/小程序/APP等终端发起秒杀请求,将秒杀下单请求发送至秒杀服务网关
- 网关收单接口对用户下单请求进行前置预校验,包括
- 参数合法性校验
- 商品合法性校验,主要校验下单产品id对应的商品是否存在
- 进行商品库存校验,即当前库存是否大于0(或者大于等于1)
- 对商品库存进行预减,判断预减库存结果是否大于等于0
- 第2步通过后,对收单参数进行处理,组装为秒杀下单消息协议,将消息投递到RocketMQ中
- 为用户同步返回 “秒杀请求已接收,排队中”
- 上述步骤1-4一旦有处理失败或者校验失败的情况都同步返回用户 “秒杀下单失败”
我们接着对上述业务流程中的重点进行详细展开
网关商品资源预加载
首先我们要明确一个前提,秒杀网关的职责为提供高可用接口、能够横向扩展、不直接与持久层进行交互。其中,对于商品、库存等访问频繁的配置性信息来讲,稳定性尤为重要。
因此我们应当在网关启动的过程中就对商品、库存等信息进行预加载。
我们采用SpringBoot作为网关脚手架,Spring支持多种资源预加载方式,这里主要介绍两种主流方式。
通过@PostConstruct方式加载(即init方式)
代码如下:
@Component
public class SecKillProductConfig implements InitializingBean {
private static final Map<String, SecKillProductDobj> PRODUCT_CONFIG_MAP =
new ConcurrentHashMap<>(16);
@PostConstruct
public void init() {
List<SecKillProductDobj> killProductList = secKillProductService.querySecKillProductList();
if (killProductList == null) {
throw new RuntimeException("请确保数据库中存在秒杀商品配置!");
}
killProductList.stream().forEach((secKillProductDobj -> {
String prodId = secKillProductDobj.getProdId();
PRODUCT_CONFIG_MAP.put(prodId, secKillProductDobj);
}));
LOGGER.info("[SecKillProductConfig]初始化秒杀配置完成,商品信息=[{}]", JSON.toJSONString(PRODUCT_CONFIG_MAP));
}
将代码所在的实体标注为一个Spring的bean即可。
这里是通过ConcurrentHashMap对商品信息从数据库中读出遍历进行缓存预热,这里只是一个示例。在实际项目中,我们推荐采用Redis作为商品、库存预热的容器。
通过实现InitializingBean方式进行加载
代码如下:
@Component
public class SecKillProductConfig implements InitializingBean {
private static final Map<String, SecKillProductDobj> PRODUCT_CONFIG_MAP =
new ConcurrentHashMap<>(16);
......
@Override
public void afterPropertiesSet() throws Exception {
List<SecKillProductDobj> killProductList = secKillProductService.querySecKillProductList();
if (killProductList == null) {
throw new RuntimeException("请确保数据库中存在秒杀商品配置!");
}
killProductList.stream().forEach((secKillProductDobj -> {
String prodId = secKillProductDobj.getProdId();
PRODUCT_CONFIG_MAP.put(prodId, secKillProductDobj);
}));
LOGGER.info("[SecKillProductConfig]初始化秒杀配置完成,商品信息=[{}]", JSON.toJSONString(PRODUCT_CONFIG_MAP));
}
具体的加载过程与@PostConstruct方式完全相同,我们主要讲解下InitializingBean接口的特点。
InitializingBean接口为Springd的bean提供了初始化能力,通过实现afterPropertiesSet方法,只要类实现了该接口,那么在初始化bean的时候都会执行该方法。具体的实现可以参考Spring框架中org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory类中的invokeInitMethods方法。代码逻辑如下
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.java
protected void invokeInitMethods(String beanName, Object bean, @Nullable RootBeanDefinition mbd) throws Throwable {
boolean isInitializingBean = bean instanceof InitializingBean;
if (isInitializingBean && (mbd == null || !mbd.isExternallyManagedInitMethod("afterPropertiesSet"))) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Invoking afterPropertiesSet() on bean with name '" + beanName + "'");
}
if (System.getSecurityManager() != null) {
try {
AccessController.doPrivileged(() -> {
((InitializingBean)bean).afterPropertiesSet();
return null;
}, this.getAccessControlContext());
} catch (PrivilegedActionException var6) {
throw var6.getException();
}
} else {
((InitializingBean)bean).afterPropertiesSet();
}
}
代码逻辑应当是很清晰了:判断当前加载的bean是否实现了InitializingBean接口,如果实现了InitializingBean接口,则回调bean的afterPropertiesSet方法。
在我们的项目中也就是加载了商品、库存等资源。
【注意】重要的,InitializingBean调用优先级要高于init方法(@PostConstruct),而且如果调用afterPropertiesSet方法时出错会直接抛出异常,不会再调用init方法。
秒杀订单预校验
我们接着看下如何在代码中对秒杀订单进行预校验。
具体的预校验流程在上文的业务流程介绍以及 实战分布式之电商高并发秒杀场景总览 这篇文章的图解中已经讲解的较为清晰,我们直接看代码实现:
// 下单前置参数校验
if (!secKillChargeService.checkParamsBeforeSecKillCharge(chargeOrderRequest, sessionId)) {
return Result.error(CodeMsg.PARAM_INVALID);
}
// 前置商品校验
String prodId = chargeOrderRequest.getProdId();
if (!secKillChargeService.checkProdConfigBeforeKillCharge(prodId, sessionId)) {
return Result.error(CodeMsg.PRODUCT_NOT_EXIST);
}
// 前置预减库存
if (!secKillProductConfig.preReduceProdStock(prodId)) {
return Result.error(CodeMsg.PRODUCT_STOCK_NOT_ENOUGH);
}
// 秒杀订单入队
return secKillChargeService.secKillOrderEnqueue(chargeOrderRequest, sessionId);
这里采用了防御编程思想中的卫语句编码方式,对校验失败的请求直接返回业务失败,这种fast-fail思想对系统鲁棒性的提高大有裨益。
我们展开看下下单前置参数校验方法 checkParamsBeforeSecKillCharge
// 入参校验
if (chargeOrderRequest == null) {
LOGGER.info("sessionId={},下单请求参数chargeOrderRequest为空,返回下单失败", sessionId);
return false;
}
LOGGER.info("sessionId={},下单开始,下单请求参数chargeOrderRequest=[{}].", sessionId, JSON.toJSONString(chargeOrderRequest));
String userPhoneNum = chargeOrderRequest.getUserPhoneNum();
String chargePrice = chargeOrderRequest.getChargePrice();
String prodId = chargeOrderRequest.getProdId();
if (StringUtils.isBlank(prodId) || StringUtils.isBlank(chargePrice) || StringUtils.isBlank(userPhoneNum)) {
LOGGER.info("sessionId={},下单必要参数为空,返回下单失败", sessionId);
return false;
}
// 价格合法性校验 是否>0?
BigDecimal chargePriceDecimal = new BigDecimal(chargePrice);
if (chargePriceDecimal.longValue() < 0) {
LOGGER.info("sessionId={},商品交易金额小于0,价格非法,返回下单失败", sessionId);
return false;
}
return true;
可以看到主要对下单参数是否非空,商品价格是否非负数进行了校验。
这些代码虽然看起来很简单,却是不可或缺的,我们要尽量保证进入业务流转过程的请求是合法且无害的。这对保护宝贵的持久层资源、提高系统容错性是非常有必要的。
【PS】我个人是比较推崇通过卫语句进行防御式编程的,这种风格的业务代码看起来清晰直观,而且能够减少分支代码嵌套层级,虽然看起来如同“面条”一般,但确实是一种容易操作维护的实战代码风格。
秒杀订单消息协议封装
当秒杀请求通过了参数、库存、商品等前置业务校验,就来到了下单环节。
下单环节通过消息队列(我们方案采用的是RocketMQ)进行承接。
之所以使用RocektMQ是因为它单机就支持亿级别消息堆积,在“削峰填谷”的高并发应用场景表现优异。本身支持发布订阅模式的异步通信,天然支持 “生产者-消费者” 模型;而且它上手难度很低,又是Java语言开发的,对Java系工程师很友好。
言归正传,我们看下面这行代码
// 秒杀订单入队
return secKillChargeService.secKillOrderEnqueue(chargeOrderRequest, sessionId);
这里通过调用secKillOrderEnqueue方法,对秒杀请求进行处理,进行消息实体转换并投递到RocketMQ中。
public Result secKillOrderEnqueue(ChargeOrderRequest chargeOrderRequest, String sessionId) {
// 订单号生成,组装秒杀订单消息协议
String orderId = UUID.randomUUID().toString();
String phoneNo = chargeOrderRequest.getUserPhoneNum();
ChargeOrderMsgProtocol msgProtocol = new ChargeOrderMsgProtocol();
msgProtocol.setUserPhoneNo(phoneNo)
.setProdId(chargeOrderRequest.getProdId())
.setChargeMoney(chargeOrderRequest.getChargePrice())
.setOrderId(orderId);
String msgBody = msgProtocol.encode();
LOGGER.info("秒杀订单入队,消息协议={}", msgBody);
DefaultMQProducer mqProducer = secKillChargeOrderProducer.getProducer();
Message message = new Message(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), msgBody.getBytes());
try {
SendResult sendResult = mqProducer.send(message);
if (sendResult == null) {
LOGGER.error("sessionId={},秒杀订单消息投递失败,下单失败.msgBody={},sendResult=null", sessionId, msgBody);
return Result.error(CodeMsg.BIZ_ERROR);
}
ChargeOrderResponse chargeOrderResponse = new ChargeOrderResponse();
BeanUtils.copyProperties(msgProtocol, chargeOrderResponse);
LOGGER.info("sessionId={},秒杀订单消息投递成功,订单入队.出参chargeOrderResponse={},sendResult={}", sessionId, chargeOrderResponse.toString(), JSON.toJSONString(sendResult));
return Result.success(CodeMsg.ORDER_INLINE, chargeOrderResponse);
} catch (Exception e) {
int sendRetryTimes = mqProducer.getRetryTimesWhenSendFailed();
LOGGER.error("sessionId={},sendRetryTimes={},秒杀订单消息投递异常,下单失败.msgBody={},e={}", sessionId, sendRetryTimes, msgBody, LogExceptionWapper.getStackTrace(e));
}
return Result.error(CodeMsg.BIZ_ERROR);
}
这段代码是secKillOrderEnqueue即秒杀订单的核心逻辑,我们通过解析秒杀下单实体ChargeOrderRequest,将其组装为ChargeOrderMsgProtocol并序列化为Message实体后调用DefaultMQProducer.send()方法,投递消息到RocketMQ的broker。
发送逻辑按住不表,这里重点分析下消息协议封装实体ChargeOrderMsgProtocol的实现:
ChargeOrderMsgProtocol.java
直接上代码:
public final class ChargeOrderMsgProtocol extends BaseMsg implements Serializable {
private static final long serialVersionUID = 73717163386598209L;
/**订单号*/
private String orderId;
/**用户下单手机号*/
private String userPhoneNo;
/**商品id*/
private String prodId;
/**用户交易金额*/
private String chargeMoney;
private Map<String, String> header;
private Map<String, String> body;
@Override
public String encode() {
// 组装消息协议头
ImmutableMap.Builder headerBuilder = new ImmutableMap.Builder<String, String>()
.put("version", this.getVersion())
.put("topicName", MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic());
header = headerBuilder.build();
body = new ImmutableMap.Builder<String, String>()
.put("orderId", this.getOrderId())
.put("userPhoneNo", this.getUserPhoneNo())
.put("prodId", this.getProdId())
.put("chargeMoney", this.getChargeMoney())
.build();
ImmutableMap<String, Object> map = new ImmutableMap.Builder<String, Object>()
.put("header", header)
.put("body", body)
.build();
// 返回序列化消息Json串
String ret_string = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
ret_string = objectMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new RuntimeException("ChargeOrderMsgProtocol消息序列化json异常", e);
}
return ret_string;
}
@Override
public void decode(String msg) {
Preconditions.checkNotNull(msg);
ObjectMapper mapper = new ObjectMapper();
try {
JsonNode root = mapper.readTree(msg);
// header
this.setVersion(root.get("header").get("version").asText());
this.setTopicName(root.get("header").get("topicName").asText());
// body
this.setOrderId(root.get("body").get("orderId").asText());
this.setUserPhoneNo(root.get("body").get("userPhoneNo").asText());
this.setChargeMoney(root.get("body").get("chargeMoney").asText());
this.setProdId(root.get("body").get("prodId").asText());
} catch (IOException e) {
throw new RuntimeException("ChargeOrderMsgProtocol消息反序列化异常", e);
}
}
...省略get set...
这里通过实现BaseMsg的模板方法encode、decode(分别表示对消息进行编码、解码),通过对this对象进行属性设置,实现了消息协议的自编解码。
【注意】 这里使用了guava的 ImmutableMap ,消息协议一旦初始化完成,后续就不能被更改,保证消息协议的不变性和安全性。
协议类一旦编写测试完成就不能再修改,标注为final表示不允许被继承且不可修改。
小结
本文主要对高并发秒杀场景下的网关设计的重点逻辑进行了讲解,并对资源预加载手段、预校验代码编写套路以及消息协议的封装模板代码实现进行了较为详细的解释。
希望能够对读者朋友开发类似系统有所帮助。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。