很多小伙伴经常会遇到这样的一个场景,在系统中有很多对其他模块的远程调用,通常一个操作需要调用多个外部接口才能完成。起初系统采用了同步顺序调用多个外部接口的方式。这样总的耗时为 n1+n2+n3+... 。后续我们考虑是否可以采用异步+回调的方式,让多个请求同时访问。那么到底怎么搞才是比较秀的呢。

技术选型

一、伪异步 -- Future

创建线程的方式只有两种:继承Thread或者实现Runnable接口。 但是这两种方法都存在一个缺陷,没有返回值,也就是说我们无法得知线程执行结果。虽然简单场景下已经满足,但是当我们需要返回值的时候怎么办呢? Java 1.5 以后的Callable和Future接口就解决了这个问题,我们可以通过向线程池提交一个Callable来获取一个包含返回值的Future对象

当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值。但是Future的get()方法会阻塞主线程。

这种场景下返回的 Future 的不足之处:

•只有主动调用 get 方法去获取值,但是有可能值还没准备好,就阻塞等待。

•任务处理过程中出现异常会把异常隐藏,封装到 Future 里面去,只有调用 get 方法的时候才知道异常了。

二、进阶版 -- Guava异步

Guava的ListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。

其实是回调(callback):在异步任务提交之后,注册一个回调函数。

1、addListener

Guava包里面对 JDK 的 Future 进行了扩展:新增了 addListener 的方法。

创建线程池的方法也有所改变,用Guava的 MoreExecutors 装饰了一下。

调用 ListeningExecutorService 的submit 方法会返回 ListenableFuture,使用 addListener 就可以注册回调方法。

获取运行结果是另外的线程执行的,完全没有阻塞主线程

2、FutureCallback

有onSuccess 和 onFailure 两个回调方法,使用方法如下:


Guava 的 ListenableFuture 已经可以处理大部分的异步任务场景了,但是不够强大,并不能很好地完成以下我们所需的业务

•将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果

•等待Future集合种的所有任务都完成。

•仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。

三、终极版 -- CompletableFuture

在Java 8 中, 新增加了一个类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture 实现了两个接口,一个是我们熟悉的 Future ,一个是 CompletionStage。CompletionStage可以用于构建复杂的异步计算流水线,通过串行、并行、组合等方式来处理异步计算的结果。

CompletableFuture使用

方法命名规则

•带有Async后缀方法都是异步另外线程执行,没有就是复用之前任务的线程

•带有Apply标识方法都是可以获取返回值+有返回值的

•带有Accept标识方法都是可以获取返回值

•带有run标识的方法不可以获取返回值和无返回值,只是运行

get方法和join方法

•join:阻塞获取结果或抛出非受检异常。

•get: 阻塞获取结果或抛出受检测异常,需要显示进行try...catch处理

实例化

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

//或者可以通过一个简单的无参构造器
CompletableFuture<Object> completableFuture = new CompletableFuture<Object>();

注意:在实例化方法中,我们是可以指定Executor参数的,当我们不指定的试话,我们所开的并行线程使用的是默认系统及公共线程池ForkJoinPool,而且这些线程都是守护线程。我们在编程的时候需要谨慎使用守护线程,如果将我们普通的用户线程设置成守护线程,当我们的程序主线程结束,JVM中不存在其余用户线程,那么CompletableFuture的守护线程会直接退出,造成任务无法完成的问题。

CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

//handle方法集和上面的complete方法集,区别就在于返回值,handle方法是可以自定义的
<U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
<U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
<U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

//apply方法集和handle方法集,唯一的不同是,handle方法会给出异常,可以让用户自己在内部处理,而apply方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理
<U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

//accept 方法集只做最终结果的消费,此时返回的CompletableFuture是空返回。只消费,无返回
CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

异常处理

//exceptionally方法会给我们一个异常作为参数,同时返回一个默认值,默认值的类型和上一个操作的返回值相同。
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

组合异步任务

thenApply

// 返回结果是 future 1 future 2
 CompletableFuture.supplyAsync(() -> "future 1").thenApply(result -> result + " future 2");

thenCompose

//打印结果 future 1 future 2  与thenApply相似,但是需要手动返回CompletableFuture
 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "future 1").thenCompose(result -> {
            String s = result + " future 2";
            System.out.println(s);
            return new CompletableFuture<String>();
        });

thenCombine

//打印结果 future 1 future 2 
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " future 2");
        CompletableFuture<String> completableFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
        System.out.println(completableFuture.get());

完成结果处理

allOf

//  打印结果 future 1 future 2 
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "future 2");
        CompletableFuture.allOf(future1, future2);
        System.out.println(future1.get() + " " + future2.get());

anyOf