文章目录
  1. 1. 聊聊Callable
  2. 2. 聊聊Future
    1. 2.1. 趁热乎,再看看FutureTask
  3. 3. 如何在实战中应用异步任务并发执行?
    1. 3.1. 首先需要一个异步线程池
    2. 3.2. 定义准备工作(清洗茶壶、清洗茶杯、准备茶叶)Callable实现类
    3. 3.3. 定义烧开水Callable实现类
    4. 3.4. 通过线程池+callable+future执行异步任务
    5. 3.5. 思考:如果并行任务执行的时间不一样长会怎样?
    6. 3.6. 思考:多任务并发提交,如何提高运行效率?
  4. 4. 思考:如果存在大量相同任务,如何使用并发任务提交来提升执行效率?
    1. 4.1. 场景定义:多人同时洗杯子
      1. 4.1.1. 定义洗杯子Callable实现类
      2. 4.1.2. 编写测试代码
      3. 4.1.3. 多人同时洗杯子运行结果
        1. 4.1.3.1. 单个工人串行清洗杯子
        2. 4.1.3.2. 多个工人并发清洗杯子
    2. 4.2. 注意:一种错误的使用方式
  5. 5. 反思及总结
  6. 6. 下期预告

分布式系统开发过程中,我们常常会遇到同时访问多个远程接口,然后在自己的业务逻辑内聚合多个接口的调用结果。这个过程,类比到现实中很像“烧水泡茶”。

在正式讲技术实现之前,我们先以“烧水泡茶”为例,讨论一下并发任务执行在现实中是如何进行的。

烧水泡茶,往往分为如下几步:

  1. 清洗热水壶 (3min)
  2. 烧开水 (8min)
  3. 清洗茶壶 (2min)
  4. 清洗茶杯 (5min)
  5. 准备茶叶 (1min)
  6. 泡茶 (3min)

这个例子,源自著名数学家华罗庚的《统筹方法》一文,先生试图通过这样一个例子,为我们讲解如何进行统筹,组合不同的步骤,能够在最短的时间内喝到茶水。

那么我们就试着分析一下这个案例,如果我们不加优化,直接通过串行方式进行泡茶工序,那么具体的执行路径就如下图所示:

sync-tea.png

也就是说,我们进行“烧水泡茶”的过程是完全串行操作的,假设每个步骤之间是无缝的,那么最终花费的时间 T0 = 3 + 8 + 2 + 5 + 1 + 3 = 22min

我们发现,烧开水的时间很长,在这个时间内完全可以去做别的事情,也就是烧开水的过程可以与其他的步骤并行执行。因为必须要把热水壶洗干净才能烧开水,毕竟卫生是第一位的嘛。也就是说第一步“清洗热水壶”需要单独进行。我们尝试对上面的串行流程稍作统筹优化,得到优化后的泡茶流程。

async-tea.png

我们发现,烧开水的时间 (8min)= 清洗茶壶 (2min) + 清洗茶杯 (5min) + 准备茶叶 (1min),也就是在等待水烧开的空档里,我们可以按顺序去执行剩余的准备步骤,最后等茶具和开水都准备到位,就可以开始泡茶,然后享受美好的品茗时光了。

改进后的流程,最终花费的时间 T1 = 3 + 8 + 3 = 14min。相比完全串行的方式,节省了8分钟。毕竟时间就是金钱,能够提前八分钟喝到美味的茗茶,想想也是一件美好的事情。

从这个例子能够看出,合理的安排不同操作步骤的执行顺序和关系,让没有强关联的步骤能够并发执行,尽量减少不同步骤执行的串行程度,能够在一定程度上达到提升业务逻辑执行效率的目的。

回到技术问题上来,在Java的分布式开发领域,我们常常通过Future + Callable来并发执行任务,提高业务操作的并发度。

聊聊Callable

首先聊聊Callable。

Callable位于java.util.concurrent包下,它是一个接口,只声明了一个方法,方法名为call(),简明扼要:

Callable的声明如下:

public interface Callable<V> {
    /**
    * Computes a result, or throws an exception if unable to do so.
    *
    * @return computed result
    * @throws Exception if unable to compute a result
    */
    V call() throws Exception;
}

通过注释和接口声明,可以看出Callable是一个泛型接口,并且call()函数返回的类型就是传递进来的泛型V。

Callable的使用通常需要借助线程池ExecutorService;在ExecutorService中有多个submit的重载方法,其中就有接受Callable实例的方法:

<T> Future<T> submit(Callable<T> task);

聊聊Future

了解了Callable之后,我们再了解一下Future。在上文中,我们已经能够通过ExecutorService的submit重载方法看出端倪,submit方法的返回值为一个Future带泛型T。

Future类同样位于java.util.concurrent(再次夸一下,道格李大叔真的优秀)包下,是一个接口:

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

为了展示方便我去除了注释。

一句话概括Future:我们通过Future能够实现对具体Runnable或者Callable任务的执行结果进行取消(cancel)、查询是否完成(isDone)、获取结果等操作(get)。

当我们需要对一个异步任务获取其返回结果时,可以通过get方法进行操作,get方法支持指定超时时间,当达到超时时间,则会抛出TimeoutException,因此对于超时时间需要谨慎指定,重要的是,该方法会阻塞直到任务返回结果。

我们简单对Future的几个方法的作用进行介绍,方便读者进行理解:

 在Future接口中声明了5个方法,下面依次解释每个方法的作用:

  1. cancel方法用来取消任务;当任务取消成功则返回true,任务取消失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行中且没有执行完毕的任务;

    如果设置true,则表示可以取消正在执行过程中的任务。

    如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;

    如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;

    如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

  2. isCancelled方法表示任务是否被取消成功;如果在任务正常完成前被取消成功,则返回 true;
  3. isDone方法表示任务是否已经完成,若任务完成,则返回true;
  4. get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  5. get(long timeout, TimeUnit unit)用来获取执行结果,它允许指定超时时间;

趁热乎,再看看FutureTask

日常开发中,我们发现还有个类经常出现在我们的视线中,它就是FutureTask。

我们看看FutureTask的声明就知道它具体是干什么用了。

public class FutureTask<V> implements RunnableFuture<V>

我们发现FutureTask实现了接口RunnableFuture,我们再顺着看RunnableFuture声明:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
    * Sets this Future to the result of its computation
    * unless it has been cancelled.
    */
    void run();
}

我们发现,最终RunnableFuture本质上是Runnable与Future的子类,也就是说它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

根据里氏替换原则,可以认为FutureTask同时具备Runnable与Future的特性。

毫不夸张的说,FutureTask是事实上的Future接口的唯一实现类。

我们看一下FutureTask的构造器:

public FutureTask(Callable<V> callable) {
}

public FutureTask(Runnable runnable, V result) {
}

它提供了两个构造器,能够接收Callable or Runnable的实例。

用一句话概括,实战中,我们可以使用FutureTask替代Runnable与Future。

如何在实战中应用异步任务并发执行?

说了这么多,具体应该怎么用呢?

我们还是基于开头提到的“烧水泡茶”例子,用代码方式直观的展示,在实战中,面对多个任务并发执行的场景,应该如何去做。

首先需要一个异步线程池

根据理论部分的分析,我们得知,执行异步任务需要一个线程池,这里我们定义一个ThreadPoolExecutor

private ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            2 * Runtime.getRuntime().availableProcessors() + 1,
            5000,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadPoolExecutor.AbortPolicy()
        );

再看一次并发泡茶的流程图,我们知道,烧开水、清洗茶壶、清洗茶杯、准备茶叶 这几步是能够并发执行的,我们接着定义对应的任务执行Callable实现类。

async-tea.png

定义准备工作(清洗茶壶、清洗茶杯、准备茶叶)Callable实现类

public class PrepareTaskCallableTask implements Callable<List<String>> {

    @Override
    public List<String> call() throws Exception {
        List<String> prepareTasks = new ArrayList<>(3);

        // 清洗茶杯
        String cleanTeapot = cleanTeapot();
        // 清洗茶壶
        String cleanTeacup = cleanTeacup();
        // 准备茶叶
        String prepareTea = prepareTea();

        prepareTasks.add(cleanTeapot);
        prepareTasks.add(cleanTeacup);
        prepareTasks.add(prepareTea);

        return prepareTasks;
    }

    /**
    * 清洗茶壶
    * @return
    */
    public String cleanTeapot() {
        try {
            // 执行清洗茶壶任务
            System.out.println(Thread.currentThread().getName() + "-执行[清洗茶壶]任务开始..........");
            // 清洗茶壶二分钟
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + "-执行[清洗茶壶]任务结束..........");
            return "清洗茶壶共使用2分钟";
        } catch (Exception e) {
            return null;
        }
    }

    /**
    * 清洗茶杯
    * @return
    */
    public String cleanTeacup() {
        try {
            // 执行清洗茶杯任务
            System.out.println(Thread.currentThread().getName() + "-执行[清洗茶杯]任务开始..........");
            // 清洗茶杯五分钟
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + "-执行[清洗茶杯]任务结束..........");
            return "清洗茶杯共使用5分钟";
        } catch (Exception e) {
            return null;
        }
    }

    /**
    * 准备茶叶
    * @return
    */
    public String prepareTea() {
        try {
            // 执行准备茶叶任务
            System.out.println(Thread.currentThread().getName() + "-执行[准备茶叶]任务开始..........");
            // 准备茶叶一分钟
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + "-执行[准备茶叶]任务结束..........");
            return "清洗茶壶共使用1分钟";
        } catch (Exception e) {
            return null;
        }
    }
}

定义一个CleanTeapotCallableTask类,实现接口Callable,我们定义泛型为List<String>用以说明问题,将串行步骤执行结果封装到List中。

我们通过Thread.sleep对应的时长,借以模拟对应的步骤。

实战中,泛型类型往往是远程的对象引用实例,我们只需要指定类型为对应的对象引用实例即可。

定义烧开水Callable实现类

public class BoilWaterCallableTask implements Callable<String> {

    @Override
    public String call() throws Exception {
        // 执行烧水任务
        System.out.println(Thread.currentThread().getName() + "-执行[烧水任务]开始..........");
        // 烧水八分钟
        Thread.sleep(8000);
        System.out.println(Thread.currentThread().getName() + "-执行[烧水任务]结束..........");
        return "烧开水共使用8分钟";
    }
}

定义烧开水对应的异步任务Callable实现类,设置烧开水用时共需8分钟,并返回烧开水结果字符串。

通过线程池+callable+future执行异步任务

原材料已经准备完毕,接着就是我们的重头戏,通过线程池+callable+future执行异步任务。

直接看代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    long startTime = System.currentTimeMillis();
    // 提交 准备工作任务(清洗茶壶+清洗茶杯+准备茶叶) 到线程池
    PrepareTaskCallableTask prepareTaskCallableTask = new PrepareTaskCallableTask();
    Future<List<String>> prepareTaskFuture = executorService.submit(prepareTaskCallableTask);
    // 提交 烧开水任务 到线程池
    BoilWaterCallableTask boilWaterCallableTask = new BoilWaterCallableTask();
    Future<String> boilWaterFuture = executorService.submit(boilWaterCallableTask);

    // 并行任务结果获取
    List<String> prepareResultList = prepareTaskFuture.get();
    String boilWaterResult = boilWaterFuture.get();
    long finishedTime = System.currentTimeMillis();

    long taskTime = finishedTime - startTime;
    System.out.println("共耗时:" + taskTime + "毫秒");
    System.out.println(prepareResultList);

    executorService.shutdown();
}

代码分析如下:

  1. 我们通过自定义的线程池并发提交了准备工作任务、烧开水任务到线程池;
  2. 通过对应任务提交返回的Future对象引用的get()方法并行获取任务结果
  3. 最后打印任务执行时间与任务执行结果

运行结果如下:

pool-1-thread-1-执行[清洗茶壶]任务开始..........
pool-1-thread-2-执行[烧水任务]开始..........
pool-1-thread-1-执行[清洗茶壶]任务结束..........
pool-1-thread-1-执行[清洗茶杯]任务开始..........
pool-1-thread-2-执行[烧水任务]结束..........
pool-1-thread-1-执行[清洗茶杯]任务结束..........
pool-1-thread-1-执行[准备茶叶]任务开始..........
pool-1-thread-1-执行[准备茶叶]任务结束..........
共耗时:8002毫秒
[清洗茶壶共使用2分钟, 清洗茶杯共使用5分钟, 清洗茶壶共使用1分钟]
烧开水共使用8分钟

可以看到两个任务通过并发执行的方式,提升了执行效率。

思考:如果并行任务执行的时间不一样长会怎样?

上文中的“烧水泡茶”例子中,烧开水的时间 (8min)= 清洗茶壶 (2min) + 清洗茶杯 (5min) + 准备茶叶 (1min),也就是两个并行任务执行的时间刚好相同,

但是实际开发中,我们面临的往往是不同任务的并发执行,他们各自的执行时长也往往各不相同,放在这个例子中,如果准备工作中任意一个耽误了时间,那么烧开水的时间就小于准备时间。比如说,一下子来了很多客人,茶杯需要准备多准备几个,等开水烧好了,茶杯还是没有清洗干净。此时,流程图变成了这个样子:

async-tea2.png

假设清洗茶杯的时间增加到了7分钟,则 准备时间 = 清洗茶壶 (2min) + 清洗茶杯 (7min) + 准备茶叶 (1min) = 10min 那么修改一下代码,再次执行,可以看到运行结果变成了:

pool-1-thread-1-执行[清洗茶壶]任务开始..........
pool-1-thread-2-执行[烧水任务]开始..........
pool-1-thread-1-执行[清洗茶壶]任务结束..........
pool-1-thread-1-执行[清洗茶杯]任务开始..........
pool-1-thread-2-执行[烧水任务]结束..........
pool-1-thread-1-执行[清洗茶杯]任务结束..........
pool-1-thread-1-执行[准备茶叶]任务开始..........
pool-1-thread-1-执行[准备茶叶]任务结束..........
共耗时:10002毫秒
[清洗茶壶共使用2分钟, 清洗茶杯共使用7分钟, 清洗茶壶共使用1分钟]
烧开水共使用8分钟

思考:多任务并发提交,如何提高运行效率?

总执行时长变为10分钟,那么我们就可以得出一个结论:

当存在多个并发任务执行时,最终消耗的时间为并发任务中执行时间最长的那个任务所花费的时间。

这个结论理解起来也很容易,多个任务并发执行,别的任务都执行完了,就剩下那个执行最慢的任务了,当最慢的任务执行完成的时候,全部任务也就执行完成了。有点类似“木桶原理”,木桶能够承载的水量取决于最短的那条木板的长度,对应到我们的并发任务执行场景中来就是:要缩短并发任务执行的总时长,需要优先考虑优化执行最耗时的那个任务所耗费的时间

这也是我们这一小节题目的答案。

对串行流程采用多任务并发提交这个操作本身就已经大幅度提升了代码逻辑的执行效率,进一步优化,我们应当优先关注多个任务中最耗时的那一个任务,对其执行优化,能够显著提升整体任务的执行效率,缩短执行耗时。

思考:如果存在大量相同任务,如何使用并发任务提交来提升执行效率?

上文中提到的场景,执行步骤能够穷举出来。

可是现实总是多变的,我们还是以本文的“烧水泡茶”为例,以清洗茶杯流程,假设我们洗一个茶杯要10秒,洗10个就是100秒,一个人能够做的过来,假设我们一次要洗1000个杯子,让一个人洗,需要耗费10000s,也就是约等于167分钟,接近三个小时。想想也是一个恐怖的事情,假设这是一个大型的宴会,等到1000个杯子洗完,宴会都快结束了,宾客们还是没能喝上酒水。严重影响用户体验。

这种场景,在代码开发中,就是通过循环调用,在一个线程中做大量工作,比如批量对账场景,在一个线程中对所有的商家执行对账,任务纯串行化,并发度完全没有,如果商家数量有几百万,耗时需要几个小时,效率很低。

这个时候,作为一个有追求的开发者,我们会下意识的想,我们是不是可以基于多线程并发任务提交,来并行执行这种大批量、重复度高的任务呢?

当然可以,就使用上文中的 线程池 + future + callable 就能够达到目的。

场景定义:多人同时洗杯子

我们就以多人洗杯子 这个业务场景进行案例实战模拟,为了缩短案例运行时间(本质上和现实没有区别),我们假设有1000个杯子,10个人洗,每人清洗100个杯子,每个杯子清洗时间平均耗时1秒,我们用并发任务批量提交的方式看一下相比于单人串行方式有多大的性能提升。

定义洗杯子Callable实现类

我们还是编写一个洗杯子的Callable实现类:

public class CleanTeacupCallableTask implements Callable<String> {

    /**工人编号*/
    private String id;
    /**需要清洗的茶杯数量*/
    private int amount;

    public CleanTeacupCallableTask(String id, int amount) {
        this.id = id;
        this.amount = amount;
    }

    @Override
    public String call() throws Exception {
        try {
            if (amount <= 0) {
                return null;
            }
            // 执行清洗茶杯任务
            System.out.println(Thread.currentThread().getName() + "-执行[清洗茶杯]任务开始.........." + "数量:" + amount);

            long totalTimeMills = 0L;
            for (int i = 0; i < amount; i++) {
                totalTimeMills = amount * 1000;
            }
            Thread.sleep(totalTimeMills);
            System.out.println(Thread.currentThread().getName() + "-执行[清洗茶杯]任务结束.........." + "数量:" + amount);
            return id + "-清洗茶杯共使用" + totalTimeMills / 1000 + "秒";
        } catch (Exception e) {
            return null;
        }
    }
}

我们计划为每个工人分配一个茶杯清洗任务CleanTeacupCallableTask,它的构造方法接收两个参数: 工人编号、需要清洗的茶杯数量。

每个工人需要花费的清洗总时长 = 单个茶杯清洗耗时 * 需要清洗的茶杯数量

编写测试代码

我们接着编写测试代码,通过线程池批量提交茶杯并发清洗任务。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    long startTime = System.currentTimeMillis();

    // 杯子共1000个
    int cupAmount = 1000;
    // 共10个工人
    int workerAmount = 10;
    // 每人清洗数量
    int cleanCupsPerWorker = cupAmount / workerAmount;

    // 定义Future列表
    List<Future<String>> futures = new ArrayList<>(workerAmount);
    for (int i = 0; i < workerAmount; i++) {
        CleanTeacupCallableTask cleanTeacupCallableTask = new CleanTeacupCallableTask("worker-" + i, cleanCupsPerWorker);
        Future<String> cleanTeacupFuture = executorService.submit(cleanTeacupCallableTask);
        // future对象引用添加到futures列表
        futures.add(cleanTeacupFuture);
    }

    // 返回结果列表
    List<String> resultList = new ArrayList<>(workerAmount);
    futures.stream().forEach(cleanTeacupFuture -> {
        // 尝试获取远端结果
        String result = null;
        try {
            result = cleanTeacupFuture.get();
        } catch (Exception e) {
            result = null;
        }
        resultList.add(result);
    });

    // 遍历结果列表
    for (int i = 0; i < resultList.size(); i++) {
        System.out.println(resultList.get(i));
    }

    long finishedTime = System.currentTimeMillis();
    long taskTime = finishedTime - startTime;
    System.out.println("共耗时:" + taskTime + "毫秒");

    executorService.shutdown();
}

代码注释也比较详细,我们主要做了以下几件事情:

  1. 定义了待清洗的茶杯总数、清理工人数量,并计算得出每个工人需要清洗的茶杯数量;
  2. 为每个工人提交了一个茶杯清洗异步任务,并将返回的future结果封装到一个List中;
  3. 遍历future列表,尝试获取远端的返回值,并将返回值封装到一个List中;
  4. 打印结果并计算耗时

多人同时洗杯子运行结果

pool-1-thread-1-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-5-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-4-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-3-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-2-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-7-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-6-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-9-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-8-执行[清洗茶杯]任务开始..........数量:100
pool-1-thread-10-执行[清洗茶杯]任务开始..........数量:100


pool-1-thread-8-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-3-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-4-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-9-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-5-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-2-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-6-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-7-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-1-执行[清洗茶杯]任务结束..........数量:100
pool-1-thread-10-执行[清洗茶杯]任务结束..........数量:100

worker-0-清洗茶杯共使用100秒
worker-1-清洗茶杯共使用100秒
worker-2-清洗茶杯共使用100秒
worker-3-清洗茶杯共使用100秒
worker-4-清洗茶杯共使用100秒
worker-5-清洗茶杯共使用100秒
worker-6-清洗茶杯共使用100秒
worker-7-清洗茶杯共使用100秒
worker-8-清洗茶杯共使用100秒
worker-9-清洗茶杯共使用100秒

共耗时:100002毫秒

可以看到,我们为10个工人分配了10个线程并发提交了10个茶杯清洗任务,10个工人同时开始了各自的茶杯清洗工作,最终当所有工人清洗完茶杯之后,共耗时100秒(PS:笔者使用的笔记本安装一颗8核心16线程CPU,能够同时开启16线程执行并行任务)。

这符合我们最初做的假设,即对于1000个杯子,10个工人并行清洗,每个杯子清洗平均耗时1s,只需要花费100秒;相同的工作由单个工人执行,则需要结结实实花费1000秒。通过并行方式,将任务执行效率提升了整整10倍,这是很客观的性能提升。

用图形来表示是这样的:

单个工人串行清洗杯子

single-clean-teacup.png

多个工人并发清洗杯子

multi-clean-teacup.png

注意:一种错误的使用方式

上文中,我们通过“烧水泡茶”、“多人清洗杯子”等案例,讲解了多步骤并发任务提交 以及 批量任务并发提交 的思路及代码实现,均达到了目的。

但是实际开发中,我们发现有的同学初衷是想通过并发任务提交方式提升系统执行效率,但由于学艺不精或者一时糊涂,使用了错误的方式,将并发代码最终通过串行方式运行了,并没能实现通过多线程方式提升逻辑运行效率的目的。我们此处就看一下常见的错误使用方式是如何“巧夺天工”地将代码串行化的,我们还是以“烧水泡茶”案例来说明。

具体的任务编写代码没有什么问题,问题出在并发任务提交的逻辑编写上,错误使用方式的代码是这么写的:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    long startTime = System.currentTimeMillis();

    // 提交 准备工作任务(清洗茶壶+清洗茶杯+准备茶叶) 到线程池
    PrepareTaskCallableTask prepareTaskCallableTask = new PrepareTaskCallableTask();
    Future<List<String>> prepareTaskFuture = executorService.submit(prepareTaskCallableTask);
    List<String> prepareResultList = prepareTaskFuture.get();
    // 提交 烧开水任务 到线程池
    BoilWaterCallableTask boilWaterCallableTask = new BoilWaterCallableTask();
    Future<String> boilWaterFuture = executorService.submit(boilWaterCallableTask);
    String boilWaterResult = boilWaterFuture.get();

    long finishedTime = System.currentTimeMillis();

    long taskTime = finishedTime - startTime;
    System.out.println("共耗时:" + taskTime + "毫秒");
    System.out.println(prepareResultList);
    System.out.println(boilWaterResult);

    executorService.shutdown();
}

对比正确的写法,有没有发现问题出在哪儿?

相信聪明的你已经看出来了,错误的写法是针对每个任务的提交结果分别进行了get()操作,当A任务提交之后就通过get()尝试获取结果,阻塞了主线程;当A任务执行结果获取到之后再提交B任务,并通过get()尝试获取结果,最终执行的结果如下:

pool-1-thread-1-执行[清洗茶壶]任务开始..........
pool-1-thread-1-执行[清洗茶壶]任务结束..........
pool-1-thread-1-执行[清洗茶杯]任务开始..........
pool-1-thread-1-执行[清洗茶杯]任务结束..........
pool-1-thread-1-执行[准备茶叶]任务开始..........
pool-1-thread-1-执行[准备茶叶]任务结束..........
pool-1-thread-2-执行[烧水任务]开始..........
pool-1-thread-2-执行[烧水任务]结束..........
共耗时:16003毫秒
[清洗茶壶共使用2分钟, 清洗茶杯共使用5分钟, 清洗茶壶共使用1分钟]
烧开水共使用8分钟

可以看到,两个任务执行时长 = 准备工作8分钟 + 烧开水8分钟 = 16分钟!

也就是说,这种错误的使用方式,将并发任务本质上写成了串行任务!如果是对实效性要求很高的业务场景,后果是很严重的!尤其是实时对账、实时多接口查询,将大幅度提升系统的单次调用执行时长,严重影响接口RT,如果是2C业务,将直接影响到用户体验。

反思及总结

本文篇幅较长,内容虽看似简单,但充斥着玄机。

我们通过一个“烧水泡茶”的例子引出并发任务提交的场景,并展示了代码具体如何编写。

随后,基于批量任务执行的场景,又给出了对应的代码案例,对于离线任务批处理,在线实时批量调用,通过线程池异步并发任务批量执行的方式能够大幅度提升业务执行效率。

文章最后,我们给出了一个并发任务编写的“bad case”,这警示我们:对于技术细节,要尽量知其然,知其所以然。只知道皮毛很容易用错,而对于多线程的技术点,一旦用错,极易引发严重后果。

文章中提供了大量的图例和运行结果展示,希望能够通过笔者笨拙的描述,对你在实战中运用 线程池+future+callable 异步任务执行逻辑的编写有所帮助。

下期预告

你以为到这里就结束了?

熟悉笔者风格的同学马上就意识到,问题没有这么简单。

在Java8中,更新了新的异步任务执行框架–CompletableFuture,它提供了丰富的并发任务组合提交、回调等特性。

在这个技术日益内卷的时代,对底层干饭型程序员,唯有学习才能破局。那么我们下期就通过CompletableFuture继续讲解更多的异步任务并发执行的姿势,让我们不见不散。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 聊聊Callable
  2. 2. 聊聊Future
    1. 2.1. 趁热乎,再看看FutureTask
  3. 3. 如何在实战中应用异步任务并发执行?
    1. 3.1. 首先需要一个异步线程池
    2. 3.2. 定义准备工作(清洗茶壶、清洗茶杯、准备茶叶)Callable实现类
    3. 3.3. 定义烧开水Callable实现类
    4. 3.4. 通过线程池+callable+future执行异步任务
    5. 3.5. 思考:如果并行任务执行的时间不一样长会怎样?
    6. 3.6. 思考:多任务并发提交,如何提高运行效率?
  4. 4. 思考:如果存在大量相同任务,如何使用并发任务提交来提升执行效率?
    1. 4.1. 场景定义:多人同时洗杯子
      1. 4.1.1. 定义洗杯子Callable实现类
      2. 4.1.2. 编写测试代码
      3. 4.1.3. 多人同时洗杯子运行结果
        1. 4.1.3.1. 单个工人串行清洗杯子
        2. 4.1.3.2. 多个工人并发清洗杯子
    2. 4.2. 注意:一种错误的使用方式
  5. 5. 反思及总结
  6. 6. 下期预告
Fork me on GitHub