线程池和异步

线程池和异步,第1张

线程池和异步 1、初始化线程的4种方式

1、实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】
2、当前系统中线程池只有一两个,每个异步任务提交给线程池让他自己去执行

1)、继承Thread
2)、实现 Runnable接口
3)、实现 Callable接口+FutureTask(可以拿到返回结果,可以处理异常)

 FutureTask futureTask = new FutureTask(new MyCallable());
        Thread thread = new Thread(futureTask);
        thread.start();
        //阻塞等待这个线程执行完,获取返回结果
        Integer o = (Integer) futureTask.get();

4)、线程池

区别;
1、2不能得到返回值。3可以获取返回值
1、2、3都不能控制资源
4可以控制资源,性能稳定,不会一下子所有线程一起运行

2、线程池execute和submit区别

execute:参数只能是Runnable,没有返回值

submit:参数可以是Runnable、Callable,返回值是FutureTask

3、创建线程池的方式

1、创建一个固定类型的线程池
Executors.newFixedThreadPool(10);

2、直接创建,7个参数

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize:核心线程数,一直存在,一开始只是new 并没有start
maximumPoolSize:最大线程数量,控制资源
keepAliveTime:存活时间 【释放空闲线程(maximumPoolSize-corePoolSize) 超过存活时间释放线程】
unit:时间单位
workQueue: 阻塞队列,只要有线程空闲,就会去队列取出新的任务执行
threadFactory:线程的创建工厂【可以自定义】
RejectedExecutionHandler handler:拒绝策略

3、顺序:
1、先创建核心线程运行任务
2、核心线程满了,放入阻塞队列,空闲的core会自己去阻塞队列获取
new linkedBlockingDeque()默认是Integer的最大值,
3、阻塞队列满了继续创建线程,最多创建maximumPoolSize个
4、如果传入了拒绝策略会执行,否则抛出异常
5、拒绝策略:
1、丢弃最老的 Rejected
2、调用者同步调用,直接调用run方法,不创建线程了 Caller
3、直接丢弃新任务 Abort 【默认使用这个】
4、丢弃新任务,并且抛出异常 Discard

4、常见的4种线程池

1、CachedThreadPool:核心线程数是0,如果空闲会回收所有线程【缓存线程池】
2、FixedThreadPool:核心线程数 = 最大线程数,【不回收】
3、ScheduledThreadPool:定时任务线程池,多久之后执行【可提交核心线程数,最大线程数是Integer.Max】
4、SingleThreadPool:核心与最大都只有一个【不回收】,后台从队列中获取任务

5、使用线程池的好处

1、降低资源的消耗【减少创建销毁线程的开销】
通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗

2、提高响应速度【控制线程个数】
因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行

3、提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【短信等】,关闭非核心线程池释放内存资源】
线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

6、CompletableFuture异步编排 6.1、创建异步对象提交任务runAsync、supplyAsync
CompletableFuture提供了四个静态方法来创建一个异步 *** 作。

1)public static Completab1eFuture runAsync(Runnable runnable)
2)public static completableFuturecVoid> runAsync(Runnable runnable,Executor executor)
3)public static  CompletableFuture supplyAsync(Suppliersupplier)
4)public static  CompletableFuturecU> supplyAsync(Supplier supplier,Executor
executor)

1、runXXX没有返回结果,supplyXxx有返回结果
2、可以传入自定义线程池,否则使用默认线程池

public class ThreadTest {
    public static ExecutorService excutor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ....");
        CompletableFuture future = CompletableFuture.supplyAsync(()->{
            System.out.println("线程池执行任务1 : 1+1");
            return 1 + 1;
        },service);
        System.out.println("main end.... result: " + future.get());

    }
}
6.2、whencomplete接收任务返回值
public completableFuture whencomplete(BiConsumer action);
public CompletableFuturewhenCompleteAsync(BiConsumer 
action);
public completableFuture whenCompleteAsync(BiConsumer
action,Executor executor);
public completableFutureexceptionally(Function fn);

whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete和whenCompleteAsync的区别:
whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务。
whenCompleteAsync:是执行把 whenCompleteAsync这个任务继续提交给线程池来进行执行。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ....");
        CompletableFuture future = CompletableFuture
                .supplyAsync(()->{
                    System.out.println("线程池执行任务1 : 10 / 0");
                    return 10 / 0;
                }, service)
                .whenComplete((result, exception)-> {
                    // 得到异常信息,但是不能修改返回的数据
                    System.out.println("获取任务1的结果:" + result);
                    System.out.println("获取任务1的异常:" + exception);
                })
                .exceptionally((exception)-> {
                    System.out.println("获取任务1的异常,并提供一个默认返回值" + exception);
                    return 100;
                });
        System.out.println("main end.... result: " + future.get());
        }
6.3、handle方法【对返回值进行加工,再返回】
public  completionStage handle(BiFunction fn);
public  completionStagehandleAsync(BiFunction
fn);
public > CompletionStage handleAsync(BiFunction
fn,Executor executor ) ;
和complete一样,可对结果做最后的处理(可处理异常),可改变返回值。

 public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ....");
//        CompletableFuture future = CompletableFuture.supplyAsync(()->{
//            System.out.println("线程池执行任务1 : 1+1");
//            return 1 + 1;
//        },service);
        CompletableFuture future = CompletableFuture
                .supplyAsync(()->{
                    System.out.println("线程池执行任务1 : 10 / 0");
                    return 10 / 0;
                }, service)
                .whenComplete((result, exception)-> {
                    // 得到异常信息,但是不能修改返回的数据
                    System.out.println("获取任务1的结果:" + result);
                    System.out.println("获取任务1的异常:" + exception);
                })
                .exceptionally((exception)-> {
                    System.out.println("获取任务1的异常,并提供一个默认返回值" + exception);
                    return 100;
                })
                .handle((result,exception)->{
                    System.out.println("获取任务1的结果:" + result);
                    // 可对结果做最后的处理(可处理异常),可改变返回值
                    if (exception != null) {
                        System.out.println("获取任务1的异常:" + exception);
                    }
                    System.out.println("异常不会传播,前面调用exceptionally方法处理了异常");
                    return result == null ? 0 : result*2;
                });
        System.out.println("main end.... result: " + future.get());

    }

6.4、线程串行化方法【then】

thenRun:继续执行,不接受上一个任务的返回结果
thenAccept:继续执行,接受上一个任务的返回结果
thenApply:继续执行,感知上一任务的返回结果,并且自己的返回结果也被下一个任务所感知

public  CompletableFuturecU> thenApply(Function fn)
public  Completab1eFuture thenApplyAsync(Function fn)
public  CompletableFuture thenApplyAsync(Function fn,
Executor executor)
public completionstage thenAccept(Consumer action);
public completionStage thenAcceptAsync(Consumer action);
public CompletionStage thenAcceptAsync(Consumer action,Executor
executor);
public Completionstage thenRun(Runnable action);
public Completionstage thenRunAsync(Runnable action);
public completionStage thenRunAsync(Runnable action,Executor executor);

    public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ....");
//        CompletableFuture future = CompletableFuture.supplyAsync(()->{
//            System.out.println("线程池执行任务1 : 1+1");
//            return 1 + 1;
//        },service);
        CompletableFuture future = CompletableFuture
                .supplyAsync(()->{
                    System.out.println("线程池执行任务1 : 10 / 0");
                    return 10 / 0;
                }, service)
                .whenComplete((result, exception)-> {
                    // 得到异常信息,但是不能修改返回的数据
                    System.out.println("获取任务1的结果:" + result);
                    System.out.println("获取任务1的异常:" + exception);
                })
                .exceptionally((exception)-> {
                    System.out.println("获取任务1的异常,并提供一个默认返回值" + exception);
                    return 100;
                })
                .handle((result,exception)->{
                    System.out.println("获取任务1的结果:" + result);
                    // 可对结果做最后的处理(可处理异常),可改变返回值
                    if (exception != null) {
                        System.out.println("获取任务1的异常:" + exception);
                    }
                    System.out.println("异常不会传播,前面调用exceptionally方法处理了异常");
                    return result == null ? 0 : result*2;
                })
                .thenApplyAsync(result -> result*2);
        System.out.println("main end.... result: " + future.get());

    }
6.5、两任务组合–-都要完成
public  CompletableFuture thenCombine(CompletionStage other, BiFunction fn);
public  CompletableFuture thenCombineAsync(CompletionStage other, BiFunction fn);
public  CompletableFuture thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor);


public  CompletableFuture thenAcceptBoth(CompletionStage other, BiConsumer action);
public  CompletableFuture thenAcceptBothAsync(CompletionStage other, BiConsumer action);
public  CompletableFuture thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor);


public CompletableFuture runAfterBoth(CompletionStage other, Runnable action);
public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action);
public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor);

两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有
返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,
处理该任务。

public static ExecutorService excutor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ....");
        CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
            System.out.println("任务1 start..");
            System.out.println("任务1 end..");
            return 1+1;
        });

        CompletableFuture future2 = CompletableFuture.supplyAsync(()->{
            System.out.println("任务2 start..");
            System.out.println("任务2 end..");
            return "hello";
        });

        CompletableFuture future3 = future1.thenCombineAsync(future2, (result1, result2) -> {
            return "任务3 :组合前两个任务的返回值返回 --" + result1 + "---" + result2;
        }, excutor);
        System.out.println("main end.... 返回值:" + future3.get());
    }
6.6、两任务组合-一个完成
1、applyToEither:Function 带参有返回值【能获取前面任务的结果,自己有返回结果】【成功的那个任务的结果】
2、acceptEither:Consumer 带参无返回值【能获取前面任务的结果,自己没有返回结果】
3、runAfterEither:Runnable 无参无返回值【不能获取前面任务的结果,自己也没有返回结果】
	supplyAsync:Supplier:无参有返回值【不能获取前面任务的结果,自己有返回值】
public  CompletableFuture applyToEither(
    CompletionStage other, Function fn) {
    return orApplyStage(null, other, fn);
}

public  CompletableFuture applyToEitherAsync(
    CompletionStage other, Function fn) {
    return orApplyStage(asyncPool, other, fn);
}

public  CompletableFuture applyToEitherAsync(
    CompletionStage other, Function fn,
    Executor executor) {
    return orApplyStage(screenExecutor(executor), other, fn);
}

public CompletableFuture acceptEither(
    CompletionStage other, Consumer action) {
    return orAcceptStage(null, other, action);
}

public CompletableFuture acceptEitherAsync(
    CompletionStage other, Consumer action) {
    return orAcceptStage(asyncPool, other, action);
}

public CompletableFuture acceptEitherAsync(
    CompletionStage other, Consumer action,
    Executor executor) {
    return orAcceptStage(screenExecutor(executor), other, action);
}

public CompletableFuture runAfterEither(CompletionStage other,
                                              Runnable action) {
    return orRunStage(null, other, action);
}

public CompletableFuture runAfterEitherAsync(CompletionStage other,
                                                   Runnable action) {
    return orRunStage(asyncPool, other, action);
}

public CompletableFuture runAfterEitherAsync(CompletionStage other,
                                                   Runnable action,
                                                   Executor executor) {
    return orRunStage(screenExecutor(executor), other, action);
}
public static ExecutorService excutor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ....");

        CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
            System.out.println("任务1 start..");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1 end..");
            return 1+1;
        });

        CompletableFuture future2 = CompletableFuture.supplyAsync(()->{
            System.out.println("任务2 start..");
            System.out.println("任务2 end..");
            return "hello";
        });

        // 两个任务都完成才执行任务3
//        CompletableFuture future3 = future1.thenCombineAsync(future2, (result1, result2) -> {
//            return "任务3 :组合前两个任务的返回值返回 --" + result1 + "---" + result2;
//        }, excutor);

        // 任一任务完成就可以执行任务3【返回值是future1的泛型】
        CompletableFuture future3 = future1.applyToEitherAsync(future2, (result) -> {
            return "任务3 :组合先执行完的任务的结果 --" + result;
        }, excutor);
        System.out.println("main end.... 返回值:" + future3.get());
    }
 
6.7、多任务组合【总结】 
// 1、等待所有任务完成
public static CompletableFuture allOf(CompletableFuture... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}
//2、只有一个任务完成
public static CompletableFuture anyOf(CompletableFuture... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}
					
										


					

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5687375.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)