Java 之 异步 CompletableFuture

Java 之 异步 CompletableFuture,第1张

        在 Java8 中,有 CompletableFuture 类,帮助简化了异步编程的复杂性。

一、创建异步对象

        Completable 提供了四个静态方法创建异步 *** 作。

import java.util.concurrent.*;

public class CompletableFutureTest {
    // 创建一个线程池
    public static ExecutorService executor =Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 无返回结果 , 且使用默认线程池 
        // result: ForkJoinPool.commonPool-worker-1 === > runAsync(runnable)
        CompletableFuture.runAsync(
                ()-> System.out.println(Thread.currentThread().getName()+" === > runAsync(runnable)"));
        
        // 无返回结果 , 且使用自定义线程池 
        // result: pool-1-thread-1 === > runAsync(runnable,executor)
        CompletableFuture.runAsync(
                ()-> System.out.println(Thread.currentThread().getName()+" === > runAsync(runnable,executor)"), executor);


        // 有返回结果 , 且使用默认线程池 
        // result: main get result : ForkJoinPool.commonPool-worker-1 === > supplyAsync(supplier)
        CompletableFuture supply1 = CompletableFuture.supplyAsync(
                () -> Thread.currentThread().getName() + " === > supplyAsync(supplier)");
        System.out.println(Thread.currentThread().getName()+" get result : "+supply1.get());


        // 有返回结果 , 且使用自定义线程池 
        // result: main get result : pool-1-thread-2 === > supplyAsync(supplier,executor)
        CompletableFuture supply2 = CompletableFuture.supplyAsync(
                () -> Thread.currentThread().getName() + " === > supplyAsync(supplier,executor)",executor);
        System.out.println(Thread.currentThread().getName()+" get result : "+supply2.get());
    }
}
 二、 异步对象完成后的回调

        在上述异步对象完成后,可以回调对应方法

        方法中有后缀 Async 的表示异步执行(利用新的线程去处理),否则继续利用同一个线程处理。

        方法中有带 Executor 参数 的,表示可以传入自定义的线程池,利用其中的线程进行处理。

         whenComplete* 方法可以获取到上一个 Future 的 结果 或 异常 。

        (1) whenComplete* 方法 没有返回值 ,但可以改变 上一个 Future 的 结果

        (2) 如果上一个 Future 出现异常 , 则 whenComplete* 方法后进行 get() 也会出现异常。

        exceptionally 方法则可以获取 上一个 Future 或者 whenComplete* 方法 中的异常,并返回自定义结果。

public class CompletableFutureTest2 {
    // 创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        
        // pool-1-thread-1 == > running
        CompletableFuture fu = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running ");
            MyClass out=new MyClass();
            out.val=1;
            // 出现异常,跳转到 .exceptionally(fn) 方法
            int i=10/0;
            return out;
        }, executor).whenCompleteAsync((res, exception) -> {
            // 改变 res 中的值
            res.val=2;

            // 出现异常,跳转到 .exceptionally(fn) 方法
            int i=10/0;
            System.out.println(Thread.currentThread().getName() + " == > running fu.whenCompleteAsync1");
            System.out.println(Thread.currentThread().getName() + " get ==> result: " + res + "\t exception: " + exception);

        }, executor).exceptionally((exception) -> {
            System.out.println(Thread.currentThread().getName() + " get ==> exception: " + exception);
            return new MyClass(-1);
        });
        System.out.println(Thread.currentThread().getName() + " get ==> result: " +fu.get().val);
    }
}
class MyClass{
    int val;
    MyClass(){}
    MyClass(int val){this.val=val;}
}
三、 handle 方法

        和上述的 whenComplete* 方法 类似,但 handle 即可处理异常,还可以改变返回结果的类型。

public class CompletableFutureTest3 {
    // 创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // pool-1-thread-1 == > running
        CompletableFuture handle = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running ");
            int out=100;
            // 出现异常
            int i = 10 / 0;
            return out;
        }, executor).handle((result, exception) -> {
            System.out.println("===== handle method =======");
            System.out.println(result);
            System.out.println(exception);
            // 改变返回结果类型
            return "exception";
        });
        System.out.println(Thread.currentThread().getName()+" get result from handle : " +handle.get());
    }
}
四、 线程串行化方法

         跟whenComplete* 方法 类似,即在上一个任务完成后,进入下一个任务

        thenApply* 方法:获取上一个任务的返回值,并返回自己的返回值类型

        thenAccept* 方法:消费 上一个任务的返回值

        thenRun* 方法:不获取上一个任务的返回值,也不返回任何东西

public class CompletableFutureTest4 {
    // 创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture base = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running ");
            int out = 10 / 2;
            return out;
        }, executor);

        
        CompletableFuture applyFuture = base.thenApplyAsync(res ->
                "thenApplyAsync get the result from base :" + res
        );

        
        CompletableFuture acceptFuture = base.thenAccept(res -> {
            System.out.println("consume the result form base ");
            System.out.println("and return nothing ");
        });

        
        CompletableFuture runFuture = base.thenRun(() -> {
            System.out.println("consume nothing form base ");
            System.out.println("and return nothing ");
        });
    }
}
五、 两任务组合 -- 都完成

        组合两个任务,待两个任务都完成时,触发对应的任务 

        thenCombine*:获取两个任务的结果,并返回自己的结果

        thenAcceptBoth* 方法:消费两个任务的结果

        thenRunBoth* 方法:不获取上一个任务的结果,也不返回任何东西

public class CompletableFutureTest5 {
    // 创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture fu1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running fu1");
            return 1;
        }, executor);

        CompletableFuture fu2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running fu2");
            return "a";
        }, executor);

        CompletableFuture combine = fu1.thenCombine(fu2, (result1, result2) -> {
            System.out.println("======== thenCombine method ========");
            System.out.println(result1);
            System.out.println(result2);
            return new int[]{1};
        });

        CompletableFuture accept = fu1.thenAcceptBoth(fu2, (result1, result2) -> {
            System.out.println("======== thenAcceptBoth method ========");
            System.out.println(result1);
            System.out.println(result2);
        });

        CompletableFuture run = fu1.runAfterBoth(fu2, () -> {
            System.out.println("======== runAfterBoth method ========");
        });
    }
}
六、 两任务组合 -- 一完成

        组合两个任务,只要有一个任务完成,触发对应的任务 

        不过两个任务的 返回值类型 必须 相同,与 五 有区别 ,五的两个任务的返回值类型可不同

public class CompletableFutureTest6 {
    // 创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture fu1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running fu1");
            return 1;
        }, executor);

        CompletableFuture fu2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running fu2");
            return 2;
        }, executor);

        CompletableFuture combine = fu1.applyToEither(fu2, (result) -> {
            System.out.println("======== thenCombine method ========");
            System.out.println(result);
            return new int[]{1};
        });

        CompletableFuture accept = fu1.acceptEither(fu2, (result) -> {
            System.out.println("======== thenAcceptBoth method ========");
            System.out.println(result);
        });

        CompletableFuture run = fu1.runAfterEither(fu2, () -> {
            System.out.println("======== runAfterBoth method ========");
        });
    }
}
七、 多任务组合

        多个任务进行编排 

         allof:等待多个任务都完成

        anyof:任意一个任务完成

public class CompletableFutureTest7 {
    // 创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture fu1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running fu1");
            return 1;
        }, executor);

        CompletableFuture fu2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " == > running fu2");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "2";
        }, executor);

        CompletableFuture all = CompletableFuture.allOf(fu1, fu2);
        all.get();
        System.out.println(fu1.get());
        System.out.println(fu2.get());

        CompletableFuture any = CompletableFuture.anyOf(fu1, fu2);
        System.out.println(any.get());

    }
} 

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/793824.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)