一、CompletableFuture替代Future+Callable完成更复杂的并发场景,使用completableFuture的使用就可以用then,when等等 *** 作来防止Future以上的get阻塞和轮询isDone的现象出现,既可以表示一个完整的CompletableFuture,也可以表示CompletionStage阶段组合更复杂的阶段。
二、创建CompletableFuture的方式.
1、直接查看源码.
2、无返回+默认线程池ForkJoinPool
// 测试CompletableFuture的异步默认线程池方式 private static void testCompletableFuture(){ CompletableFuturecompletableFuture = CompletableFuture.runAsync(new Runnable() { @Override public void run() { System.out.println("Thread:" + Thread.currentThread().getName()); } }); // 阻塞主线程等待线程完成 completableFuture.join(); System.out.println("runAsync"); }
3、无返回值+自定义线程池
// 测试CompletableFuture的异步默认线程池方式 private static void testCompletableFuture(){ ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); CompletableFuturecompletableFuture = CompletableFuture.runAsync(new Runnable() { @Override public void run() { System.out.println("Thread:" + Thread.currentThread().getName()); } },threadPoolExecutor); // 阻塞主线程等待线程完成 completableFuture.join(); threadPoolExecutor.shutdown(); System.out.println("runAsync"); }
4、有返回值+默认线程池.
// 测试CompletableFuture的异步默认线程池方式 private static void testCompletableFuture() throws ExecutionException, InterruptedException { //ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println("Thread:" + Thread.currentThread().getName()); return "CompletableFuture"; }); // 阻塞主线程等待线程完成 completableFuture.join(); System.out.println(completableFuture.get()); //threadPoolExecutor.shutdown(); System.out.println("runAsync"); }
5、有返回值+自定义线程池.
// 测试CompletableFuture的异步默认线程池方式 private static void testCompletableFuture() throws ExecutionException, InterruptedException { ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println("Thread:" + Thread.currentThread().getName()); return "CompletableFuture+threadPoolExecutor"; },threadPoolExecutor); // 阻塞主线程等待线程完成 completableFuture.join(); System.out.println(completableFuture.get()); threadPoolExecutor.shutdown(); System.out.println("supplyAsync"); }
三、多接口并行优化实践。
1、Future+Callable+Executors
三个模拟的业务方法.
private static String getUserDomain() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "getUserDomain"; } private static String getBillTime() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "getBillTime"; } private static String getFlowData() { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return "getFlowData"; }
private static void getCallableResult() throws InterruptedException, ExecutionException { // Callable+Executors+Future线程池方式 ExecutorService executorService = Executors.newFixedThreadPool(3); long start = System.currentTimeMillis(); List> futureList = new ArrayList<>(); Future result1 = executorService.submit(new Callable () { @Override public String call() throws Exception { return getUserDomain(); } }); Future result2 = executorService.submit(new Callable () { @Override public String call() throws Exception { return getFlowData(); } }); Future result3 = executorService.submit(new Callable () { @Override public String call() throws Exception { return getBillTime(); } }); futureList.add(result1); futureList.add(result2); futureList.add(result3); for (Future future : futureList) { System.out.println("Result:" + future.get()); } executorService.shutdown(); long end = System.currentTimeMillis(); System.out.println("共计耗时:" + (end - start)+"秒"); }
最终是响应最久的接口完成才完成最后计算结果。
2、CompletableFuture+Executor.
private static void getCompletableResult() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); long start = System.currentTimeMillis(); CompletableFuturecompletableFuture1 = CompletableFuture.supplyAsync(() -> getUserDomain() , executorService); CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> getBillTime() , executorService); CompletableFuture completableFuture3 = CompletableFuture.supplyAsync(() -> getFlowData() , executorService); CompletableFuture.allOf(completableFuture1,completableFuture2,completableFuture3).join(); List completableList=new ArrayList<>(); completableList.add(completableFuture1.get()); completableList.add(completableFuture2.get()); completableList.add(completableFuture3.get()); System.out.println(completableList); executorService.shutdown(); long end=System.currentTimeMillis(); System.out.println("Completable cost time: "+(end-start)+"S"); }
API如下:
1、Supplier
2、Runnable,俺没有入参也没有返回值你说气人不?就是这么随心所欲。
3、CompletableFuture.allOf
allOf就是所有任务都完成时返回。但是是个Void的返回值。
4、anyOf是当入参的completableFuture组中有一个任务执行完毕就返回。返回结果是第一个完成的任务的结果。
5、join();挂起调用线程的执行,主线程调用了那个异步的任务,然后主线程挂起等待子线程完成在执行后面的喽
然后顺便看看Thread的join()方法.
至此了解CompletableFuture的基本创建,下一篇介绍then()和when()等的阶段计算,多看哈源码就大概知道了,而且影响深刻,奥利给。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)