线程池使用

线程池使用,第1张

线程池使用

 

如上阿里巴巴的开发手册明确强制规定不让通过Executors来创建,使用Executors来创建要注意潜在宕机风险:

1:FixedThreadPool和SingleThreadPoolPool : 允许的请求队列长度为 Integer.MAX_VALUE,可能因为无限制任务队列而耗尽资源。如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将被累积起来可能会堆积大量的请求,从而导致 OOM(内存溢出).

解决方法:使用有界队列可以防止资源耗尽,但也因此必须要考虑饱和策略。因为默认的中止策略可能不是我们想要的

2 CachedThreadPool和ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM.

解决方法:这种情况可以采用ThreadPoolExecutor固定大小的线程池来解决这个问题

@Configuration
@EnableAsync
public class AsyncConfig {

    private int executor_corePoolSize = 10;
    private int synTaskExecutor_maxPoolSize = 30;
    private int synTaskExecutor_corePoolSize_queueCapacity = 500;
    private String synTaskExecutor_threadNamePrefix = "zss-async-executor-";


    @Bean(name = "payExecutor")
    public AsyncTaskExecutor baseUserInfoSynTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(executor_corePoolSize);
        // 设置最大线程数
        executor.setMaxPoolSize(synTaskExecutor_maxPoolSize);
        // 设置队列容量
        executor.setQueueCapacity(synTaskExecutor_corePoolSize_queueCapacity);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置默认线程名称
        executor.setThreadNamePrefix(synTaskExecutor_threadNamePrefix);
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteonShutdown(true);
        return executor;
    }


}

 调用方法解析:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
  • corePoolSize: 核心线程数,线程池维护的最小数量
  • maximumPoolSize:线程池维护最大线程数
  • keepAliveTime: 线程空闲时间,核心线程数之外线程的最大空闲时间
  • unit: 线程空闲时间单位
  • workQueue:缓冲队列|任务队列
  • handler: 拒绝任务的处理策略

增长策略: 默认线程池接收到任务,创建一个线程去执行当前任务,当线程数大于核心线程数,会将任务添加到任务队列中,当队列满了,会创建新的线程去执行任务。当线程数大于最大线程数停止。并启动拒绝策略

回收策略:线程池中线程的数量大于核心线程数量&&有空闲线程&&空闲线程的空闲时间大于了KeepAliveTime时,会对空闲线程进行回收,直到等于核心线程为止。

拒绝策略:如果线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务

拒绝策略:

1:AbortPolicy:直接丢弃,并抛出异常RejectedExecutionException 

  • 拒绝策略:抛出运行时异常RejectedExecutionException。
  • 这种策略丢弃任务,并抛出异常

2:DiscardPolicy: 直接丢弃,不做处理

  • 拒绝策略:不能执行的任务将被丢弃。
  • 这种策略什么都没做。
  • 丢弃当前将要加入队列的任务本身

3:DiscardOldestPolicy: 丢弃任务队列中最早的任务,然后尝试执行即将加入的新任务

  • 拒绝策略:在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务(工作队列头部),然后重新尝试运行该任务。
  • 丢弃任务队列中最旧任务

4:CallerRunsPolicy:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,线程池中,已经没有任何资源可以利用了,则调用该任务的execute本身直接执行

  • 拒绝策略:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
  • 不进入线程池执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  if (!e.isShutdown()) {
      r.run();  
  }
}

任务队列:

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入 *** 作还是读取 *** 作,都需要获取到锁才能进行 *** 作。



SynchronousQueue

它是一个特殊的队列,它的名字其实就蕴含了它的特征 – - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入 *** 作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读 *** 作的时候,同样需要一个相匹配的写线程的写 *** 作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。



linkedBlockingDeque

linkedBlockingDeque就是一个双向队列,任何一端都可以进行元素的出入。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。



linkedBlockingQueue

linkedBlockingQueue是一个单向队列,只能一端出一端入的单向队列结构,是有FIFO特性的,并且是通过两个ReentrantLock和两个Condition来实现的。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。



DelayQueue

是一个支持延时获取元素的无界阻塞队列。内部用 PriorityQueue 实现。



linkedTransferQueue

PriorityBlockingQueue

PriorityBlockingQueue是带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,linkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入 *** 作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)

总结:

  • ArrayBlockingQueue:先进先出队列,创建时指定大小, 有界;
  • linkedBlockingQueue:使用链表实现的先进先出队列,默认大小为Integer.MAX_VALUE;
  • SynchronousQueue:不保存提交的任务, 数据也不会缓存到队列中, 用于生产者和消费者互等对方, 一起离开.
  • PriorityBlockingQueue: 支持优先级的队列

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5479988.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存