1、实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】
2、当前系统中线程池只有一两个,每个异步任务提交给线程池让他自己去执行
1)、继承Thread
2)、实现 Runnable接口
3)、实现 Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
FutureTask futureTask = new FutureTask(new MyCallable()); Thread thread = new Thread(futureTask); thread.start(); //阻塞等待这个线程执行完,获取返回结果 Integer o = (Integer) futureTask.get();
4)、线程池
区别;
1、2不能得到返回值。3可以获取返回值
1、2、3都不能控制资源
4可以控制资源,性能稳定,不会一下子所有线程一起运行
execute:参数只能是Runnable,没有返回值
submit:参数可以是Runnable、Callable,返回值是FutureTask
1、创建一个固定类型的线程池
Executors.newFixedThreadPool(10);
2、直接创建,7个参数
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:核心线程数,一直存在,一开始只是new 并没有start
maximumPoolSize:最大线程数量,控制资源
keepAliveTime:存活时间 【释放空闲线程(maximumPoolSize-corePoolSize) 超过存活时间释放线程】
unit:时间单位
workQueue: 阻塞队列,只要有线程空闲,就会去队列取出新的任务执行
threadFactory:线程的创建工厂【可以自定义】
RejectedExecutionHandler handler:拒绝策略
3、顺序:
1、先创建核心线程运行任务
2、核心线程满了,放入阻塞队列,空闲的core会自己去阻塞队列获取
new linkedBlockingDeque()默认是Integer的最大值,
3、阻塞队列满了继续创建线程,最多创建maximumPoolSize个
4、如果传入了拒绝策略会执行,否则抛出异常
5、拒绝策略:
1、丢弃最老的 Rejected
2、调用者同步调用,直接调用run方法,不创建线程了 Caller
3、直接丢弃新任务 Abort 【默认使用这个】
4、丢弃新任务,并且抛出异常 Discard
1、CachedThreadPool:核心线程数是0,如果空闲会回收所有线程【缓存线程池】
2、FixedThreadPool:核心线程数 = 最大线程数,【不回收】
3、ScheduledThreadPool:定时任务线程池,多久之后执行【可提交核心线程数,最大线程数是Integer.Max】
4、SingleThreadPool:核心与最大都只有一个【不回收】,后台从队列中获取任务
1、降低资源的消耗【减少创建销毁线程的开销】
通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
2、提高响应速度【控制线程个数】
因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
3、提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【短信等】,关闭非核心线程池释放内存资源】
线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
CompletableFuture提供了四个静态方法来创建一个异步 *** 作。 1)public static Completab1eFuture6.2、whencomplete接收任务返回值runAsync(Runnable runnable) 2)public static completableFuturecVoid> runAsync(Runnable runnable,Executor executor) 3)public static CompletableFuture supplyAsync(Suppliersupplier) 4)public static CompletableFuturecU> supplyAsync(Supplier supplier,Executor executor) 1、runXXX没有返回结果,supplyXxx有返回结果 2、可以传入自定义线程池,否则使用默认线程池 public class ThreadTest { public static ExecutorService excutor = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main start ...."); CompletableFuture future = CompletableFuture.supplyAsync(()->{ System.out.println("线程池执行任务1 : 1+1"); return 1 + 1; },service); System.out.println("main end.... result: " + future.get()); } }
public completableFuturewhencomplete(BiConsumer super T,? super Throwable> action); public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action); public completableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action,Executor executor); public completableFuture exceptionally(Function fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete和whenCompleteAsync的区别:
whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务。
whenCompleteAsync:是执行把 whenCompleteAsync这个任务继续提交给线程池来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
public static ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main start ...."); CompletableFuture6.3、handle方法【对返回值进行加工,再返回】future = CompletableFuture .supplyAsync(()->{ System.out.println("线程池执行任务1 : 10 / 0"); return 10 / 0; }, service) .whenComplete((result, exception)-> { // 得到异常信息,但是不能修改返回的数据 System.out.println("获取任务1的结果:" + result); System.out.println("获取任务1的异常:" + exception); }) .exceptionally((exception)-> { System.out.println("获取任务1的异常,并提供一个默认返回值" + exception); return 100; }); System.out.println("main end.... result: " + future.get()); }
public completionStage handle(BiFunction super T,Throwable,? extends U> fn); public completionStagehandleAsync(BiFunction super T,Throwable,? extends U> fn); public > CompletionStage handleAsync(BiFunction super T,Throwable,? extends U> fn,Executor executor ) ; 和complete一样,可对结果做最后的处理(可处理异常),可改变返回值。 public static ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main start ...."); // CompletableFuture6.4、线程串行化方法【then】future = CompletableFuture.supplyAsync(()->{ // System.out.println("线程池执行任务1 : 1+1"); // return 1 + 1; // },service); CompletableFuture future = CompletableFuture .supplyAsync(()->{ System.out.println("线程池执行任务1 : 10 / 0"); return 10 / 0; }, service) .whenComplete((result, exception)-> { // 得到异常信息,但是不能修改返回的数据 System.out.println("获取任务1的结果:" + result); System.out.println("获取任务1的异常:" + exception); }) .exceptionally((exception)-> { System.out.println("获取任务1的异常,并提供一个默认返回值" + exception); return 100; }) .handle((result,exception)->{ System.out.println("获取任务1的结果:" + result); // 可对结果做最后的处理(可处理异常),可改变返回值 if (exception != null) { System.out.println("获取任务1的异常:" + exception); } System.out.println("异常不会传播,前面调用exceptionally方法处理了异常"); return result == null ? 0 : result*2; }); System.out.println("main end.... result: " + future.get()); }
thenRun:继续执行,不接受上一个任务的返回结果
thenAccept:继续执行,接受上一个任务的返回结果
thenApply:继续执行,感知上一任务的返回结果,并且自己的返回结果也被下一个任务所感知
public CompletableFuturecU> thenApply(Function super T,? extends U> fn) public Completab1eFuture thenApplyAsync(Function super T,? extends U> fn) public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor) public completionstage6.5、两任务组合–-都要完成thenAccept(Consumer super T> action); public completionStage thenAcceptAsync(Consumer super T> action); public CompletionStage thenAcceptAsync(Consumer super T> action,Executor executor); public Completionstage thenRun(Runnable action); public Completionstage thenRunAsync(Runnable action); public completionStage thenRunAsync(Runnable action,Executor executor); public static ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main start ...."); // CompletableFuture future = CompletableFuture.supplyAsync(()->{ // System.out.println("线程池执行任务1 : 1+1"); // return 1 + 1; // },service); CompletableFuture future = CompletableFuture .supplyAsync(()->{ System.out.println("线程池执行任务1 : 10 / 0"); return 10 / 0; }, service) .whenComplete((result, exception)-> { // 得到异常信息,但是不能修改返回的数据 System.out.println("获取任务1的结果:" + result); System.out.println("获取任务1的异常:" + exception); }) .exceptionally((exception)-> { System.out.println("获取任务1的异常,并提供一个默认返回值" + exception); return 100; }) .handle((result,exception)->{ System.out.println("获取任务1的结果:" + result); // 可对结果做最后的处理(可处理异常),可改变返回值 if (exception != null) { System.out.println("获取任务1的异常:" + exception); } System.out.println("异常不会传播,前面调用exceptionally方法处理了异常"); return result == null ? 0 : result*2; }) .thenApplyAsync(result -> result*2); System.out.println("main end.... result: " + future.get()); }
public CompletableFuturethenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn); public CompletableFuture thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn); public CompletableFuture thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn, Executor executor); public CompletableFuture thenAcceptBoth(CompletionStage extends U> other, BiConsumer super T, ? super U> action); public CompletableFuture thenAcceptBothAsync(CompletionStage extends U> other, BiConsumer super T, ? super U> action); public CompletableFuture thenAcceptBothAsync(CompletionStage extends U> other, BiConsumer super T, ? super U> action, Executor executor); public CompletableFuture runAfterBoth(CompletionStage> other, Runnable action); public CompletableFuture runAfterBothAsync(CompletionStage> other, Runnable action); public CompletableFuture runAfterBothAsync(CompletionStage> other, Runnable action, Executor executor);
两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有
返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,
处理该任务。
public static ExecutorService excutor = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main start ...."); CompletableFuture6.6、两任务组合-一个完成future1 = CompletableFuture.supplyAsync(()->{ System.out.println("任务1 start.."); System.out.println("任务1 end.."); return 1+1; }); CompletableFuture future2 = CompletableFuture.supplyAsync(()->{ System.out.println("任务2 start.."); System.out.println("任务2 end.."); return "hello"; }); CompletableFuture future3 = future1.thenCombineAsync(future2, (result1, result2) -> { return "任务3 :组合前两个任务的返回值返回 --" + result1 + "---" + result2; }, excutor); System.out.println("main end.... 返回值:" + future3.get()); }
1、applyToEither:Function 带参有返回值【能获取前面任务的结果,自己有返回结果】【成功的那个任务的结果】 2、acceptEither:Consumer 带参无返回值【能获取前面任务的结果,自己没有返回结果】 3、runAfterEither:Runnable 无参无返回值【不能获取前面任务的结果,自己也没有返回结果】 supplyAsync:Supplier:无参有返回值【不能获取前面任务的结果,自己有返回值】 public CompletableFuture applyToEither( CompletionStage extends T> other, Function super T, U> fn) { return orApplyStage(null, other, fn); } public CompletableFuture applyToEitherAsync( CompletionStage extends T> other, Function super T, U> fn) { return orApplyStage(asyncPool, other, fn); } public CompletableFuture applyToEitherAsync( CompletionStage extends T> other, Function super T, U> fn, Executor executor) { return orApplyStage(screenExecutor(executor), other, fn); } public CompletableFutureacceptEither( CompletionStage extends T> other, Consumer super T> action) { return orAcceptStage(null, other, action); } public CompletableFuture acceptEitherAsync( CompletionStage extends T> other, Consumer super T> action) { return orAcceptStage(asyncPool, other, action); } public CompletableFuture acceptEitherAsync( CompletionStage extends T> other, Consumer super T> action, Executor executor) { return orAcceptStage(screenExecutor(executor), other, action); } public CompletableFuture runAfterEither(CompletionStage> other, Runnable action) { return orRunStage(null, other, action); } public CompletableFuture runAfterEitherAsync(CompletionStage> other, Runnable action) { return orRunStage(asyncPool, other, action); } public CompletableFuture runAfterEitherAsync(CompletionStage> other, Runnable action, Executor executor) { return orRunStage(screenExecutor(executor), other, action); }
public static ExecutorService excutor = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main start ...."); CompletableFuture6.7、多任务组合【总结】
// 1、等待所有任务完成 public static CompletableFutureallOf(CompletableFuture>... cfs) { return andTree(cfs, 0, cfs.length - 1); } //2、只有一个任务完成 public static CompletableFuture
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)