文章目录
  1. 1. 客户端接口
    1. 1.1. 客户端实体–任务实体BaseJob
    2. 1.2. 客户端接口-任务生产
      1. 1.2.1. JobProducerExecutor
      2. 1.2.2. JobProducerListener
    3. 1.3. 客户端接口–任务消费
      1. 1.3.1. JobConsumerExecutor
      2. 1.3.2. JobConsumerListenerAdapter
      3. 1.3.3. JobConsumerListener
  2. 2. 调用实例
    1. 2.1. 项目结构
    2. 2.2. 引入shield-job核心依赖,配置RocketMQ
    3. 2.3. 调度实例–任务生产者的开发
    4. 2.4. 调度实例–任务消费者的开发
      1. 2.4.1. 注意
    5. 2.5. 调用效果
  3. 3. 小结

自己写分布式调度组件继续更新,目前已经完成了一个里程碑版本。

千呼万唤始出来,却不是当初想的模样。之前立的flag太大,最终决定暂时放弃开发分布式版本,把目标改为基于消息队列RocketMQ的任务分发框架,具体的调度逻辑由调用方自行开发。

客户端接口

首先介绍客户端需要关注的接口以及实体

客户端实体–任务实体BaseJob

任务实体BaseJob为shield-job的调度核心实体,调用方的业务实体需要继承该作业抽象类,实现其中的encode() 以及 decode(String msg) 抽象方法。

其中:

客户端需要在encode()方法中实现业务对象到String形式的消息体转换,如:

@Override
public String encode() {
    // 组装消息协议头
    ImmutableMap.Builder headerBuilder = new ImmutableMap.Builder<String, String>()
            .put("version", this.getVersion())
            .put("jobTopic", this.getJobTopic())
            .put("jobTag", this.getJobTag())
            .put("jobBizDesc", "测试订单协议")
            .put("jobProducerGroup", this.getJobProducerGroup())
            .put("jobConsumerGroup", this.getJobConsumerGroup())
            .put("jobTraceId", this.getJobTraceId());
    header = headerBuilder.build();

    body = new ImmutableMap.Builder<String, String>()
            .put("userId", this.getUserId())
            .put("userName", this.getUserName())
            .put("orderId", this.getOrderId())
            .build();

    ImmutableMap<String, Object> map = new ImmutableMap.Builder<String, Object>()
            .put("header", header)
            .put("body", body)
            .build();
    // 返回序列化消息Json串
    String ret_string = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        ret_string = objectMapper.writeValueAsString(map);
    } catch (JsonProcessingException e) {
        LOGGER.error("消息序列化json异常:", e);
    }
    return ret_string;
}

这里我使用了Guava的ImmutableMap作为消息协议的容器,使用jackson作为Json序列化工具。业务调用方必须将BaseJob的属性逐一填充,否则会在调度过程中抛出参数校验异常。

在decode(String msg)中实现String形式消息体到业务作业对象的转换,如:

@Override
public void decode(String msg) {
    Preconditions.checkNotNull(msg);
    ObjectMapper mapper = new ObjectMapper();
    try {
        JsonNode root = mapper.readTree(msg);
        // header
        this.setVersion(root.get("header").get("version").asText());
        this.setJobTopic(root.get("header").get("jobTopic").asText());
        this.setJobTag(root.get("header").get("jobTag").asText());
        this.setJobBizDesc(root.get("header").get("jobBizDesc").asText());
        this.setJobProducerGroup(root.get("header").get("jobProducerGroup").asText());
        this.setJobConsumerGroup(root.get("header").get("jobConsumerGroup").asText());
        this.setJobTraceId(root.get("header").get("jobTraceId").asText());
        // body
        this.setUserName(root.get("body").get("userName").asText());
        this.setOrderId(root.get("body").get("orderId").asText());
        this.setUserId(root.get("body").get("userId").asText());
    } catch (IOException e) {
        LOGGER.error("反序列化消息异常:", e);
    }
}

这里我将入参消息实体转换为对象本身,通过this对内部的属性进行赋值,从而实现字符串形式的消息体到对象的转换。避免代码过长影响阅读,完整的代码在下文会放出。

客户端接口-任务生产

这部分的内容为任务生产过程中需要调用的类及接口

JobProducerExecutor

JobProducerExecutor是shield-job的任务生产调度核心,是final类,不允许被继承。

类名 方法名 作用
JobProducerExecutor JobProducerExecutor init(RocketMQProducerProperty rocketMQProducerProperty) 初始化任务调度核心类
JobProducerExecutor Result execute(final JobProducerListener jobProducerListener,Object arg) 执行Job生产逻辑,投递Job消息到MQ中。业务方需要实现JobProducerListener的任务生产回调方法
JobProducerExecutor void start() throws MQClientException 启动内置的RocketMQ消息生产者,开启任务投递
JobProducerExecutor DefaultMQProducer getProducer() 获取RocketMQ生产者引用
RocketMQConsumerProperty 构造方法 构造RocketMQ生产者实例配置参数

JobProducerListener

JobProducerListener是作业生产者接口,客户端的作业生产者需要实现该接口,传入需要进行任务调度的作业实体列表(参数泛型继承BaseJob)

类名 方法名 作用
JobProducerListener List produce(final Object arg) 作业调度生产者返回待调度的业务实体协议列表,shield-job会回调该方法并将业务协议列表投递到MQ供作业消费者进行消费

对于非BaseJob的子类,编译阶段即失败。

客户端接口–任务消费

这部分为任务调度消费过程涉及到的类及接口

JobConsumerExecutor

JobConsumerExecutor是shield-job的任务消费调度核心,是final类,不允许被继承。

类名 方法名 作用
JobConsumerExecutor JobConsumerExecutor execute(RocketMQConsumerProperty rocketMQConsumerProperty,JobConsumerListenerAdapter jobConsumerListenerAdapter) 初始化任务消费核心,执行任务消费调度过程
JobConsumerExecutor void start() throws MQClientException 启动内置的RocketMQ消息消费者,开启任务消费
RocketMQProducerProperty 构造方法 构造RocketMQ消费者实例配置参数

JobConsumerListenerAdapter

JobConsumerListenerAdapter是shield-job的任务消费监听适配器,通过调用不同的构造方法决定是否进行消费失败消息的重投递

类名 方法名 作用
JobConsumerListenerAdapter JobConsumerListenerAdapter(JobConsumerListener jobConsumerListener) 任务消费适配器默认构造,只进行消费不进行消息重发
JobConsumerListenerAdapter JobConsumerListenerAdapter(JobConsumerListener jobConsumerListener,JobScheduleExecutorConfig jobScheduleExecutorConfig) 任务消费适配器带重发构造,既进行消费又进行消息重发
JobConsumerListenerAdapter ConsumeConcurrentlyStatus consumeMessage(List msgs,ConsumeConcurrentlyContext context) 任务消费核心接口,客户端消费者调用该方法,shield-job根据构造传参自行决定是直接消费还是需要进行消息重发

JobConsumerListener

JobConsumerListener是任务消费监听器接口,消费者需要实现consumeMessage方法,完成消息的消费

类名 方法名 作用
JobConsumerListener ConsumeConcurrentlyStatus consumeMessage(final List msgs, final ConsumeConcurrentlyContext context) 消息消费,该方法为标准的RocektMQ消费接口

调用实例

上述列出的类及其方法即shield-job任务调度的核心接口,直接看确实不直观,这里我用一个实例去讲解下如何在Spring Boot 2.1.2 Release中整合shield-job实现发布订阅模式的任务调度。

项目结构

项目结构如下

shield-job-springboot-demo

引入shield-job核心依赖,配置RocketMQ

首先引入shield-job核心的坐标,需要自行打包并安装到本地,有条件的可以运行 mvn clean deploy -DskipTests 直接发布到私服中。

坐标如下:

<dependency>
    <artifactId>shield-job-scheduler-core</artifactId>
    <groupId>com.snowalker.shield.job</groupId>
    <version>1.0.0</version>
</dependency>

由于shield-job核心通过RocketMQ作为任务队列,因此需要在调用方的配置文件中配置RocketMQ的NameServer地址,例如:

########################################################################
#
#     RocketMQ配置
#
#########################################################################
rocketmq.nameServer=192.168.1.107:9876

调度实例–任务生产者的开发

首先进行业务层任务生产者的开发。

定义一个名为OrderInfoJobProducer的类,并标记为Spring的bean

@Component
public class OrderInfoJobProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderInfoJobProducer.class);

定义任务发布的MQ的Topic、业务Tag(无tag可以不写)、生产者组、消费者组等参数

/**
* 测试订单任务TOPIC
*/
private static final String TOPIC = "SNOWALKER_TEST_SHIELD_JOB_TOPIC";
/**
* 测试订单任务TAG
*/
private static final String TAG = "SNOWALKER_TEST_SHIELD_JOB_TAG";
/**
* 测试订单生产者组
*/
private static final String PRODUCER_GROUP = "PID_SNOWALKER_TEST_SHIELD_JOB";
/**
* 测试订单消费者组
*/
private static final String CONSUMER_GROUP = "CID_SNOWALKER_TEST_SHIELD_JOB";

读入配置文件中的RocketMQ的NameServer地址

@Value("${rocketmq.nameServer}")
private String nameSrvAddr;

声明shield-job的任务生产者执行器JobProducerExecutor

private JobProducerExecutor jobProducerExecutor;

通过@PostConstruct方式初始化JobProducerExecutor并开启运行内部的生产者

@PostConstruct
public void init() throws MQClientException {
    // 实例化作业生产调度器
    jobProducerExecutor = new JobProducerExecutor()
            .init(new RocketMQProducerProperty(PRODUCER_GROUP,nameSrvAddr));
    jobProducerExecutor.getProducer().start();
}

核心调度逻辑,这里可以选用任意的调度框架,此处为spring-scheduler方式,通过 @Scheduled 注解声明此处为定时任务调度核心,并引入配置文件中的cron表达式,示例中的cron表达式为 0/3 ? 表示从0S开始,每次调度之间间隔三秒

@Scheduled(cron = "${order.resend.cron}")
public void execute() {
    try {
        // 传入JobProducerListener实现类,返回作业实体
        Result<JobSendResult> jobSendResult = jobProducerExecutor.execute(

此处通过匿名内部类的方式实现了JobProducerListener的回调方法 produce(Object arg) 将业务层拼装的业务订单调度协议(OrderInfoJobProcotol继承BaseJob)封装在 List 中,供JobProducerExecutor进行回调,提交任务至RocketMQ中。

(new JobProducerListener() {
    @Override
    public List produce(Object arg) {
        List<OrderInfoJobProcotol> jobs = new ArrayList<>(10);
        for (int i = 0; i < 1; i++) {
            OrderInfoJobProcotol orderInfoJobProcotol = new OrderInfoJobProcotol();
            orderInfoJobProcotol.setOrderId("OD_" + UUID.randomUUID().toString())
                    .setUserId("SNOWALKER_" + UUID.randomUUID().toString())
                    .setUserName("SNOWALKER_" + i)
                    .setJobTraceId("TRACE_" + UUID.randomUUID().toString())
                    .setJobTopic(TOPIC)
                    .setJobTag(TAG)
                    .setJobProducerGroup(PRODUCER_GROUP)
                    .setJobConsumerGroup(CONSUMER_GROUP)
            ;
            jobs.add(orderInfoJobProcotol);
        }
        return jobs;
    }
}), null);

这里可以根据返回的 Result 对返回的任务提交实体进行更多的操作。JobSendResult中包含了每一次调度中提交成功的任务列表 sendSuccessJobList 以及 提交失败的任务列表 sendFailureJobList,业务层可以解析这两个列表,进行诸如重发、落库等更多的业务操作。

        if (jobSendResult == null) {
            LOGGER.warn("执行作业分发失败,返回为空,topic={}", TOPIC);
        }
        if (jobSendResult.isSuccess()) {
            LOGGER.info("执行作业分发成功,jobSendResult={}", jobSendResult.toString());
        }
    } catch (Exception e) {
        LOGGER.error("执行作业分发异常", e);
    }
}

}

到此就完成了业务层的任务生产者的开发。

调度实例–任务消费者的开发

接着进行业务层任务消费者的开发。

首先定义一个业务任务消费者类OrderInfoJobConsumer,标记为Spring的bean。

@Component
public class OrderInfoJobConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderInfoJobConsumer.class);

声明需要进行订阅的topic、tag(默认为*)、消费者组

/**测试订单任务TOPIC*/
private static final String TOPIC = "SNOWALKER_TEST_SHIELD_JOB_TOPIC";
/**测试订单任务TAG*/
private static final String TAG = "SNOWALKER_TEST_SHIELD_JOB_TAG";
/**测试订单生产者组*/
private static final String CONSUMER_GROUP = "CID_SNOWALKER_TEST_SHIELD_JOB";

引入配置文件中配置的RocketMQ的nameServer地址

@Value("${rocketmq.nameServer}")
private String nameSrvAddr;

如果需要在消费失败后,进行消息重发,那么需要引入RedisTemplate,不引入则表示不需要框架做消息重发,业务层自行处理消费失败的逻辑。

@Autowired
RedisTemplate redisTemplate;

通过@PostConstruct方式启动任务消费者实例,并实现任务消费逻辑,此处为默认不需要框架进行消费失败后重发job消息的业务逻辑书写方式

@PostConstruct
public void execute() throws Exception {

    // 不需要重发
    new JobConsumerExecutor().execute(
            new RocketMQConsumerProperty(
                    TOPIC, CONSUMER_GROUP, nameSrvAddr, TAG),
                        new JobConsumerListenerAdapter(new JobConsumerListener() {
                            @Override
                            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                                try {
                                    String msgId = "";
                                    String msgBody = "";
                                    // 默认msgs只有一条消息
                                    for (MessageExt msg : msgs) {
                                        String message = new String(msg.getBody());
                                        OrderInfoJobProcotol protocol = new OrderInfoJobProcotol();
                                        protocol.decode(message);
                                        // 解析打印实体
                                        msgId = msg.getMsgId();
                                        msgBody = message;
                                    }
                                    LOGGER.info("模拟订单Job消息消费逻辑结束,状态--[RECONSUME_LATER]--msgId={}", msgId);

                                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                                } catch (Exception e) {
                                    LOGGER.error("钱包扣款消费异常,e={}", e);
                                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                                }
                            }
                        })).start();
}

可以看到,我们需要做的事情为:

  1. 实例化一个任务消费执行器JobConsumerExecutor
  2. 调用execute方法,传入RocketMQConsumerProperty实例
  3. execute的第二个参数为JobConsumerListenerAdapter实例,它接受一个参数JobConsumerListener,这里我们需要实现JobConsumerListener接口,并将具体的实现的引用传入JobConsumerListenerAdapter构造方法
  4. 实现consumeMessage方法,将消费状态ConsumeConcurrentlyStatus返回。shield-job会代理真实的RocketMQ的MessageListenerConcurrently接口,将消费状态返给RocketMQ,效果同直接使用RocketMQ的DefaultMQPushConsumer进行消费相同。

如果我们使用JobConsumerListenerAdapter的另外一个构造方法,JobConsumerListenerAdapter(JobConsumerListener jobConsumerListener,JobScheduleExecutorConfig jobScheduleExecutorConfig) ,该构造方法表示需要对消费失败的消息进行消息重发,那么shield-job会对消费状态ConsumeConcurrentlyStatus进行解析,并对达到重试阈值且重发次数小于3的消息进行消息存储并开启异步线程进行重投递,具体代码如下:

@PostConstruct
public void execute1() throws Exception {

    JobScheduleExecutorConfig jobScheduleExecutorConfig =
            new JobScheduleExecutorConfig(1,
                    nameSrvAddr,
                    new MessageStoreRedisTemplate(redisTemplate),
                    Executors.newScheduledThreadPool(10),
                    0,
                    3,
                    TimeUnit.SECONDS);

    // 需要消息重发
    new JobConsumerExecutor().execute(
            new RocketMQConsumerProperty(
                    TOPIC, CONSUMER_GROUP, nameSrvAddr, TAG),
            new JobConsumerListenerAdapter(new JobConsumerListener() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    return getConsumeConcurrentlyStatus(msgs);
                }
            }, jobScheduleExecutorConfig)).start();
}

可以看到,我们需要构造并实例化任务重发配置JobScheduleExecutorConfig,并将其传入JobConsumerListenerAdapter的构造方法中。

注意

目前该功能还处于进一步测试阶段,因此建议还是使用 JobConsumerListenerAdapter(JobConsumerListener jobConsumerListener) 进行JobConsumerListenerAdapter的实例化。

调用效果

到这里,我们就完成了业务模块对shield-job的整合及调度任务的开发,运行一下程序看一下执行效果。

首先保证业务系统到RocektMQ的连通,执行启动方法,查看到日志如下:

2019-04-15 14:16:11.188 [main] INFO  c.s.shield.job.consumer.JobConsumerExecutor [45]
- JobConsumerExecuter init successfully, topic=SNOWALKER_TEST_SHIELD_JOB_TOPIC, consumerGroup=CID_SNOWALKER_TEST_SHIELD_JOB
2019-04-15 14:16:13.794 [main] INFO  c.s.shield.job.producer.JobProducerExecutor [56] 
- JobProducerExecutor init successfully, jobProducerGroup=PID_SNOWALKER_TEST_SHIELD_JOB
......
2019-04-15 14:16:14.406 [main] INFO  com.snowalker.shield.jobdemo.App [59] 
- Started App in 6.482 seconds (JVM running for 7.747)
2019-04-15 14:16:14.407 [main] INFO  com.snowalker.shield.jobdemo.App [26] 
- shield-job-server启动完成......
2019-04-15 14:16:15.577 [scheduling-1] DEBUG c.s.shield.job.producer.JobProducerExecutor [100] 
- send job message successful, message sendStatus=SEND_OK, msgId=AC1E534751682437C6DC4B28F5D30000, queueOffset=142, transactionId=null, offsetMsgId=AC1E423300002A9F000000000059D06E, regionId=DefaultRegion, sendSuccessJobList.size()=0
2019-04-15 14:16:15.578 [scheduling-1] INFO  c.s.shield.job.producer.JobProducerExecutor [121] 
- send job messages finished, sendSuccessJobList messages size=1, sendFailureJobList messages size=0, topic=SNOWALKER_TEST_SHIELD_JOB_TOPIC, tag=SNOWALKER_TEST_SHIELD_JOB_TAG
2019-04-15 14:16:15.578 [scheduling-1] INFO  c.s.shield.jobdemo.producer.OrderInfoJobProducer [93] 
- 执行作业分发成功,jobSendResult=Result{code=200, message='SUCCESS', content=JobSendResult{sendSuccessJobList=[BaseJob{version='1.0', jobTopic='SNOWALKER_TEST_SHIELD_JOB_TOPIC', jobTag='SNOWALKER_TEST_SHIELD_JOB_TAG', jobBizDesc='null', jobProducerGroup='PID_SNOWALKER_TEST_SHIELD_JOB', jobConsumerGroup='CID_SNOWALKER_TEST_SHIELD_JOB', jobTraceId='TRACE_cbdb715e-173b-4adc-b0ed-1a1a1aea689c', jobMsgId='AC1E534751682437C6DC4B28F5D30000', params=null}], sendFailureJobList=[]}}
2019-04-15 14:16:16.454 [ConsumeMessageThread_8] INFO  c.s.shield.jobdemo.consumer.OrderInfoJobConsumer [100] 
- 模拟订单Job消息消费逻辑结束,状态--[RECONSUME_LATER]--msgId=AC1E534751682437C6DC4B28F5D30000

由于我们的调度生产者和消费者在同一个应用内,因此属于 “自产自销” 的模式,模拟的订单作业消息投递到MQ之后,下发至当前应用的消费者,被消费者处理。

返回RECONSUME_LATER是笔者为了测试重发,在实际业务中,当作业消费成功,直接返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 即可。

小结

到此,我们就完成了使用shield-job实现业务上的任务调度逻辑的整合过程。代码地址为: shield-job

后续的文章中,我将对shield-job的实现方式抽丝剥茧,将我在开发过程中的感悟、思考以及一些编码技巧进行进一步的分享。

昨夜西风凋碧树。独上高楼,望尽天涯路。

文章目录
  1. 1. 客户端接口
    1. 1.1. 客户端实体–任务实体BaseJob
    2. 1.2. 客户端接口-任务生产
      1. 1.2.1. JobProducerExecutor
      2. 1.2.2. JobProducerListener
    3. 1.3. 客户端接口–任务消费
      1. 1.3.1. JobConsumerExecutor
      2. 1.3.2. JobConsumerListenerAdapter
      3. 1.3.3. JobConsumerListener
  2. 2. 调用实例
    1. 2.1. 项目结构
    2. 2.2. 引入shield-job核心依赖,配置RocketMQ
    3. 2.3. 调度实例–任务生产者的开发
    4. 2.4. 调度实例–任务消费者的开发
      1. 2.4.1. 注意
    5. 2.5. 调用效果
  3. 3. 小结
Fork me on GitHub