很多小伙伴经常会遇到这样的一个场景,在系统中有很多对其他模块的远程调用,通常一个操作需要调用多个外部接口才能完成。起初系统采用了同步顺序调用多个外部接口的方式。这样总的耗时为 n1+n2+n3+... 。后续我们考虑是否可以采用异步+回调的方式,让多个请求同时访问。那么到底怎么搞才是比较秀的呢。
技术选型
一、伪异步 -- Future
创建线程的方式只有两种:继承Thread或者实现Runnable接口。 但是这两种方法都存在一个缺陷,没有返回值,也就是说我们无法得知线程执行结果。虽然简单场景下已经满足,但是当我们需要返回值的时候怎么办呢? Java 1.5 以后的Callable和Future接口就解决了这个问题,我们可以通过向线程池提交一个Callable来获取一个包含返回值的Future对象
当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值。但是Future的get()方法会阻塞主线程。
这种场景下返回的 Future 的不足之处:
•只有主动调用 get 方法去获取值,但是有可能值还没准备好,就阻塞等待。
•任务处理过程中出现异常会把异常隐藏,封装到 Future 里面去,只有调用 get 方法的时候才知道异常了。
二、进阶版 -- Guava异步
Guava的ListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。
其实是回调(callback):在异步任务提交之后,注册一个回调函数。
1、addListener
Guava包里面对 JDK 的 Future 进行了扩展:新增了 addListener 的方法。
创建线程池的方法也有所改变,用Guava的 MoreExecutors 装饰了一下。
调用 ListeningExecutorService 的submit 方法会返回 ListenableFuture,使用 addListener 就可以注册回调方法。
获取运行结果是另外的线程执行的,完全没有阻塞主线程
2、FutureCallback
有onSuccess 和 onFailure 两个回调方法,使用方法如下:
Guava 的 ListenableFuture 已经可以处理大部分的异步任务场景了,但是不够强大,并不能很好地完成以下我们所需的业务
•将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
•等待Future集合种的所有任务都完成。
•仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。
三、终极版 -- CompletableFuture
在Java 8 中, 新增加了一个类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture 实现了两个接口,一个是我们熟悉的 Future ,一个是 CompletionStage。CompletionStage可以用于构建复杂的异步计算流水线,通过串行、并行、组合等方式来处理异步计算的结果。
CompletableFuture使用
方法命名规则
•带有Async后缀方法都是异步另外线程执行,没有就是复用之前任务的线程
•带有Apply标识方法都是可以获取返回值+有返回值的
•带有Accept标识方法都是可以获取返回值
•带有run标识的方法不可以获取返回值和无返回值,只是运行
get方法和join方法
•join:阻塞获取结果或抛出非受检异常。
•get: 阻塞获取结果或抛出受检测异常,需要显示进行try...catch处理
实例化
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
//或者可以通过一个简单的无参构造器
CompletableFuture<Object> completableFuture = new CompletableFuture<Object>();
注意:在实例化方法中,我们是可以指定Executor参数的,当我们不指定的试话,我们所开的并行线程使用的是默认系统及公共线程池ForkJoinPool,而且这些线程都是守护线程。我们在编程的时候需要谨慎使用守护线程,如果将我们普通的用户线程设置成守护线程,当我们的程序主线程结束,JVM中不存在其余用户线程,那么CompletableFuture的守护线程会直接退出,造成任务无法完成的问题。
CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
//handle方法集和上面的complete方法集,区别就在于返回值,handle方法是可以自定义的
<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
//apply方法集和handle方法集,唯一的不同是,handle方法会给出异常,可以让用户自己在内部处理,而apply方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
//accept 方法集只做最终结果的消费,此时返回的CompletableFuture是空返回。只消费,无返回
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
异常处理
//exceptionally方法会给我们一个异常作为参数,同时返回一个默认值,默认值的类型和上一个操作的返回值相同。
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
组合异步任务
thenApply
// 返回结果是 future 1 future 2
CompletableFuture.supplyAsync(() -> "future 1").thenApply(result -> result + " future 2");
thenCompose
//打印结果 future 1 future 2 与thenApply相似,但是需要手动返回CompletableFuture
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "future 1").thenCompose(result -> {
String s = result + " future 2";
System.out.println(s);
return new CompletableFuture<String>();
});
thenCombine
//打印结果 future 1 future 2
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " future 2");
CompletableFuture<String> completableFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
System.out.println(completableFuture.get());
完成结果处理
allOf
// 打印结果 future 1 future 2
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "future 2");
CompletableFuture.allOf(future1, future2);
System.out.println(future1.get() + " " + future2.get());