如上阿里巴巴的开发手册明确强制规定不让通过Executors来创建,使用Executors来创建要注意潜在宕机风险:
1:FixedThreadPool和SingleThreadPoolPool : 允许的请求队列长度为 Integer.MAX_VALUE,可能因为无限制任务队列而耗尽资源。如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将被累积起来可能会堆积大量的请求,从而导致 OOM(内存溢出).
解决方法:使用有界队列可以防止资源耗尽,但也因此必须要考虑饱和策略。因为默认的中止策略可能不是我们想要的
2 CachedThreadPool和ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM.
解决方法:这种情况可以采用ThreadPoolExecutor固定大小的线程池来解决这个问题
@Configuration @EnableAsync public class AsyncConfig { private int executor_corePoolSize = 10; private int synTaskExecutor_maxPoolSize = 30; private int synTaskExecutor_corePoolSize_queueCapacity = 500; private String synTaskExecutor_threadNamePrefix = "zss-async-executor-"; @Bean(name = "payExecutor") public AsyncTaskExecutor baseUserInfoSynTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(executor_corePoolSize); // 设置最大线程数 executor.setMaxPoolSize(synTaskExecutor_maxPoolSize); // 设置队列容量 executor.setQueueCapacity(synTaskExecutor_corePoolSize_queueCapacity); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(60); // 设置默认线程名称 executor.setThreadNamePrefix(synTaskExecutor_threadNamePrefix); // 设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteonShutdown(true); return executor; } }
调用方法解析:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
- corePoolSize: 核心线程数,线程池维护的最小数量
- maximumPoolSize:线程池维护最大线程数
- keepAliveTime: 线程空闲时间,核心线程数之外线程的最大空闲时间
- unit: 线程空闲时间单位
- workQueue:缓冲队列|任务队列
- handler: 拒绝任务的处理策略
增长策略: 默认线程池接收到任务,创建一个线程去执行当前任务,当线程数大于核心线程数,会将任务添加到任务队列中,当队列满了,会创建新的线程去执行任务。当线程数大于最大线程数停止。并启动拒绝策略
回收策略:线程池中线程的数量大于核心线程数量&&有空闲线程&&空闲线程的空闲时间大于了KeepAliveTime时,会对空闲线程进行回收,直到等于核心线程为止。
拒绝策略:如果线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务
拒绝策略:
1:AbortPolicy:直接丢弃,并抛出异常RejectedExecutionException
- 拒绝策略:抛出运行时异常RejectedExecutionException。
- 这种策略丢弃任务,并抛出异常
2:DiscardPolicy: 直接丢弃,不做处理
- 拒绝策略:不能执行的任务将被丢弃。
- 这种策略什么都没做。
- 丢弃当前将要加入队列的任务本身
3:DiscardOldestPolicy: 丢弃任务队列中最早的任务,然后尝试执行即将加入的新任务
- 拒绝策略:在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务(工作队列头部),然后重新尝试运行该任务。
- 丢弃任务队列中最旧任务
4:CallerRunsPolicy:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,线程池中,已经没有任何资源可以利用了,则调用该任务的execute本身直接执行
- 拒绝策略:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
- 不进入线程池执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
任务队列:
ArrayBlockingQueueArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入 *** 作还是读取 *** 作,都需要获取到锁才能进行 *** 作。
SynchronousQueue
它是一个特殊的队列,它的名字其实就蕴含了它的特征 – - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入 *** 作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读 *** 作的时候,同样需要一个相匹配的写线程的写 *** 作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。
linkedBlockingDeque
linkedBlockingDeque就是一个双向队列,任何一端都可以进行元素的出入。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
linkedBlockingQueue
linkedBlockingQueue是一个单向队列,只能一端出一端入的单向队列结构,是有FIFO特性的,并且是通过两个ReentrantLock和两个Condition来实现的。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
DelayQueue
是一个支持延时获取元素的无界阻塞队列。内部用 PriorityQueue 实现。
linkedTransferQueue
PriorityBlockingQueue
PriorityBlockingQueue是带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,linkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入 *** 作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)
总结:
- ArrayBlockingQueue:先进先出队列,创建时指定大小, 有界;
- linkedBlockingQueue:使用链表实现的先进先出队列,默认大小为Integer.MAX_VALUE;
- SynchronousQueue:不保存提交的任务, 数据也不会缓存到队列中, 用于生产者和消费者互等对方, 一起离开.
- PriorityBlockingQueue: 支持优先级的队列
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)