Worker 实现了AQS,用的是非公平锁,独占锁(不可重入)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
// 对应的线程
final Thread thread;
// 在提交execute时的那个任务
Runnable firstTask;
// 该线程之前已处理的任务数
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
ThreadPoolExecutor worker为何要实现AQS ?
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
- Worker执行任务时获得锁,执行完毕释放锁。
- Worker具有不可重入特性,目的是为了防止worker刚好在运行途中,线程池控制类 *** 作(比如setCorePoolSize)时获得锁,这样的话,因为重入性,setCorePoolSize会执行中断 *** 作,会把正在运行的任务中断掉。在空闲时可以响应中断,在执行任务时不可被中断
比如进行shutdown()优雅停机的时候,要进行w.tryLock方法,没有获取到锁,说明正在运行或者干其他事情,是不会被其他事情打断掉的。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
动态线程池怎么实现
动态线程池可以修改的参数有:
- 核心线程数corePoolSize
- 最大线程数maximumPoolSize
- 阻塞队列BlockingQueue workQueue
private static ThreadPoolExecutor buildThreadPoolExecutor() {
return new ThreadPoolExecutor(2, 5, 1,
TimeUnit.SECONDS,
new ResizableCapacityLinkedBlockIngQueue<>(10),
new NamedThreadFactory("dynamic"));
}
动态修改核心线程数setCorePoolSize()
public void setCorePoolSize(int corePoolSize) {
//如果核心小于0抛出异常
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
//现有工作线程大于核心线程
if (workerCountOf(ctl.get()) > corePoolSize)
//为了打断空等阻塞任务的线程。让他们进行回收,超过原本核心线程数的才会用有限时间等待
interruptIdleWorkers();
else if (delta > 0) {
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
- 如果重新设置的核心线程数比原来的小:使用线程interrputWorker 方法进行回收,就算收到比新设置的核心数还要小,也无所谓。
- 如果重新设置的核心线程数比原来的大:根据队列剩余任务的大小和新老核心线程数差距大小去添加新的线程
核心线程数默认是不会被回收的,如果需要回收核心线程数,需要调用下面的方法
//允许核心线程池超时之后回收。
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
//能回收
if (value)
interruptIdleWorkers();
}
}
获取Task任务的方法: private Runnable getTask()
//设置了allowCoreThreadTimeOut 位true ,timed 一定为true 了,导致所有获取任务都是有限时间获取,获取不到就一定返回null。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//不超过核心线程数的时候,拉取任务的时候也设置超时等待
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
动态修改最大线程数setMaximumPoolSize()
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
//如果工作线程已经大于最大线程数,尝试进行回收一部分空闲的,阻塞在那里等待的。
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
改最小核心线程数的时候,记得顺便改最大核心线程数
这里会因wc > maximumPoolSize 不断减活动线程线程数,最大线程数上不去
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
阻塞队列的大小修改(有限队列),自己实现
- 修改之前校验参数
- 然后有多余空间就唤醒阻塞线程
public class ResizableCapacityLinkedBlockIngQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//容量
private volatile int capacity;
public int getCapacity() {
return capacity;
}
public void setCapacity(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
int delta = capacity - this.capacity;
//如果设置比原来的小capacity
if (delta < 0) {
//队列有空闲的位置,比要扣除的数量要大,就可以移除,不够就报错。
if (this.remainingCapacity() + delta >= 0) {
takeLock.lock();
try {
if (this.remainingCapacity() + delta >= 0) {
this.capacity = capacity;
}
} finally {
takeLock.unlock();
}
} else {
throw new IllegalArgumentException();
}
} else {
this.capacity = capacity;
if (this.remainingCapacity() > 0) {
//如果数组大小变大了,有多余的空间,需要唤醒等待入列的线程,让他们进入线程
notFull.signal();
}
}
}
}
动态线程池实现的demo
public class DynamicThreadPoolTest {
public static void main(String[] args) throws Exception {
dynamicModifyExecutor();
}
private static ThreadPoolExecutor buildThreadPoolExecutor() {
return new ThreadPoolExecutor(2, 5, 1,
TimeUnit.SECONDS,
new ResizableCapacityLinkedBlockIngQueue<>(10),
new NamedThreadFactory("dynamic"));
}
public static void dynamicModifyExecutor() throws InterruptedException {
ThreadPoolExecutor executor = buildThreadPoolExecutor();
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
executor.submit(() -> {
threadPoolStatus(executor, "创建任务");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPoolStatus(executor, "改变之前");
executor.setCorePoolSize(10);
executor.setMaximumPoolSize(10);
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(30);
threadPoolStatus(executor, "改变之后");
// Thread.sleep(10000);
// threadPoolStatus(executor, "结束之后");
// System.out.println(executor.toString());
Thread.currentThread().join();
}
private static void threadPoolStatus(ThreadPoolExecutor executor, String name) {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
System.out.println(Thread.currentThread().getName() + "-" + name + "-:" +
"核心线程数:" + executor.getCorePoolSize() +
" 活动线程数" + executor.getActiveCount() +
" 最大线程数" + executor.getMaximumPoolSize() +
" 线程池活跃度" + divide(executor.getActiveCount(), executor.getMaximumPoolSize()) +
" 任务完成数" + executor.getCompletedTaskCount() +
" 队列大小" + (executor.getQueue().remainingCapacity() + executor.getQueue().size()) +
" 当前排队队列线程数" + executor.getQueue().size() +
" 队列剩余大小" + executor.getQueue().remainingCapacity() +
" 队列使用度" + divide(executor.getQueue().size(), executor.getQueue().remainingCapacity() + executor.getQueue().size())
);
}
private static String divide(int num1, int num2) {
return String.format("%1.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble("" + num2) * 100);
}
}
问题:线程池被创建后里面有线程吗?如果没有的话,你知道有什么方法对线程池进行预热吗?
线程池被创建后如果没有任务过来,里面是不会有线程的。如果需要预热的话可以调用下面的两个方法:
- 启动一个:
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
- 启动全部:(启动最小核心数)
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))//大于核心线程数就停止了
++n;
return n;
}
参考资料:
- Java线程池实现原理及其在美团业务中的实践
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)