文章目录
  1. 1. 调用方式1:自行实现execute方法
  2. 2. 调用方式2:executeOneway方式
  3. 3. springboot调度实战
  4. 4. 小结

考虑到部分业务场景下没有使用RocketMQ,因此我在之前的基础上添加了对单机模式调度的封装。

本文就单机模式下,不依赖RocketMQ如何使用shield-job进行定时任务调度展开讲解。

目前有两种主要的调用方式,客户端实现抽象调度方法 execute ,或者直接使用内置的 executeOneway 方法进行调用,接下来分别介绍两种方式。

调用方式1:自行实现execute方法

首先定义单机调度处理器,继承抽象类AbstractJobScheduleStandaloneHandler,需要实现execute()方法。

public class MyStandaloneScheduleHandler extends AbstractJobScheduleStandaloneHandler {

        @Override
        public void execute() {
            // 生产
            List<String> list = super.produce(new JobScheduleProducerListener<String>() {
                @Override
                public List<String> produce(Object arg) {
                    final List<String> strings2 = new ArrayList<>(10);
                    for (int i = 0; i < 10; i++) {
                        String str = "snowalker-----" + i;
                        strings2.add(str);
                    }
                    return strings2;
                }
            }, null);

            // 消费
            super.consume(new JobScheduleConsumerListener<String>() {
                @Override
                public Object consume(String s) {
                    LOGGER.info("consuming start!!!! s = " + s);
                    return null;
                }
            }, list);
        }
    }

在实现类的execute()中,分别调用produce和consume方法,实现生产和消费逻辑。

其中,在 produce(JobScheduleProducerListener jobScheduleStandaloneListener, Object arg) 方法中实现生产逻辑,客户端需要传入JobScheduleProducerListener的实例,调用方便的情况下可以直接使用匿名内部类或者lambda表达式,将需要进行调度的列表生产出来,具体的生产过程业务方需要关心。

生产完成后,将列表返回,并传入 consume(JobScheduleConsumerListener jobScheduleConsumerListener, List list) 方法的第二个参数中,第一个参数为JobScheduleConsumerListener的实例,客户端需要实现消费逻辑,这里的逻辑是单个元素的消费逻辑,不需要再进行迭代了,这样设计的目的是保证集合和单个实体的消费逻辑的统一性。

写个测试用例测一下,日志输入如下。

13:46:56.528 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----0
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----1
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----2
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----3
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----4
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----5
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----6
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----7
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----8
13:46:56.532 [main] INFO COMMON-APPENDER - consuming start!!!! s = snowalker-----9

调用方式2:executeOneway方式

如果业务逻辑简单,推荐使用executeOneway方式,自描述性更强,调用方式如下

首先定义业务的调度实现类,execute()使用空实现即可。

public class MyStandaloneScheduleHandler2 extends AbstractJobScheduleStandaloneHandler {

    @Override
    public void execute() {
    }
}

调用逻辑如下:

MyStandaloneScheduleHandler scheduleHandler2 = new MyStandaloneScheduleHandler();
// ②
scheduleHandler2.executeOneway(new JobScheduleProducerListener<String>() {
    @Override
    public List<String> produce(Object arg) {
        final List<String> strings2 = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            String str = "executeOneway---snowalker-----" + i;
            strings2.add(str);
        }
        return strings2;
    }
}, new JobScheduleConsumerListener<String>() {
    @Override
    public Object consume(String s) {
        LOGGER.info("executeOneway---consuming start!!!! s = " + s);
        return null;
    }
});

调用MyStandaloneScheduleHandler实例的executeOneway方法,将生产监听器JobScheduleProducerListener实例,消费监听器JobScheduleConsumerListener实例传入。对②处代码的理解就很直观:通过executeOneway发起业务调度,将JobScheduleProducerListener生产的列表在JobScheduleConsumerListener中进行consume,具体的迭代同样封装在了AbstractJobScheduleStandaloneHandler内部。

springboot调度实战

上文中讲解了如何调用单机模式的shield-job,这里通过springboot的定时任务写一个实例,增强理解。

demo基于 自己写分布式调度组件shield-job之使用shield-job 中的代码。

代码如下:

@Component
public class OrderInfoJobProducerStandalone extends AbstractJobScheduleStandaloneHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderInfoJobProducerStandalone.class);

    private JobProducerExecutor jobProducerExecutor;

    @PostConstruct
    public void init() {
        LOGGER.info("单机模式调度执行初始化......");
    }

    @Scheduled(cron = "${order.standalone.cron}")
    public void execute() {
        executeOneway(new JobScheduleProducerListener<String>() {
            @Override
            public List<String> produce(Object arg) {
                LOGGER.info("单机模式作业生产开始...");
                final List<String> strings2 = new ArrayList<>(10);
                for (int i = 0; i < 10; i++) {
                    String str = "executeOneway---snowalker-----" + i;
                    strings2.add(str);
                }
                LOGGER.info("单机模式作业生产结束...");
                return strings2;
            }
        }, new JobScheduleConsumerListener<String>() {
            @Override
            public Object consume(String s) {
                LOGGER.info("executeOneway---consuming start!!!! s = " + s);
                return null;
            }
        });
    }
}

配置的调度cron表达式为:0/3 ?

从0s开始,每隔扫描运行一次。

运行程序日志打印如下:

2019-04-16 14:09:24.001 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [40] 
    - 单机模式作业生产开始...
2019-04-16 14:09:24.001 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [46] 
    - 单机模式作业生产结束...
2019-04-16 14:09:24.001 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----0
2019-04-16 14:09:24.001 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----1
2019-04-16 14:09:24.001 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----2
2019-04-16 14:09:24.001 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52]
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----3
2019-04-16 14:09:24.002 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----4
2019-04-16 14:09:24.002 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----5
2019-04-16 14:09:24.002 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----6
2019-04-16 14:09:24.002 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----7
2019-04-16 14:09:24.002 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----8
2019-04-16 14:09:24.002 [scheduling-1] INFO  c.s.s.j.p.s.OrderInfoJobProducerStandalone [52] 
    - executeOneway---consuming start!!!! s = executeOneway---snowalker-----9

小结

到此就完成了对纯单机模式下调度的抽象和封装的讲解,这种方式对于没有RocketMQ运维能力的项目来讲比较适用。

文章目录
  1. 1. 调用方式1:自行实现execute方法
  2. 2. 调用方式2:executeOneway方式
  3. 3. springboot调度实战
  4. 4. 小结
Fork me on GitHub