自己写分布式调度组件shield-job之使用单机模式
考虑到部分业务场景下没有使用RocketMQ,因此我在之前的基础上添加了对单机模式调度的封装。
本文就单机模式下,不依赖RocketMQ如何使用shield-job进行定时任务调度展开讲解。
目前有两种主要的调用方式,客户端实现抽象调度方法 execute ,或者直接使用内置的 executeOneway 方法进行调用,接下来分别介绍两种方式。
调用方式1:自行实现execute方法
首先定义单机调度处理器,继承抽象类AbstractJobScheduleStandaloneHandler,需要实现execute()方法。
public class MyStandaloneScheduleHandler extends AbstractJobScheduleStandaloneHandler {
@Override
public void execute() {
// 生产
List<String> list = super.produce(new JobScheduleProducerListener<String>() {
@Override
public List<String> produce(Object arg) {
final List<String> strings2 = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
String str = "snowalker-----" + i;
strings2.add(str);
}
return strings2;
}
}, null);
// 消费
super.consume(new JobScheduleConsumerListener<String>() {
@Override
public Object consume(String s) {
LOGGER.info("consuming start!!!! s = " + s);
return null;
}
}, list);
}
}
在实现类的execute()中,分别调用produce和consume方法,实现生产和消费逻辑。
其中,在 produce(JobScheduleProducerListener
生产完成后,将列表返回,并传入 consume(JobScheduleConsumerListener
写个测试用例测一下,日志输入如下。
13:46:56.528 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----0
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----1
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----2
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----3
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----4
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----5
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----6
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----7
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----8
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----9
调用方式2:executeOneway方式
如果业务逻辑简单,推荐使用executeOneway方式,自描述性更强,调用方式如下
首先定义业务的调度实现类,execute()使用空实现即可。
public class MyStandaloneScheduleHandler2 extends AbstractJobScheduleStandaloneHandler {
@Override
public void execute() {
}
}
调用逻辑如下:
MyStandaloneScheduleHandler scheduleHandler2 = new MyStandaloneScheduleHandler();
// ②
scheduleHandler2.executeOneway(new JobScheduleProducerListener<String>() {
@Override
public List<String> produce(Object arg) {
final List<String> strings2 = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
String str = "executeOneway---snowalker-----" + i;
strings2.add(str);
}
return strings2;
}
}, new JobScheduleConsumerListener<String>() {
@Override
public Object consume(String s) {
LOGGER.info("executeOneway---consuming start!!!! s = " + s);
return null;
}
});
调用MyStandaloneScheduleHandler实例的executeOneway方法,将生产监听器JobScheduleProducerListener实例,消费监听器JobScheduleConsumerListener实例传入。对②处代码的理解就很直观:通过executeOneway发起业务调度,将JobScheduleProducerListener生产的列表在JobScheduleConsumerListener中进行consume,具体的迭代同样封装在了AbstractJobScheduleStandaloneHandler内部。
springboot调度实战
上文中讲解了如何调用单机模式的shield-job,这里通过springboot的定时任务写一个实例,增强理解。
demo基于 自己写分布式调度组件shield-job之使用shield-job 中的代码。
代码如下:
@Component
public class OrderInfoJobProducerStandalone extends AbstractJobScheduleStandaloneHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderInfoJobProducerStandalone.class);
private JobProducerExecutor jobProducerExecutor;
@PostConstruct
public void init() {
LOGGER.info("单机模式调度执行初始化......");
}
@Scheduled(cron = "${order.standalone.cron}")
public void execute() {
executeOneway(new JobScheduleProducerListener<String>() {
@Override
public List<String> produce(Object arg) {
LOGGER.info("单机模式作业生产开始...");
final List<String> strings2 = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
String str = "executeOneway---snowalker-----" + i;
strings2.add(str);
}
LOGGER.info("单机模式作业生产结束...");
return strings2;
}
}, new JobScheduleConsumerListener<String>() {
@Override
public Object consume(String s) {
LOGGER.info("executeOneway---consuming start!!!! s = " + s);
return null;
}
});
}
}
配置的调度cron表达式为:0/3 ?
从0s开始,每隔扫描运行一次。
运行程序日志打印如下:
2019-04-16 14:09:24.001 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [40]
- 单机模式作业生产开始...
2019-04-16 14:09:24.001 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [46]
- 单机模式作业生产结束...
2019-04-16 14:09:24.001 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----0
2019-04-16 14:09:24.001 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----1
2019-04-16 14:09:24.001 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----2
2019-04-16 14:09:24.001 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----3
2019-04-16 14:09:24.002 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----4
2019-04-16 14:09:24.002 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----5
2019-04-16 14:09:24.002 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----6
2019-04-16 14:09:24.002 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----7
2019-04-16 14:09:24.002 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----8
2019-04-16 14:09:24.002 [scheduling-1] INFO c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
- executeOneway---consuming start!!!! s = executeOneway---snowalker-----9
小结
到此就完成了对纯单机模式下调度的抽象和封装的讲解,这种方式对于没有RocketMQ运维能力的项目来讲比较适用。