系列文章Github地址:https://github.com/lhj502819/netty/tree/v502819-main,示例代码在example模块中
你知道都有哪些I/O模型吗?Java NIO三大角色Channel、Buffer、SelectorDoug lea《Scalable IO in Java》翻译Reactor模型你知道都有哪些吗?Netty服务端创建源码流程解析EventLoopGroup到底是个啥?深入剖析Netty之EventLoop刨根问底深入剖析Netty之NioEventLoop寻根究底未完待续…
在上篇文章中我们讲解NioEventLoop时,我们提到了Netty中的任务分为普通任务和定时任务,我们可以通过EventLoop创建一个定时任务,使用方法我们就不在这里讲解了,没使用过的小伙伴可以去度娘找一个Demo看一看,今天这篇文章我们主要讲解Netty的定时任务实现机制。
我们知道在JDK1.5之后提供了定时任务的接口抽象ScheduledExecutorService以及实现ScheduledThreadPoolExecutor,Netty并没有直接使用JDK提供的定时任务实现,而是基于ScheduledExecutorService接口进行了自定义实现。首先我们先来看下ScheduledExecutorService都定义了哪些基础方法。
public ScheduledFuture> schedule(Runnable command,long delay, TimeUnit unit); publicScheduledFuture schedule(Callable callable,long delay, TimeUnit unit); public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
我们就不对ScheduledThreadPoolExecutor进行讲解了,不能跑题,还是来讲解Netty是如何做的。
ScheduledFutureTask:实现定时任务的核心类AbstractScheduledEventExecutor:集成ScheduledFutureTask对ScheduledExecutorService进行实现,进行行为控制
Netty实现定时任务的核心类就是这两个,我们先来分析下ScheduledFutureTask,
ScheduledFutureTask的主要职责就是判断是否应该要执行定时任务以及执行定时任务。
ScheduledFutureTask(AbstractScheduledEventExecutor executor, Runnable runnable, long nanoTime) { super(executor, runnable); deadlineNanos = nanoTime; //默认两个任务之间没有延迟 periodNanos = 0; } ScheduledFutureTask(AbstractScheduledEventExecutor executor, Runnable runnable, long nanoTime, long period) { super(executor, runnable); deadlineNanos = nanoTime; periodNanos = validatePeriod(period); }成员变量
private static final long START_TIME = System.nanoTime(); static long nanoTime() { return System.nanoTime() - START_TIME; } static long deadlineNanos(long delay) { long deadlineNanos = nanoTime() + delay; // Guard against overflow return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; } static long initialNanoTime() { return START_TIME; } // set once when added to priority queue private long id; private long deadlineNanos; private final long periodNanos;主要方法 取消任务
public boolean cancel(boolean mayInterruptIfRunning) { boolean canceled = super.cancel(mayInterruptIfRunning); if (canceled) { scheduledExecutor().removeScheduled(this); } return canceled; }
具体的细节这里就不展开讲解了,主要逻辑就是进行事件通知并将任务从任务队列中移除。
核心逻辑在#run方法中,主要逻辑就是判断自身是否达到了执行的时间,并执行,需具体代码如下:
public void run() { assert executor().inEventLoop(); try { if (delayNanos() > 0L) { // 大于零表示没过期 // Not yet expired, need to add or remove from queue if (isCancelled()) { //如果已经取消,则从定时任务队列中移除当前任务 scheduledExecutor().scheduledTaskQueue().removeTyped(this); } else { //则将当前任务添加到定时任务队列中 //后续会有EventLoop轮询队列中的定时任务是否该执行,我们在前边的文章中讲过 scheduledExecutor().scheduleFromEventLoop(this); } return; } 任务过期// //如果执行两个任务的执行延迟为0 if (periodNanos == 0) { if (setUncancellableInternal()) { V result = runTask(); setSuccessInternal(result); } } else { //两个任务的执行延迟大于0 ,判断任务是否已经取消 // check if is done as it may was cancelled if (!isCancelled()) { //如果任务没有被取消,则执行任务 runTask(); if (!executor().isShutdown()) { if (periodNanos > 0) { //重新设置下一次的执行时间,任务开始执行时间 + 两个任务的延迟时间 deadlineNanos += periodNanos; } else { //如果两次执行间隔小于0,负负得正获得新的执行时间 //设置执行时间为当前时间 deadlineNanos = nanoTime() - periodNanos; } if (!isCancelled()) { //如果没有被取消,将当前任务添加到定时任务队列中 scheduledExecutor().scheduledTaskQueue().add(this); } } } } } catch (Throwable cause) { setFailureInternal(cause); } }AbstractScheduledEventExecutor
该类的主要责任是对定时任务的执行进行行为封装,比如定时任务的定义,定时任务的调度,定时任务队列的存储。大致都比较简单,接下来我们对其API进行简单讲解。
PriorityQueue> scheduledTaskQueue;
该变量是一个存储定时任务的队列,通过名称可以看出来是有序的,通过IDEA的快捷键查看该变量引用的位置,可以看到有取消和取两个 *** 作。
既然有取的位置,那肯定得有加的位置,那任务是从哪里添加到队列里的呢?
AbstractScheduledEventExecutor对Java的ScheduledExecutorService进行了实现
@Override public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { delay = 0; } validateScheduled0(delay, unit); return schedule(new ScheduledFutureTask( this, command, deadlineNanos(unit.toNanos(delay)))); } @Override public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(callable, "callable"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { delay = 0; } validateScheduled0(delay, unit); return schedule(new ScheduledFutureTask (this, callable, deadlineNanos(unit.toNanos(delay)))); } @Override public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (period <= 0) { throw new IllegalArgumentException( String.format("period: %d (expected: > 0)", period)); } validateScheduled0(initialDelay, unit); validateScheduled0(period, unit); return schedule(new ScheduledFutureTask ( this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); } @Override public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (delay <= 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: > 0)", delay)); } validateScheduled0(initialDelay, unit); validateScheduled0(delay, unit); return schedule(new ScheduledFutureTask ( this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); }
其实四个方法只是一个门面,对细节进行了封装,最终都是创建了一个ScheduledFutureTask,只是构造的参数不同。
privateScheduledFuture schedule(final ScheduledFutureTask task) { if (inEventLoop()) { //如果在EventLoop线程中则直接将任务添加到定时任务队列中 scheduleFromEventLoop(task); } else { final long deadlineNanos = task.deadlineNanos(); // task will add itself to scheduled task queue when run if not expired if (beforeScheduledTaskSubmitted(deadlineNanos)) { //添加一个异步任务 execute(task); } else { //懒添加一个异步任务 lazyExecute(task); // Second hook after scheduling to facilitate race-avoidance if (afterScheduledTaskSubmitted(deadlineNanos)) { //任务已经提交完成后 execute(WAKEUP_TASK); } } } return task; }
其实核心逻辑就是构造一个定时任务添加到任务队列中等待执行。
今天我们讲解了Netty的定时任务机制,先介绍了Java的定时任务相关内容,后又讲解了Netty的定时任务类ScheduledFutureTask和定时任务执行器类AbstractScheduledEventExecutor,都比较简单,我们就不过多阐述了,滋滋。。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)