《谷粒商城》开发记录 9:线程池和异步编排

《谷粒商城》开发记录 9:线程池和异步编排,第1张

一、线程池 1 线程池基础 1.1 Java内置线程池

工具类Executors中提供了4种内置的线程池:
● SingleThreadPool:单线程线程池。
● FixedThreadPool:定长线程池。
● CachedThreadPool:可缓存线程池。
● ScheduledThreadPool:定时调度线程池。

虽然在业务代码中建议使用自定义线程池,但在写一些测试代码的时候,使用这些内置的线程池还是很方便的。

创建内置线程池示例:
    ExecutorService executorService = Executors.newFixedThreadPool(10);

1.2 自定义线程池

线程池的7个参数:
● int corePoolSize,核心线程池大小。
● int maximumPoolSize,最大线程池大小。
● long keepAliveTime,活跃时间。
● TimeUnit unit,时间单位。
● BlockingQueue workQueue,阻塞队列。
● ThreadFactory threadFactory,线程工厂。
● RejectedExecutionHandler handler,拒绝执行处理程序。

怎么理解这7个参数?
从前有一个银行营业厅。  【线程池】
这个银行营业厅有5个办理业务的窗口。  【最大线程池大小为5】
平时的顾客不多,只有2个窗口开放。  【核心线程池大小为2】
营业厅内设有候客区,候客区有4个座位。  【阻塞队列大小为4】
营业厅营业时:
先来了2位顾客,他们一来就直接到常开的2个窗口办理业务。
此时又有顾客到来,他们一看没有开放的窗口了,就坐在候客区的座位上等待。
当候客区的4个位置坐满时,还有新的顾客到来,营业厅赶紧开放其他窗口办理业务。
直到5个窗口全部开放,候客区的4个位置坐满时,营业厅不再放新的顾客进来。  【拒绝执行处理程序】
业务繁忙的时段过去后,有个窗口有1个小时都没有办理过业务了,营业厅就把这个窗口关掉了。  【活跃时间为1,时间单位为小时】
上文中唯一没有提到的线程工厂,是用来创建线程的。

线程池的拒绝策略有哪些?
● AbortPolicy:中止策略,抛出异常。
● CallerRunsPolicy:调用者运行策略。
● DiscardPolicy:丢弃策略。
● DiscardOldestPolicy:丢弃最旧任务策略。

这些参数一般怎么设置?
● 核心线程池大小:
   根据经验,假如服务器的CPU个数为N:
   对于CPU密集型的任务,将线程数设为N+1;
   对于IO密集型的任务,将线程数设为2N;
   对于计算和IO *** 作都比较多的任务,应考虑使用两个线程池,分别处理计算和IO *** 作。
   对于计算和IO *** 作都比较多且不可拆分的任务,采用算式 num=N×(任务总耗时/计算耗时) 来计算。
● 最大线程池大小:
   与核心线程池大小保持一致,减少任务处理过程中创建和销毁线程的开销。
● 活跃时间、时间单位:
   因为核心线程池和最大线程池大小保持一致,所以设多少都可以。
● 阻塞队列大小:
   必须是有界队列。
● 线程工厂:
   使用默认的Executors.defaultThreadFactory()就可以。
● 拒绝策略:
   一般使用默认的AbortPolicy就可以。
   对于不容许任务失败的场景,使用CallerRunsPolicy。
   对于无关紧要的任务,处理异常的收益很低,可以使用DiscardPolicy。
   对于时效性比较强的任务,比如发布消息,可以使用DiscardOldestPolicy。

2 线程池中体现的设计模式

首先,线程池技术是一种池化技术,池化技术体现了享元模式。
享元模式通常与工厂模式搭配使用,线程池参数中的线程工厂正是工厂模式的体现。
线程池参数中的另一个参数——拒绝策略体现了策略模式。

3 线程池的实际应用

1. 创建一个线程池注入到容器中。
    @Configuration
    public class ThreadPoolConfig {
        @Bean
        public ThreadPoolExecutor threadPoolExecutor() {
            return new ThreadPoolExecutor(10, 100,
                    300, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(10000),
                    Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        }
    }
2. 在需要使用线程池的类中使用@Autowired 注解获取线程池。
    @Autowired
    ThreadPoolExecutor threadPoolExecutor;

为了使线程池用起来更方便,我们将线程池的各个参数放到配置文件中。
之所以这么做,是因为在Nacos配置中心可以添加配置集覆盖本地配置,这样就可以在网页上直接修改线程池参数。
1. 使用Maven引入Spring Boot配置处理器。
   
        org.springframework.boot
        spring-boot-configuration-processor
        true
   

2. 创建线程池配置属性类。
    @ConfigurationProperties(prefix = "gulimall.threadpool")
    @Component
    @Data
    public class ThreadPoolConfigProperties {
        private Integer corePoolSize;
        private Integer maximumPoolSize;
        private Integer keepAliveTime;
        private Integer workQueueSize;
    }
3. 在配置文件application.properties中进行配置。
    gulimall.threadpool.core-pool-size=10
    gulimall.threadpool.maximum-pool-size=100
    gulimall.threadpool.keep-alive-time=300
    gulimall.threadpool.work-queue-size=10000
4. 启用配置的线程池改造如下:
    @EnableConfigurationProperties(ThreadPoolConfigProperties.class)
    @Configuration
    public class ThreadPoolConfig {
        @Bean
        public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties properties) {
            return new ThreadPoolExecutor(properties.getCorePoolSize(),                     properties.getMaximumPoolSize(),
                    properties.getKeepAliveTime(), TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(properties.getWorkQueueSize()),
                    Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        }
    }

二、异步编排

Java 8 中提供了一个CompletableFuture类,可以帮助我们简化异步编程的复杂性,同时提供了函数式编程的能力,可以通过回调的方式处理计算结果。

1 创建异步对象

runAsync方法:没有返回值。
● public static CompletableFuture runAsync(Runnable runnable);
● public static CompletableFuture runAsync(Runnable runnable, Executor executor);
supplyAsync方法:有返回值。
● public static CompletableFuture supplyAsync(Supplier supplier);
● public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
如果不传入线程池,就用默认的线程池。

2 串行化方法

thenRun方法:上个任务完成后,执行当前任务。
● public CompletionStage thenRun(Runnable action);
● public CompletionStage thenRunAsync(Runnable action);
● public CompletionStage thenRunAsync(Runnable action, Executor executor);
thenAccept方法:上个任务完成后,执行当前任务。消费上个任务的返回结果,当前任务无返回值。
● public CompletionStage thenAccept(Consumer action);
● public CompletionStage thenAcceptAsync(Consumer action);
● public CompletionStage thenAcceptAsync(Consumer action, Executor executor);
thenApply方法:上个任务完成后,执行当前任务。消费上个任务的返回结果,并返回当前任务的结果。
● public CompletableFuture thenApply(Function fn);
● public CompletableFuture thenApplyAsync(Function fn);
● public CompletableFuture thenApplyAsync(Function fn, Executor executor);

3 多任务组合

allOf方法:等待所有任务完成。
● public static CompletableFuture allOf(CompletableFuture... cfs);
anyOf方法:等待任一个任务完成。
● public static CompletableFuture anyOf(CompletableFuture... cfs);

三、使用异步优化业务逻辑

查询商品详情的业务逻辑:
1. 获取SKU基本信息。
2. 获取SKU商品图。
3. 获取SPU的介绍。
4. 获取SPU销售属性。
5. 获取SPU规格参数。

如果要使用异步对查询商品详情的逻辑进行优化,首先要分析各个步骤之间是否存在先后顺序。
经过分析,第3、4、5三个步骤依赖第1步的返回结果。

使用异步优化业务逻辑后的代码:
    @Autowired
    ThreadPoolExecutor threadPoolExecutor;
    public SkuInfoVo getProductDetail(Long skuId) {
        SkuInfoVo skuInfoVo = new SkuInfoVo();
        // 1 获取SKU基本信息
        CompletableFuture skuInfoFuture = CompletableFuture.supplyAsync(() -> {
            SkuInfoEntity skuInfoEntity = getSkuInfo(skuId); // TODO getSkuInfo(skuId)
            skuInfoVo.setSkuInfo(skuInfoEntity);
            return skuInfoEntity;
        }, threadPoolExecutor);
        // 3 获取SPU的介绍
        CompletableFuture spuDescFuture = skuInfoFuture.thenAcceptAsync((skuInfoEntity) -> {
            skuInfoVo.setSpuInfoDesc(getSpuInfoDesc(skuInfoEntity.getSpuId())); // TODO getSpuInfoDesc(spuId)
        }, threadPoolExecutor);
        // 4 获取SPU销售属性
        CompletableFuture spuSaleAttrFuture = skuInfoFuture.thenAcceptAsync((skuInfoEntity) -> {
            skuInfoVo.setSpuSaleAttrList(getSpuSaleAttrList(skuInfoEntity.getSpuId())); // TODO getSpuSaleAttrList(spuId)
        }, threadPoolExecutor);
        // 5 获取SPU规格参数
        CompletableFuture spuBaseAttrFuture = skuInfoFuture.thenAcceptAsync((skuInfoEntity) -> {
            skuInfoVo.setSpuBaseAttrList(getSpuBaseAttrList(skuInfoEntity.getSpuId(), skuInfoEntity.getCatelogId())); // TODO getSpuBaseAttrList(spuId, catelogId)
        }, threadPoolExecutor);
        // 2 获取SKU商品图
        CompletableFuture skuImageFuture = CompletableFuture.runAsync(() -> {
            skuInfoVo.setSkuImageList(getSkuImageList(skuId)); // TODO getSkuImageList(skuId)
        }, threadPoolExecutor);

        try {
            CompletableFuture.allOf(spuDescFuture, spuSaleAttrFuture, spuBaseAttrFuture, skuImageFuture).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return skuInfoVo;
    }

四、其他内容

可以发现,在使用@Autowired 注解从容器中获取bean的时候,是根据数据类型来取的。那如果有多个同类型的bean被注入到容器中呢?

可以使用@Qualifier 注解和@Autowired 注解搭配使用,指定bean具体的实现类。
如:
    @Autowired
    @Qualifier("userServiceImpl")
    private UserService userService;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/921750.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)