[JDK源码]-J.U.C-ScheduledThreadPoolExecutor

[JDK源码]-J.U.C-ScheduledThreadPoolExecutor,第1张

[JDK源码]-J.U.C-ScheduledThreadPoolExecutor

以下是在学习中整理的一些内容,如有错误点,多谢指出。

ScheduledExecutorService

可以用来在给定延时后执行异步任务或者周期性执行任务,由于放入的任务不一定能够立即执行,所以还是需要得放入队列,然后获取,看看是否满足执行条件:时间是否满足。

ScheduledExecutorService接口
public interface ScheduledExecutorService extends ExecutorService {
	//在延迟delay时间后执行command。unit为时间单位。只调度一次
    public ScheduledFuture schedule(Runnable command,long delay, TimeUnit unit);
	//执行callable。
    public  ScheduledFuture schedule(Callable callable,long delay, TimeUnit unit);
	//基于 上一次开始时间 来延迟固定时间后执行下一次任务
    public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
	//基于 上一次结束时间
    public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
ScheduledThreadPoolExecutor核心变量
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    //在关闭时应该取消周期性任务
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;
	//如果在关闭时应该取消非周期性的任务
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
	//是否应该从队列中删除
    private volatile boolean removeonCancel = false;
	//顺序号, 保证FIFO
    private static final AtomicLong sequencer = new AtomicLong();

// ScheduledFutureTask类		用于封装Runnable Callable 对象
	private class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture {
		//序号
        private final long sequenceNumber;
		//以纳秒为单位,表明该任务下一次能够被调度的时间
        private long time;
		//重复任务的周期
        private final long period;
		//被 reExecutePeriodic 方法重新加入队列中的实际任务,默认当前任务
        RunnableScheduledFuture outerTask = this;
		//延迟队列的索引
        int heapIndex;
		//用于取消任务执行
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            //如果取消成功,则从任务队列里移除任务
            if (cancelled && removeonCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }
    	public void run() {
		    boolean periodic = isPeriodic();//是不是周期性执行任务
		    //判断是否能继续执行
		    if (!canRunInCurrentRunState(periodic))
		        cancel(false);
		    else if (!periodic)//不是周期性调度任务,直接调用run
		        ScheduledFutureTask.super.run(); //FutureTask的run方法
		    else if (ScheduledFutureTask.super.runAndReset()) {//是周期性调度任务runAndReset方法执行
		        setNextRunTime(); //设置下一次调度时间
		        reExecutePeriodic(outerTask);//通过这个进行调度
		    }
		}
     	private void setNextRunTime() {
		    long p = period;
		    if (p > 0)//p>0 任务开始执行的时间 +周期调度时间
		        time += p;
		    else
		        //如果period 小于0 ,以任务执行完毕后的的时间来计算下一次执行的时间
		        time = triggerTime(-p);
		}   
        //将任务重新放入任务队列中执行
		void reExecutePeriodic(RunnableScheduledFuture task) {
		    //根据当前线程池状态,判断当前任务是否允许被执行
		    if (canRunInCurrentRunState(true)) {
		        super.getQueue().add(task);	//将任务添加到延迟队列中
		        if (!canRunInCurrentRunState(true) && remove(task))//再次判断是否应该执行
		            task.cancel(false); //取消任务执行
		        else
		            //正常情况下,保证线程池中至少有一个工作线程在处理任务
		            ensurePrestart();
		    }
		}
	} 
//延时队列的实现原理:DelayedWorkQueue :因为都是周期性任务 带有时间的 对其排序 使用的是小根堆
    static class DelayedWorkQueue extends AbstractQueue implements BlockingQueue {
        //初始容量为 16
        private static final int INITIAL_CAPACITY = 16;
        //任务队列  数组
        private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
		//线程Leader
        private Thread leader = null;
        //条件变量用于工作线程等待执行任务
        private final Condition available = lock.newCondition(); 
    }
}
scheduleAtFixedRate实现

表示周期性任务调度,每次任务基于上一次任务开始执行的时间来决定下次启动时间

public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    //triggerTime 用于计算该任务应被调度的时间,unit.tonanos(period) 用于执行周期变为纳秒
    ScheduledFutureTask sft = new ScheduledFutureTask(command,null,triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
//封装成了 ScheduledFutureTask 对象,通过 decorateTask, 调用 delayedExecute 方法执行
protected  RunnableScheduledFuture decorateTask(
    Callable callable, RunnableScheduledFuture task) {
    return task;
}
delayedExecute方法
//主执行周期性调度或者延迟任务的方法。
private void delayedExecute(RunnableScheduledFuture task) {
    if (isShutdown())//SHURDOWN 了?
        reject(task);
    else {
        super.getQueue().add(task); // 添加到队列里去
        if (isShutdown() && 
            !canRunInCurrentRunState(task.isPeriodic()) &&//当前线程是不是在shutdown 的状态下执行
            remove(task))//为false的情况下 从当前队列移除  然后取消当前任务
            task.cancel(false);
        else
            //确保至少还有一个线程在执行任务
            ensurePrestart();
    }
}
ensurePrestart方法

如果线程池未关闭那么 ensurePrestart方法

//保证的是至少启动一个核心线程
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());//获取工作线程数
    if (wc < corePoolSize)//如果小于 核心线程数
        addWorker(null, true);//添加 核心线程数
    else if (wc == 0)//至少保证还有一个工作线程
        addWorker(null, false);
}
add添加

如果线程未关闭,那么直接调用 DelayedWorkQueue 里的 add -> offer 方法,将任务task 放入队列中

public boolean add(Runnable e) {
    return offer(e);
}
    public boolean offer(Runnable x) {//向数组中添加任务
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture e = (RunnableScheduledFuture)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {//要不要扩容
            int i = size;
            if (i >= queue.length)
                grow();
            size = i + 1;
            //之前没有任务,放在第一位就行
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                //否则将任务放到最后一位,然后通过siftUp 方法调整
                siftUp(i, e);
            }
           //如果队列中的第一个任务是当前e则清除leader线程,然后唤醒一个等待队列可用的线程来执行任务 
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
grow 扩容方法
private void grow() {
    int oldCapacity = queue.length;
    //每次扩容 50%
    int newCapacity = oldCapacity + (oldCapacity >> 1); 
    if (newCapacity < 0) //如果新容量小于0,那么表明溢出了
        newCapacity = Integer.MAX_VALUE;
    //将 old数组任务 复制到相信数组上
    queue = Arrays.copyOf(queue, newCapacity);
}
siftUp调整方法
//调整堆
private void siftUp(int k, RunnableScheduledFuture key) {
    //当K>0时,不断进行调整。k等于0表明调整到了根节点,也就是第一个元素,这时必须退出循环 
    while (k > 0) {
        // 找到他的 父亲节点   
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture e = queue[parent];
        //与父亲节点比较 看看需不需要动,直到找到 位置
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}
canRunInCurrentRunState

如果在添加到优先级队列后,线程池已经关闭,需要通过 来判断是否应该继续执行该任务

boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
}
take 方法
//从队列中获取任务,如果当前队列没有任务可取,则阻塞直到队列有任务,即等待offer方法唤醒
public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//可响应中断的方式加锁
    try {
        for (;;) {
            RunnableScheduledFuture first = queue[0];
            //如果队列第一个任务为null,则证明没有任务了,当前线程等待
            if (first == null)
                available.await();
            else {//否则获取第一个任务的剩余等待时间,判断是否小于0.需不需要执行
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // 在线程等待任务可执行时不保留引用
	  //由于任务还需要等待一段时间才能执行,这时看看前面有没有线程正在等待,如果有,则当前线程继续等待
                if (leader != null)//根本没有必要让拿到第一个任务的线程等待
                    available.await(); 
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        //将 leader 变量去掉
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
//完成最终的任务出队,这里传入的f 为第一个等待任务。由于任务被出队,因此需要调整堆 
private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
    //当前任务队列内任务数量 --,然后取出队列尾部的一个任务,调用siftDown重新调整堆的顺序
    int s = --size;
    RunnableScheduledFuture x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}
siftDown : 调整堆
//调整堆。将任务向堆尾移动。
private void siftDown(int k, RunnableScheduledFuture key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;	//获取左孩子节点索引
        RunnableScheduledFuture c = queue[child];
        int right = child + 1;		//右孩子
        if (right < size && c.compareTo(queue[right]) > 0)//比较左右孩子的大小
            c = queue[child = right];
        if (key.compareTo(c) <= 0)// 以左右孩子的min 来和 key比较
            break;
        queue[k] = c;		//如果key 小于 他们的min 那么交换
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;	//此时k 即为传入key 应该存放的下标
    setIndex(key, k);
}
scheduleWithFixedDelay实现

scheduleAtFixedRate 和 scheduleWithFixedDelay,前者是基于任务开始时间计算的 ,后者 是 基于上一个任务执行完成的时间计算的

public ScheduledFuture scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask sft =
        new ScheduledFutureTask(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
shutdown

ThreadPoolExecutor 的shutdown ->onShutdown

@Override void onShutdown() {
    //首先获取任务队列 q
    BlockingQueue q = super.getQueue();
    //默认 true
    boolean keepDelayed = 
        getExecuteExistingDelayedTasksAfterShutdownPolicy();//线程池关闭后是否应该继续执行延迟任务标志
        //默认false
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();//线程池关闭后是否应该继续执行周期性任务标志
    //判断是否在线程池shutdown后继续执行延迟任务,是否继续执行周期性调度任务
    if (!keepDelayed && !keepPeriodic) {//都不是,将任务队列清空,同时取消任务执行
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture)
                ((RunnableScheduledFuture) e).cancel(false);
        q.clear();
    }
    else {
        //否则遍历任务队列,分别处理周期任务和延迟任务
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture t =
                    (RunnableScheduledFuture)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // 这里的 t.isCancelled() 表示任务已经被取消,应移除
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();//调用该方法尝试进一步转换线程池状态
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5671008.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存