JDK8CompletableFuture创建

JDK8CompletableFuture创建,第1张

JDK8CompletableFuture创建

一、CompletableFuture替代Future+Callable完成更复杂的并发场景,使用completableFuture的使用就可以用then,when等等 *** 作来防止Future以上的get阻塞和轮询isDone的现象出现,既可以表示一个完整的CompletableFuture,也可以表示CompletionStage阶段组合更复杂的阶段。

二、创建CompletableFuture的方式.

       1、直接查看源码.

        2、无返回+默认线程池ForkJoinPool

    // 测试CompletableFuture的异步默认线程池方式
    private static void testCompletableFuture(){
        CompletableFuture completableFuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread:" + Thread.currentThread().getName());
            }
        });
        // 阻塞主线程等待线程完成
        completableFuture.join();
        System.out.println("runAsync");
    }

     3、无返回值+自定义线程池

    // 测试CompletableFuture的异步默认线程池方式
    private static void testCompletableFuture(){
        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        CompletableFuture completableFuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread:" + Thread.currentThread().getName());
            }
        },threadPoolExecutor);
        // 阻塞主线程等待线程完成
        completableFuture.join();
        threadPoolExecutor.shutdown();
        System.out.println("runAsync");
    }

      4、有返回值+默认线程池.

    // 测试CompletableFuture的异步默认线程池方式
    private static void testCompletableFuture() throws ExecutionException, InterruptedException {
        //ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println("Thread:" + Thread.currentThread().getName());
            return "CompletableFuture";
        });
        // 阻塞主线程等待线程完成
        completableFuture.join();
        System.out.println(completableFuture.get());
        //threadPoolExecutor.shutdown();
        System.out.println("runAsync");
    }

      5、有返回值+自定义线程池.

    // 测试CompletableFuture的异步默认线程池方式
    private static void testCompletableFuture() throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println("Thread:" + Thread.currentThread().getName());
            return "CompletableFuture+threadPoolExecutor";
        },threadPoolExecutor);
        // 阻塞主线程等待线程完成
        completableFuture.join();
        System.out.println(completableFuture.get());
        threadPoolExecutor.shutdown();
        System.out.println("supplyAsync");
    }

 三、多接口并行优化实践。

1、Future+Callable+Executors

   三个模拟的业务方法.

    private static String getUserDomain() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "getUserDomain";
    }

    private static String getBillTime() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "getBillTime";
    }

    private static String getFlowData() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "getFlowData";
    }
    private static void getCallableResult() throws InterruptedException, ExecutionException {
        // Callable+Executors+Future线程池方式
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        long start = System.currentTimeMillis();
        List> futureList = new ArrayList<>();
        Future result1 = executorService.submit(new Callable() {
            @Override
            public String call() throws Exception {
                return getUserDomain();
            }
        });
        Future result2 = executorService.submit(new Callable() {
            @Override
            public String call() throws Exception {
                return getFlowData();
            }
        });
        Future result3 = executorService.submit(new Callable() {
            @Override
            public String call() throws Exception {
                return getBillTime();
            }
        });
        futureList.add(result1);
        futureList.add(result2);
        futureList.add(result3);
        for (Future future : futureList) {
            System.out.println("Result:" + future.get());
        }
        executorService.shutdown();
        long end = System.currentTimeMillis();
        System.out.println("共计耗时:" + (end - start)+"秒");
    }

 最终是响应最久的接口完成才完成最后计算结果。

2、CompletableFuture+Executor.

    private static void getCompletableResult() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        long start = System.currentTimeMillis();
        CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() ->
                        getUserDomain()
                , executorService);
        CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() ->
                        getBillTime()
                , executorService);
        CompletableFuture completableFuture3 = CompletableFuture.supplyAsync(() ->
                        getFlowData()
                , executorService);
        CompletableFuture.allOf(completableFuture1,completableFuture2,completableFuture3).join();
        List completableList=new ArrayList<>();
        completableList.add(completableFuture1.get());
        completableList.add(completableFuture2.get());
        completableList.add(completableFuture3.get());
        System.out.println(completableList);
        executorService.shutdown();
        long end=System.currentTimeMillis();
        System.out.println("Completable cost time: "+(end-start)+"S");
    }

 API如下:

1、Supplier函数式接口,俺没有入参但有返回值,凭实力有的返回值。

2、Runnable,俺没有入参也没有返回值你说气人不?就是这么随心所欲。

3、CompletableFuture.allOf

 allOf就是所有任务都完成时返回。但是是个Void的返回值。

4、anyOf是当入参的completableFuture组中有一个任务执行完毕就返回。返回结果是第一个完成的任务的结果。

5、join();挂起调用线程的执行,主线程调用了那个异步的任务,然后主线程挂起等待子线程完成在执行后面的喽

然后顺便看看Thread的join()方法.

 

 至此了解CompletableFuture的基本创建,下一篇介绍then()和when()等的阶段计算,多看哈源码就大概知道了,而且影响深刻,奥利给。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5573192.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存