以下是在学习中整理的一些内容,如有错误点,多谢指出。
ScheduledExecutorService可以用来在给定延时后执行异步任务或者周期性执行任务,由于放入的任务不一定能够立即执行,所以还是需要得放入队列,然后获取,看看是否满足执行条件:时间是否满足。
ScheduledExecutorService接口public interface ScheduledExecutorService extends ExecutorService { //在延迟delay时间后执行command。unit为时间单位。只调度一次 public ScheduledFuture> schedule(Runnable command,long delay, TimeUnit unit); //执行callable。 publicScheduledThreadPoolExecutor核心变量ScheduledFuture schedule(Callable callable,long delay, TimeUnit unit); //基于 上一次开始时间 来延迟固定时间后执行下一次任务 public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit); //基于 上一次结束时间 public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { //在关闭时应该取消周期性任务 private volatile boolean continueExistingPeriodicTasksAfterShutdown; //如果在关闭时应该取消非周期性的任务 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; //是否应该从队列中删除 private volatile boolean removeonCancel = false; //顺序号, 保证FIFO private static final AtomicLong sequencer = new AtomicLong(); // ScheduledFutureTask类 用于封装Runnable Callable 对象 private class ScheduledFutureTaskscheduleAtFixedRate实现extends FutureTask implements RunnableScheduledFuture { //序号 private final long sequenceNumber; //以纳秒为单位,表明该任务下一次能够被调度的时间 private long time; //重复任务的周期 private final long period; //被 reExecutePeriodic 方法重新加入队列中的实际任务,默认当前任务 RunnableScheduledFuture outerTask = this; //延迟队列的索引 int heapIndex; //用于取消任务执行 public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); //如果取消成功,则从任务队列里移除任务 if (cancelled && removeonCancel && heapIndex >= 0) remove(this); return cancelled; } public void run() { boolean periodic = isPeriodic();//是不是周期性执行任务 //判断是否能继续执行 if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic)//不是周期性调度任务,直接调用run ScheduledFutureTask.super.run(); //FutureTask的run方法 else if (ScheduledFutureTask.super.runAndReset()) {//是周期性调度任务runAndReset方法执行 setNextRunTime(); //设置下一次调度时间 reExecutePeriodic(outerTask);//通过这个进行调度 } } private void setNextRunTime() { long p = period; if (p > 0)//p>0 任务开始执行的时间 +周期调度时间 time += p; else //如果period 小于0 ,以任务执行完毕后的的时间来计算下一次执行的时间 time = triggerTime(-p); } //将任务重新放入任务队列中执行 void reExecutePeriodic(RunnableScheduledFuture> task) { //根据当前线程池状态,判断当前任务是否允许被执行 if (canRunInCurrentRunState(true)) { super.getQueue().add(task); //将任务添加到延迟队列中 if (!canRunInCurrentRunState(true) && remove(task))//再次判断是否应该执行 task.cancel(false); //取消任务执行 else //正常情况下,保证线程池中至少有一个工作线程在处理任务 ensurePrestart(); } } } //延时队列的实现原理:DelayedWorkQueue :因为都是周期性任务 带有时间的 对其排序 使用的是小根堆 static class DelayedWorkQueue extends AbstractQueue implements BlockingQueue { //初始容量为 16 private static final int INITIAL_CAPACITY = 16; //任务队列 数组 private RunnableScheduledFuture>[] queue = new RunnableScheduledFuture>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; //线程Leader private Thread leader = null; //条件变量用于工作线程等待执行任务 private final Condition available = lock.newCondition(); } }
表示周期性任务调度,每次任务基于上一次任务开始执行的时间来决定下次启动时间
public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); //triggerTime 用于计算该任务应被调度的时间,unit.tonanos(period) 用于执行周期变为纳秒 ScheduledFutureTaskdelayedExecute方法sft = new ScheduledFutureTask (command,null,triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } //封装成了 ScheduledFutureTask 对象,通过 decorateTask, 调用 delayedExecute 方法执行 protected RunnableScheduledFuture decorateTask( Callable callable, RunnableScheduledFuture task) { return task; }
//主执行周期性调度或者延迟任务的方法。 private void delayedExecute(RunnableScheduledFuture> task) { if (isShutdown())//SHURDOWN 了? reject(task); else { super.getQueue().add(task); // 添加到队列里去 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) &&//当前线程是不是在shutdown 的状态下执行 remove(task))//为false的情况下 从当前队列移除 然后取消当前任务 task.cancel(false); else //确保至少还有一个线程在执行任务 ensurePrestart(); } }ensurePrestart方法
如果线程池未关闭那么 ensurePrestart方法
//保证的是至少启动一个核心线程 void ensurePrestart() { int wc = workerCountOf(ctl.get());//获取工作线程数 if (wc < corePoolSize)//如果小于 核心线程数 addWorker(null, true);//添加 核心线程数 else if (wc == 0)//至少保证还有一个工作线程 addWorker(null, false); }add添加
如果线程未关闭,那么直接调用 DelayedWorkQueue 里的 add -> offer 方法,将任务task 放入队列中
public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable x) {//向数组中添加任务 if (x == null) throw new NullPointerException(); RunnableScheduledFuture> e = (RunnableScheduledFuture>)x; final ReentrantLock lock = this.lock; lock.lock(); try {//要不要扩容 int i = size; if (i >= queue.length) grow(); size = i + 1; //之前没有任务,放在第一位就行 if (i == 0) { queue[0] = e; setIndex(e, 0); } else { //否则将任务放到最后一位,然后通过siftUp 方法调整 siftUp(i, e); } //如果队列中的第一个任务是当前e则清除leader线程,然后唤醒一个等待队列可用的线程来执行任务 if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }grow 扩容方法
private void grow() { int oldCapacity = queue.length; //每次扩容 50% int newCapacity = oldCapacity + (oldCapacity >> 1); if (newCapacity < 0) //如果新容量小于0,那么表明溢出了 newCapacity = Integer.MAX_VALUE; //将 old数组任务 复制到相信数组上 queue = Arrays.copyOf(queue, newCapacity); }siftUp调整方法
//调整堆 private void siftUp(int k, RunnableScheduledFuture> key) { //当K>0时,不断进行调整。k等于0表明调整到了根节点,也就是第一个元素,这时必须退出循环 while (k > 0) { // 找到他的 父亲节点 int parent = (k - 1) >>> 1; RunnableScheduledFuture> e = queue[parent]; //与父亲节点比较 看看需不需要动,直到找到 位置 if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }canRunInCurrentRunState
如果在添加到优先级队列后,线程池已经关闭,需要通过 来判断是否应该继续执行该任务
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }take 方法
//从队列中获取任务,如果当前队列没有任务可取,则阻塞直到队列有任务,即等待offer方法唤醒 public RunnableScheduledFuture> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//可响应中断的方式加锁 try { for (;;) { RunnableScheduledFuture> first = queue[0]; //如果队列第一个任务为null,则证明没有任务了,当前线程等待 if (first == null) available.await(); else {//否则获取第一个任务的剩余等待时间,判断是否小于0.需不需要执行 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // 在线程等待任务可执行时不保留引用 //由于任务还需要等待一段时间才能执行,这时看看前面有没有线程正在等待,如果有,则当前线程继续等待 if (leader != null)//根本没有必要让拿到第一个任务的线程等待 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { //将 leader 变量去掉 if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
//完成最终的任务出队,这里传入的f 为第一个等待任务。由于任务被出队,因此需要调整堆 private RunnableScheduledFuture> finishPoll(RunnableScheduledFuture> f) { //当前任务队列内任务数量 --,然后取出队列尾部的一个任务,调用siftDown重新调整堆的顺序 int s = --size; RunnableScheduledFuture> x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; }siftDown : 调整堆
//调整堆。将任务向堆尾移动。 private void siftDown(int k, RunnableScheduledFuture> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; //获取左孩子节点索引 RunnableScheduledFuture> c = queue[child]; int right = child + 1; //右孩子 if (right < size && c.compareTo(queue[right]) > 0)//比较左右孩子的大小 c = queue[child = right]; if (key.compareTo(c) <= 0)// 以左右孩子的min 来和 key比较 break; queue[k] = c; //如果key 小于 他们的min 那么交换 setIndex(c, k); k = child; } queue[k] = key; //此时k 即为传入key 应该存放的下标 setIndex(key, k); }scheduleWithFixedDelay实现
scheduleAtFixedRate 和 scheduleWithFixedDelay,前者是基于任务开始时间计算的 ,后者 是 基于上一个任务执行完成的时间计算的
public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTaskshutdownsft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
ThreadPoolExecutor 的shutdown ->onShutdown
@Override void onShutdown() { //首先获取任务队列 q BlockingQueueq = super.getQueue(); //默认 true boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();//线程池关闭后是否应该继续执行延迟任务标志 //默认false boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();//线程池关闭后是否应该继续执行周期性任务标志 //判断是否在线程池shutdown后继续执行延迟任务,是否继续执行周期性调度任务 if (!keepDelayed && !keepPeriodic) {//都不是,将任务队列清空,同时取消任务执行 for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture>) ((RunnableScheduledFuture>) e).cancel(false); q.clear(); } else { //否则遍历任务队列,分别处理周期任务和延迟任务 for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture> t = (RunnableScheduledFuture>)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // 这里的 t.isCancelled() 表示任务已经被取消,应移除 if (q.remove(t)) t.cancel(false); } } } } tryTerminate();//调用该方法尝试进一步转换线程池状态 }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)