[TOC]
线程池的概念这里不多说,在讲它的原理前,我们先自己想一下,如果我来写,那如何实现一个线程池?
首先要定义一个存放所有线程的集合;
另外,每有一个任务分配给线程池,我们就从线程池中分配一个线程处理它。但当线程池中的线程都在运行状态,没有空闲线程时,我们还需要一个队列来存储提交给线程池的任务。
初始化一个线程池时,要指定这个线程池的大小;另外,我们还需要一个变量来保存已经运行的线程数目。
线程池执行任务
接下来就是线程池的核心方法,每当向线程池提交一个任务时。如果 已经运行的线程<线程池大小,则创建一个线程运行任务,并把这个线程放入线程池;否则将任务放入缓冲队列中。
到这里,一个线程池已经实现的差不多了,我们还有最后一个难点要解决:从任务队列中取出任务,分配给线程池中“空闲”的线程完成。
分配任务给线程的第一种思路
很容易想到一种解决思路:额外开启一个线程,时刻监控线程池的线程空余情况,一旦有线程空余,则马上从任务队列取出任务,交付给空余线程完成。
这种思路理解起来很容易,但仔细思考,实现起来很麻烦(1. 如何检测到线程池中的空闲线程 2. 如何将任务交付给一个.start()运行状态中的空闲线程)。而且使线程池的架构变的更复杂和不优雅。
分配任务给线程的第二种思路
现在我们来讲第二种解决思路:线程池中的所有线程一直都是运行状态的,线程的空闲只是代表此刻它没有在执行任务而已;我们可以让运行中的线程,一旦没有执行任务时,就自己从队列中取任务来执行。
为了达到这种效果,我们要重写run方法,所以要写一个自定义Thread类,然后让线程池都放这个自定义线程类
现在我们来总结一下,这个自定义线程池的整个工作过程:
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。”ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable>workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数:
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
RejectedExecutionHandler接口
无法由 ThreadPoolExecutor 执行的任务的处理程序。
四个子类:
1.ThreadPoolExecutor.AbortPolicy --->用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException.
源码如下:
/**
* A handler for rejected tasks that throws a
* <tt>RejectedExecutionException</tt>.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an <tt>AbortPolicy</tt>.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException()
}
}
2.ThreadPoolExecutor.CallerRunsPolicy --->用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。
源码如下:
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the <tt>execute</tt>method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>CallerRunsPolicy</tt>.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run()
}
}
}
3.ThreadPoolExecutor.DiscardOldestPolicy --->用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。
源码如下:
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries <tt>execute</tt>, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>DiscardOldestPolicy</tt>for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll()
e.execute(r)
}
}
}
4.ThreadPoolExecutor.DiscardPolicy --->用于被拒绝任务的处理程序,默认情况下它将放弃被拒绝的任务。
源码如下:
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>DiscardPolicy</tt>.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
1.如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
2.如果此时线程池中的数量等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
3.如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
4.如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,
那么通过handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,
如果三者都满了,使用handler处理被拒绝的任务。
5.当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)