在 Java8 中,有 CompletableFuture 类,帮助简化了异步编程的复杂性。
一、创建异步对象Completable 提供了四个静态方法创建异步 *** 作。
import java.util.concurrent.*;
public class CompletableFutureTest {
// 创建一个线程池
public static ExecutorService executor =Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 无返回结果 , 且使用默认线程池
// result: ForkJoinPool.commonPool-worker-1 === > runAsync(runnable)
CompletableFuture.runAsync(
()-> System.out.println(Thread.currentThread().getName()+" === > runAsync(runnable)"));
// 无返回结果 , 且使用自定义线程池
// result: pool-1-thread-1 === > runAsync(runnable,executor)
CompletableFuture.runAsync(
()-> System.out.println(Thread.currentThread().getName()+" === > runAsync(runnable,executor)"), executor);
// 有返回结果 , 且使用默认线程池
// result: main get result : ForkJoinPool.commonPool-worker-1 === > supplyAsync(supplier)
CompletableFuture supply1 = CompletableFuture.supplyAsync(
() -> Thread.currentThread().getName() + " === > supplyAsync(supplier)");
System.out.println(Thread.currentThread().getName()+" get result : "+supply1.get());
// 有返回结果 , 且使用自定义线程池
// result: main get result : pool-1-thread-2 === > supplyAsync(supplier,executor)
CompletableFuture supply2 = CompletableFuture.supplyAsync(
() -> Thread.currentThread().getName() + " === > supplyAsync(supplier,executor)",executor);
System.out.println(Thread.currentThread().getName()+" get result : "+supply2.get());
}
}
二、 异步对象完成后的回调
在上述异步对象完成后,可以回调对应方法
方法中有后缀 Async 的表示异步执行(利用新的线程去处理),否则继续利用同一个线程处理。
方法中有带 Executor 参数 的,表示可以传入自定义的线程池,利用其中的线程进行处理。
whenComplete* 方法可以获取到上一个 Future 的 结果 或 异常 。
(1) whenComplete* 方法 没有返回值 ,但可以改变 上一个 Future 的 结果
(2) 如果上一个 Future 出现异常 , 则 whenComplete* 方法后进行 get() 也会出现异常。
exceptionally 方法则可以获取 上一个 Future 或者 whenComplete* 方法 中的异常,并返回自定义结果。
public class CompletableFutureTest2 {
// 创建一个线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
// pool-1-thread-1 == > running
CompletableFuture fu = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running ");
MyClass out=new MyClass();
out.val=1;
// 出现异常,跳转到 .exceptionally(fn) 方法
int i=10/0;
return out;
}, executor).whenCompleteAsync((res, exception) -> {
// 改变 res 中的值
res.val=2;
// 出现异常,跳转到 .exceptionally(fn) 方法
int i=10/0;
System.out.println(Thread.currentThread().getName() + " == > running fu.whenCompleteAsync1");
System.out.println(Thread.currentThread().getName() + " get ==> result: " + res + "\t exception: " + exception);
}, executor).exceptionally((exception) -> {
System.out.println(Thread.currentThread().getName() + " get ==> exception: " + exception);
return new MyClass(-1);
});
System.out.println(Thread.currentThread().getName() + " get ==> result: " +fu.get().val);
}
}
class MyClass{
int val;
MyClass(){}
MyClass(int val){this.val=val;}
}
三、 handle 方法
和上述的 whenComplete* 方法 类似,但 handle 即可处理异常,还可以改变返回结果的类型。
public class CompletableFutureTest3 {
// 创建一个线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
// pool-1-thread-1 == > running
CompletableFuture handle = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running ");
int out=100;
// 出现异常
int i = 10 / 0;
return out;
}, executor).handle((result, exception) -> {
System.out.println("===== handle method =======");
System.out.println(result);
System.out.println(exception);
// 改变返回结果类型
return "exception";
});
System.out.println(Thread.currentThread().getName()+" get result from handle : " +handle.get());
}
}
四、 线程串行化方法
跟whenComplete* 方法 类似,即在上一个任务完成后,进入下一个任务
thenApply* 方法:获取上一个任务的返回值,并返回自己的返回值类型
thenAccept* 方法:消费 上一个任务的返回值
thenRun* 方法:不获取上一个任务的返回值,也不返回任何东西
public class CompletableFutureTest4 {
// 创建一个线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture base = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running ");
int out = 10 / 2;
return out;
}, executor);
CompletableFuture applyFuture = base.thenApplyAsync(res ->
"thenApplyAsync get the result from base :" + res
);
CompletableFuture acceptFuture = base.thenAccept(res -> {
System.out.println("consume the result form base ");
System.out.println("and return nothing ");
});
CompletableFuture runFuture = base.thenRun(() -> {
System.out.println("consume nothing form base ");
System.out.println("and return nothing ");
});
}
}
五、 两任务组合 -- 都完成
组合两个任务,待两个任务都完成时,触发对应的任务
thenCombine*:获取两个任务的结果,并返回自己的结果
thenAcceptBoth* 方法:消费两个任务的结果
thenRunBoth* 方法:不获取上一个任务的结果,也不返回任何东西
public class CompletableFutureTest5 {
// 创建一个线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture fu1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running fu1");
return 1;
}, executor);
CompletableFuture fu2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running fu2");
return "a";
}, executor);
CompletableFuture combine = fu1.thenCombine(fu2, (result1, result2) -> {
System.out.println("======== thenCombine method ========");
System.out.println(result1);
System.out.println(result2);
return new int[]{1};
});
CompletableFuture accept = fu1.thenAcceptBoth(fu2, (result1, result2) -> {
System.out.println("======== thenAcceptBoth method ========");
System.out.println(result1);
System.out.println(result2);
});
CompletableFuture run = fu1.runAfterBoth(fu2, () -> {
System.out.println("======== runAfterBoth method ========");
});
}
}
六、 两任务组合 -- 一完成
组合两个任务,只要有一个任务完成,触发对应的任务
不过两个任务的 返回值类型 必须 相同,与 五 有区别 ,五的两个任务的返回值类型可不同
public class CompletableFutureTest6 {
// 创建一个线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture fu1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running fu1");
return 1;
}, executor);
CompletableFuture fu2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running fu2");
return 2;
}, executor);
CompletableFuture combine = fu1.applyToEither(fu2, (result) -> {
System.out.println("======== thenCombine method ========");
System.out.println(result);
return new int[]{1};
});
CompletableFuture accept = fu1.acceptEither(fu2, (result) -> {
System.out.println("======== thenAcceptBoth method ========");
System.out.println(result);
});
CompletableFuture run = fu1.runAfterEither(fu2, () -> {
System.out.println("======== runAfterBoth method ========");
});
}
}
七、 多任务组合
多个任务进行编排
allof:等待多个任务都完成
anyof:任意一个任务完成
public class CompletableFutureTest7 {
// 创建一个线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture fu1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running fu1");
return 1;
}, executor);
CompletableFuture fu2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " == > running fu2");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "2";
}, executor);
CompletableFuture all = CompletableFuture.allOf(fu1, fu2);
all.get();
System.out.println(fu1.get());
System.out.println(fu2.get());
CompletableFuture
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)