CompletableFuture学习记录

CompletableFuture学习记录,第1张

CompletableFuture字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。
下面的示例,比较简单的说明了,CompletableFuture是如何被主动完成的。在下面这段代码中,由于调用了complete方法,所以最终的打印结果是“manual test”,而不是"test"。

CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
    try{
        Thread.sleep(1000L);
        return "test";
    } catch (Exception e){
        return "failed test";
    }
});
future.complete("manual test");
System.out.println(future.join());

CompletableFuture实现了CompletableStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
先看一下CompletableFuture的方法,其中我们常用的是supplyAsync和runAsync两个。

supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable task) 方法,runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法,这两方法的效果跟submit是一样的。这两方法各有一个重载版本,可以指定执行异步任务的Executor(线程池)实现,如果不指定,默认使用ForkJoinPool.commonPool(),这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O *** 作,就会导致线程池中所有线程都阻塞在 I/O *** 作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

基本使用 supplyAsync示例代码
public static void main(String[] args) throws Exception{
    //创建异步执行任务:
    CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(false){
            throw new RuntimeException("抛出异常咯");
        }
        System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
        return "子线程结束";
    });
    System.out.println("child thread result->"+childThread.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

运行结果如下

runAsync示例代码
public static void main(String[] args) throws Exception{
    //创建异步执行任务:
    CompletableFuture childThread = CompletableFuture.runAsync(()->{
        System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(false){
            throw new RuntimeException("抛出异常咯");
        }
        System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
    });
    System.out.println("child thread result->"+childThread.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

代码执行结果

指定线程池示例
public static void main(String[] args) throws Exception{
    ExecutorService executorService= Executors.newSingleThreadExecutor();
    CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(false){
            throw new RuntimeException("抛出异常咯");
        }
        System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
        return "异步线程执行完毕";
    },executorService);
    System.out.println("child thread result->"+childThread.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
    executorService.shutdown();
}

代码执行结果

获取结果get()、join()

join()和get()方法都是用来获取CompletableFuture的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)

非阻塞式获取异步线程结果

不论Future.get()方法还是CompletableFuture.get()方法都是阻塞的,为了获取任务的结果同时不阻塞当前线程的执行,我们可以使用CompletionStage提供的方法结合callback来实现任务的异步处理。
同Future相比,CompletableFuture最大的不同是支持流式(Stream)的计算处理,多个任务之间,可以前后相连,从而形成一个计算流。比如:任务1产生的结果,可以直接作为任务2的入参,参与任务2的计算,以此类推。
下面是一些处理流式任务的方法:

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);   
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
 
接收上一阶段的输出作为本阶段的输入,并消费处理,无返回结果
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);


不关心前一阶段的计算结果,因为它不需要输入参数,进行消费处理,无返回结果。  
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
 
把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
 
与thenApply相似,都在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
 
执行当前任务的线程执行继续执行 whenComplete 的任务。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
把 whenCompleteAsync 这个任务继续提交给默认线程池来进行执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
把 whenCompleteAsync 这个任务继续提交给指定线程池来进行执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
 
在写代码时,我们常用trycatchfinally这样的代码块处理异常。而handle就像finally,不论正常返回还是出异常都会进入handle,类似whenComplete。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

方法不以Async结尾,意味着使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。(暂未验证)除此之外,两者没有其他区别。因此,为了快速理解,在接下来的介绍中,我们主要介绍不带Async的版本。

thenApply

thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的CompletableFuture实例对象。

public static void main(String[] args) throws Exception {
    CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(false){
            throw new RuntimeException("抛出异常咯");
        }
        System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
        return "异步线程执行完毕";
    });
    CompletableFuture<String> childThread1 = childThread.thenApply(result ->{
        System.out.println(Thread.currentThread().getId() + "||||||" + System.currentTimeMillis());
        System.out.println(Thread.currentThread().getId() + "||||||" + result);
        return "thenApply";
    });
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
    System.out.println(childThread.get());
    System.out.println(childThread1.get());
}

程序运行结果如下

将if的判断条件改成true后,前一个任务会抛出异常,此时thenApply将不会再执行。
在网上看到很多人都像下面这么说

但是我试过尝试在thenApply中return 一个不是string类型的值,结果是报错的,所以对于这句转换泛型类型我抱有疑问。
后来我点进thenApply的源码查看,发现它也是直接new的对象返回的,并且泛型类型和传入的泛型类型一致。所以我认为这应该是错误的说法

thenAccept

和thenApply用法类似,不同之处在于thenAccept没有返回值

public static void main(String[] args) throws Exception {
    CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(false){
            throw new RuntimeException("抛出异常咯");
        }
        System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
        return "异步线程执行完毕";
    });
    CompletableFuture<Void> childThread1 = childThread.thenAccept(result ->{
        System.out.println(Thread.currentThread().getId() + "||||||" + System.currentTimeMillis());
        System.out.println(Thread.currentThread().getId() + "||||||" + result);
    });
    System.out.println("childThread:" + childThread.get());
    System.out.println("childThread1:" + childThread1.get());
}

程序运行结果

同样,将if中的判断条件改为true。当前置条件抛出异常,thenAccept将不再执行

thenRun

相比于thenAccept,thenRun同样没有返回值,且thenRun还不需要前置任务的返回结果做入参

public static void main(String[] args) throws Exception {
    System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
    CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(false){
            throw new RuntimeException("抛出异常咯");
        }
        System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
        return "异步线程执行完毕";
    });
    CompletableFuture<Void> childThread1 = childThread.thenRun(() ->{
        System.out.println(Thread.currentThread().getId() + "||||||" + System.currentTimeMillis());
    });
    System.out.println("childThread:" + childThread.get());
    System.out.println("childThread1:" + childThread1.get());
}

程序结果

thenCombine

等待前面两个CompletableFuture任务执行完毕后,进行后续 *** 作

public static void main(String[] args) throws Exception {
    System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
    CompletableFuture<String> childThread1 = CompletableFuture.supplyAsync(()->{
        System.out.println("任务1===" + Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
        System.out.println("任务1");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1===" + Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
        return "任务1";
    });
    CompletableFuture<String> childThread2 = CompletableFuture.supplyAsync(()->{
        System.out.println("任务2===" + Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
        System.out.println("任务2");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2===" + Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
        return "任务2";
    });
    CompletableFuture<Integer> childThread3 = childThread1.thenCombine(childThread2,new BiFunction<String, String, Integer>() {
                @Override
                public Integer apply(String x, String y) {
                    System.out.println("任务thenCombine===" + Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
                    System.out.println(x+y);
                    Integer i = new Integer(1);
                    return i;
                }
            });
    System.out.println("childThread3:" + childThread3.get());
}

程序运行结果

关于异常,两个前置任务任何一个出现异常,则thenCombine都不会运行

thenCompose

作用类似thenApply,但是thenCompose的返回类型可以和前置任务不同

public static void main(String[] args) throws Exception {
    CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
        return "任务1";
    });
    CompletableFuture<Integer> childThread1 = childThread.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
        System.out.println(result);
        return 666;
    }));
    System.out.println("1----" + childThread.get());
    System.out.println("2----" + childThread1.get());
}

程序运行结果

whenComplete

当CompletableFuture的任务不论是正常完成还是出现异常它都会调用whenComplete函数

public static void main(String[] args) throws Exception {
    CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(false){
            throw new RuntimeException("抛出异常咯");
        }
        System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
        return "任务1";
    });
    CompletableFuture<String> childThread1 = childThread.whenComplete(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(String s, Throwable throwable) {
            System.out.println("whenComplete");
            if (throwable == null) {
                System.out.println("whenComplete throwable is null");
            } else {
                System.out.println("whenComplete throwable is " + throwable.getMessage());
            }
        }
    });
    System.out.println("1----" + childThread.get());
    System.out.println("2----" + childThread1.get());
}

程序运行结果

当我们将前置任务重的if条件改为true时,程序运行结果如下
可以看出,尽管前置任务出现异常,但是任然会运行whenComplete中的任务

handle

与whenComplete类似,无论前置任务是否出现异常都会执行的方法。个人感觉类似于try/catch
与whenComplete不同之处在于,handle可以返回自定义结果。且会类似try/catch一样将异常catch在handle中,使程序继续正常运行下去

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> childThread = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getId()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(true){
            throw new RuntimeException("抛出异常咯");
        }
        System.out.println(Thread.currentThread().getId()+" exit,time->"+System.currentTimeMillis());
        return "任务1";
    }).handle(new BiFunction<String, Throwable, Integer>() {
        @Override
        public Integer apply(String dept, Throwable throwable) {
            if (throwable != null){
                System.out.println("handle" + throwable.getMessage());
                return 1;
            } else {
                return 2;
            }
        }
    });
    System.out.println("1----" + childThread.get());
}

程序输出

几种异步任务方式的比较

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/730697.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)

保存