Kafka中存在大量的延迟 *** 作,比如延迟生产、延迟拉取和延迟删除等,Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮算法自定义实现了一个用于延迟功能的定时器(SystemTimer)。
JDK中的Timer和DelayQueue单个任务的插入和删除的平均时间复杂度为O(logN),并不能满足Kafka的高性能要求,而基于时间轮可以将任务的插入和删除 *** 作的时间复杂度降为O(1)。
下图即为Kafka的时间轮结构:
Kafka的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务链表(TimerTaskList),或者称之为任务槽。TimerTaskList是一个环形的双向链表,链表中的每一项表示的均是定时任务(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。
时间轮由多个时间格组成, 每个时间格代表当前时间轮的基本时间跨度(tickMs) 。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式tickMs × wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。
下面我们通过Kafka的源代码来具体讲解一下时间轮算法。
任务添加def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { // Cancelled false } else if (expiration < currentTime + tickMs) { // Already expired false } else if (expiration < currentTime + interval) { // Put in its own bucket val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) // Set the bucket expiration time if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle // will pass in the same value and hence return false, thus the bucket with the same expiration will not // be enqueued multiple times. queue.offer(bucket) } true } else { // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel() overflowWheel.add(timerTaskEntry) } }
以222任务为例,讲解一下任务添加到时间轮的过程:
SystemTimer的addTimerTaskEntry方法调用的是TimeingWheel的add方法,若任务添加失败,则证明当前任务已到期,直接将该任务交给工作线程来执行;
TimeingWheel的add方法首先获取任务的过期时间expiration,这里为222;下面走到判断逻辑:
若expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行;
假设SystemTimer的创建时间为0,则SystemTimer创建的TimeingWheel的currentTime也为0,由于222 > 0+1,所以不符合第1个判断,进入第2个判断。
若expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽;
由于222 > 0+10,所以不符合第2个判断,进入第3个判断。
若expiration >= currentTime + interval,证明该层次的时间轮不可以容纳该任务,需要往上尝试上一层时间轮;
获取到上一层时间轮后,直接在上一层时间轮上继续执行add方法。
第2层时间轮的tick=10,interval=100,由于222 > 0+100,所以还是进入到第3个判断,继续获取上一层时间轮。
第3层时间轮的tick=100,interval=1000,由于222 < 0+1000,所以进入到第2个判断,执行任务的添加过程。
下面接着看任务的添加过程:
(1) 首先是计算槽位;
val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) virtualId = 222 / 100 = 2 bucket = 2 % 10 = 2 即第2个槽位,对应[200-300]的范围
(2) 获取到该槽位上的任务链表,并将任务添加到链表里;
bucket.add(timerTaskEntry)
(3) 若该链表是首次添加任务,则需要设置链表的过期时间expiration,并将该链表添加到SystemTimer的DelayQueue中。
// Set the bucket expiration time if (bucket.setExpiration(virtualId * tickMs)) { queue.offer(bucket) } 过期时间为2*100=200
可以看一下timerTaskList.setExpiration方法:
def setExpiration(expirationMs: Long): Boolean = { expiration.getAndSet(expirationMs) != expirationMs }
可以发现,链表的过期时间若与之前设置的相同,则直接返回False,避免重复将链表添加到Timer的DelayQueue中。
时间轮推动接下来,我们看一下如何推动时间轮,假设我们创建了1个SystemTimer,并添加了过期时间为9、88、222、520、521、522等6个定时任务,分别编号①到⑥。
任务添加后的时间轮示意图如下:
SystemTimer构造器如下:
@threadsafe class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer { // timeout timer private[this] val taskExecutor = Executors.newFixedThreadPool(1, (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable)) private[this] val delayQueue = new DelayQueue[TimerTaskList]() private[this] val taskCounter = new AtomicInteger(0) private[this] val timingWheel = new TimingWheel( tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue )
SystemTimer是依靠DelayQueue来进行时间轮推进的。
def advanceClock(timeoutMs: Long): Boolean = { // 获取DelayQueue首元素(最快过期的任务槽) var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) { writeLock.lock() try { // 若任务槽不为null,则不断循环(while保证了时间轮的不断推进) while (bucket != null) { // 级联更新各层级的时间轮currentTime为时间槽的过期时间 timingWheel.advanceClock(bucket.getExpiration) // 删除该槽,并将时间槽中的任务重新添加到时间轮 bucket.flush(addTimerTaskEntry) // 继续获取DelayQueue首元素(最快过期的任务槽) bucket = delayQueue.poll() } } finally { writeLock.unlock() } true } else { false } }
任务1所在槽为DelayQueue首元素,其过期时间为9,然后,各级时间轮的currentTime更新为9;对任务1所在槽中的各元素执行flush *** 作;
// Remove all task entries and apply the supplied function to each of them def flush(f: TimerTaskEntry => Unit): Unit = { synchronized { var head = root.next while (head ne root) { // 删除槽中的各元素 remove(head) // 执行传入的function f(head) head = root.next } // 将原有时间槽的过期时间设置为-1 expiration.set(-1L) } }
flush传入的函数为SystemTimer的addTimerTaskEntry:
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { // 尝试往时间轮中添加任务TimerTaskEntry if (!timingWheel.add(timerTaskEntry)) { // Already expired or cancelled // 若添加失败,则证明该任务被取消或者已经过期 if (!timerTaskEntry.cancelled) // 过期任务,直接提交给工作线程执行 taskExecutor.submit(timerTaskEntry.timerTask) } }
任务1重新添加到时间轮,此时:
currentTime=9 expiration=9 tick=1 interval=10 expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行
继续执行delayQueue.poll()方法,此时返回任务②所在的槽,其过期时间为80,然后,各级时间轮的currentTime更新为80;
任务②重新添加到时间轮,此时:
currentTime=80 expiration=88 tick=1 interval=10 expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽; virtualId = expiration / tickMs = 88 / 1 = 88 bucket = virtualId % wheelSize = 88 % 10 = 8 即第8个槽位,将槽位的过期时间设置为88,并添加到延迟队列delayQueue中
继续执行delayQueue.poll()方法,此时返回任务②所在的槽,其过期时间为88,然后,各级时间轮的currentTime更新为88;
任务②重新添加到时间轮,此时:
currentTime=88 expiration=88 tick=1 interval=10 expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行
其他任务以此类推。
重点理解2点:
(1) currentTime是如何演进的;
(2) 任务是如何从时间大轮向小轮降级的。
SystemTimer是依靠DelayQueue来进行时间轮推进的,而DelayQueue中的元素则为时间轮中的槽TimerTaskList。
添加到延迟队列的元素必须实现Delayed接口的getDelay和compareTo方法:
def getDelay(unit: TimeUnit): Long = { unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS) } def compareTo(d: Delayed): Int = { val other = d.asInstanceOf[TimerTaskList] java.lang.Long.compare(getExpiration, other.getExpiration) }
当任务槽的delay<=0时,该任务槽会被从延迟队列中poll出来,然后遍历槽中的元素,依次执行重新添加到时间轮的操作;
将时间槽中的任务重新添加到时间轮时,会发生任务槽降级或者任务直接提交给工作线程执行。
每次重新添加槽,均是从最小的时间轮开始尝试的:
比如任务③,其初始槽位在第3层时间轮的第2个槽位,当其被取出重新添加到时间轮时,首先从第1层时间轮尝试:
currentTime=200 expiration=222 tick=1 interval=10 expiration >= currentTime + interval,证明该层次的时间轮不可以容纳该任务,需要往上尝试上一层时间轮
接着尝试第2层时间轮:
currentTime=200 expiration=222 tick=10 interval=100 expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽; virtualId = 222 / 10 = 22 bucket = 22 % 10 = 2 即第2个槽位,并设置该槽位的过期时间为virtualId * tickMs = 22*10 = 220
可以发现,任务③从第3层时间轮的第2个时间槽(过期时间为200)降级到第2层时间轮的第2个时间槽(过期时间为220)。
依次推导,下次降级会从第2层时间轮的第2个时间槽(过期时间为220)降级到第1层时间轮的第2个时间槽(过期时间为222)。
接着再次降级,此时,已没有更低精度的时间轮了,expiration < currentTime + tick,表明当前任务已到期,将该任务交给工作线程来执行。
综上,随着时间轮的不断推进,任务会被反复重新添加到时间轮,其槽位会不断降级,且过期时间的精度也会逐步提高,直至精度到达最小时间轮的精度,表明任务真正到期,提交执行。
分析到这里可以发现,Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的 *** 作,而DelayQueue专门负责时间推进的任务。试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue 的队头只需要O(1)的时间复杂度(获取之后DelayQueue内部才会再次切换出新的队头)。如果采用每毫秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓“知人善用”,用TimjngWheel做最擅长的任务添加和删除 *** 作,而用DelayQueue做最擅长的时间推进工作,两者相辅相成。
参考文献:
[1] https://github.com/apache/kafka/tree/trunk/core/src/main/scala/kafka/utils/timer
[2] 《深入理解Kafka:核心设计与实践原理》,作者:朱忠华,出版社:电子工业出版社
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)