CompletableFuture字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。
下面的示例,比较简单的说明了,CompletableFuture是如何被主动完成的。在下面这段代码中,由于调用了complete方法,所以最终的打印结果是“manual test”,而不是"test"。
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
try{
Thread.sleep(1000L);
return "test";
} catch (Exception e){
return "failed test";
}
});
future.complete("manual test");
System.out.println(future.join());
CompletableFuture实现了CompletableStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
先看一下CompletableFuture的方法,其中我们常用的是supplyAsync和runAsync两个。
supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable task) 方法,runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法,这两方法的效果跟submit是一样的。这两方法各有一个重载版本,可以指定执行异步任务的Executor(线程池)实现,如果不指定,默认使用ForkJoinPool.commonPool(),这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O *** 作,就会导致线程池中所有线程都阻塞在 I/O *** 作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰
public static void main(String[] args) throws Exception{
//创建异步执行任务:
CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(false){
throw new RuntimeException("抛出异常咯");
}
System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
return "子线程结束";
});
System.out.println("child thread result->"+childThread.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
运行结果如下
public static void main(String[] args) throws Exception{
//创建异步执行任务:
CompletableFuture childThread = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(false){
throw new RuntimeException("抛出异常咯");
}
System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
});
System.out.println("child thread result->"+childThread.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
}
代码执行结果
public static void main(String[] args) throws Exception{
ExecutorService executorService= Executors.newSingleThreadExecutor();
CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(false){
throw new RuntimeException("抛出异常咯");
}
System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
return "异步线程执行完毕";
},executorService);
System.out.println("child thread result->"+childThread.get());
System.out.println("main thread exit,time->"+System.currentTimeMillis());
executorService.shutdown();
}
代码执行结果
join()和get()方法都是用来获取CompletableFuture的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
非阻塞式获取异步线程结果不论Future.get()方法还是CompletableFuture.get()方法都是阻塞的,为了获取任务的结果同时不阻塞当前线程的执行,我们可以使用CompletionStage提供的方法结合callback来实现任务的异步处理。
同Future相比,CompletableFuture最大的不同是支持流式(Stream)的计算处理,多个任务之间,可以前后相连,从而形成一个计算流。比如:任务1产生的结果,可以直接作为任务2的入参,参与任务2的计算,以此类推。
下面是一些处理流式任务的方法:
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
接收上一阶段的输出作为本阶段的输入,并消费处理,无返回结果
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
不关心前一阶段的计算结果,因为它不需要输入参数,进行消费处理,无返回结果。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
与thenApply相似,都在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
执行当前任务的线程执行继续执行 whenComplete 的任务。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
把 whenCompleteAsync 这个任务继续提交给默认线程池来进行执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
把 whenCompleteAsync 这个任务继续提交给指定线程池来进行执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
在写代码时,我们常用try…catch…finally这样的代码块处理异常。而handle就像finally,不论正常返回还是出异常都会进入handle,类似whenComplete。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
方法不以Async结尾,意味着使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。(暂未验证)除此之外,两者没有其他区别。因此,为了快速理解,在接下来的介绍中,我们主要介绍不带Async的版本。
thenApplythenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的CompletableFuture实例对象。
public static void main(String[] args) throws Exception {
CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(false){
throw new RuntimeException("抛出异常咯");
}
System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
return "异步线程执行完毕";
});
CompletableFuture<String> childThread1 = childThread.thenApply(result ->{
System.out.println(Thread.currentThread().getId() + "||||||" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getId() + "||||||" + result);
return "thenApply";
});
System.out.println("main thread exit,time->"+System.currentTimeMillis());
System.out.println(childThread.get());
System.out.println(childThread1.get());
}
程序运行结果如下
将if的判断条件改成true后,前一个任务会抛出异常,此时thenApply将不会再执行。
在网上看到很多人都像下面这么说
但是我试过尝试在thenApply中return 一个不是string类型的值,结果是报错的,所以对于这句转换泛型类型我抱有疑问。
后来我点进thenApply的源码查看,发现它也是直接new的对象返回的,并且泛型类型和传入的泛型类型一致。所以我认为这应该是错误的说法
和thenApply用法类似,不同之处在于thenAccept没有返回值
public static void main(String[] args) throws Exception {
CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(false){
throw new RuntimeException("抛出异常咯");
}
System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
return "异步线程执行完毕";
});
CompletableFuture<Void> childThread1 = childThread.thenAccept(result ->{
System.out.println(Thread.currentThread().getId() + "||||||" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getId() + "||||||" + result);
});
System.out.println("childThread:" + childThread.get());
System.out.println("childThread1:" + childThread1.get());
}
程序运行结果
同样,将if中的判断条件改为true。当前置条件抛出异常,thenAccept将不再执行
相比于thenAccept,thenRun同样没有返回值,且thenRun还不需要前置任务的返回结果做入参
public static void main(String[] args) throws Exception {
System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(false){
throw new RuntimeException("抛出异常咯");
}
System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
return "异步线程执行完毕";
});
CompletableFuture<Void> childThread1 = childThread.thenRun(() ->{
System.out.println(Thread.currentThread().getId() + "||||||" + System.currentTimeMillis());
});
System.out.println("childThread:" + childThread.get());
System.out.println("childThread1:" + childThread1.get());
}
程序结果
等待前面两个CompletableFuture任务执行完毕后,进行后续 *** 作
public static void main(String[] args) throws Exception {
System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
CompletableFuture<String> childThread1 = CompletableFuture.supplyAsync(()->{
System.out.println("任务1===" + Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
System.out.println("任务1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1===" + Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
return "任务1";
});
CompletableFuture<String> childThread2 = CompletableFuture.supplyAsync(()->{
System.out.println("任务2===" + Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
System.out.println("任务2");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2===" + Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
return "任务2";
});
CompletableFuture<Integer> childThread3 = childThread1.thenCombine(childThread2,new BiFunction<String, String, Integer>() {
@Override
public Integer apply(String x, String y) {
System.out.println("任务thenCombine===" + Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
System.out.println(x+y);
Integer i = new Integer(1);
return i;
}
});
System.out.println("childThread3:" + childThread3.get());
}
程序运行结果
关于异常,两个前置任务任何一个出现异常,则thenCombine都不会运行
作用类似thenApply,但是thenCompose的返回类型可以和前置任务不同
public static void main(String[] args) throws Exception {
CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
return "任务1";
});
CompletableFuture<Integer> childThread1 = childThread.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
System.out.println(result);
return 666;
}));
System.out.println("1----" + childThread.get());
System.out.println("2----" + childThread1.get());
}
程序运行结果
当CompletableFuture的任务不论是正常完成还是出现异常它都会调用whenComplete函数
public static void main(String[] args) throws Exception {
CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(false){
throw new RuntimeException("抛出异常咯");
}
System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
return "任务1";
});
CompletableFuture<String> childThread1 = childThread.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println("whenComplete");
if (throwable == null) {
System.out.println("whenComplete throwable is null");
} else {
System.out.println("whenComplete throwable is " + throwable.getMessage());
}
}
});
System.out.println("1----" + childThread.get());
System.out.println("2----" + childThread1.get());
}
程序运行结果
当我们将前置任务重的if条件改为true时,程序运行结果如下
可以看出,尽管前置任务出现异常,但是任然会运行whenComplete中的任务
与whenComplete类似,无论前置任务是否出现异常都会执行的方法。个人感觉类似于try/catch
与whenComplete不同之处在于,handle可以返回自定义结果。且会类似try/catch一样将异常catch在handle中,使程序继续正常运行下去
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> childThread = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(true){
throw new RuntimeException("抛出异常咯");
}
System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
return "任务1";
}).handle(new BiFunction<String, Throwable, Integer>() {
@Override
public Integer apply(String dept, Throwable throwable) {
if (throwable != null){
System.out.println("handle" + throwable.getMessage());
return 1;
} else {
return 2;
}
}
});
System.out.println("1----" + childThread.get());
}
程序输出
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)