Kafka的TimingWheel(时间轮)算法

Kafka的TimingWheel(时间轮)算法,第1张

Kafka的TimingWheel(时间轮)算法

Kafka中存在大量的延迟 *** 作,比如延迟生产、延迟拉取和延迟删除等,Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮算法自定义实现了一个用于延迟功能的定时器(SystemTimer)。

JDK中的Timer和DelayQueue单个任务的插入和删除的平均时间复杂度为O(logN),并不能满足Kafka的高性能要求,而基于时间轮可以将任务的插入和删除 *** 作的时间复杂度降为O(1)。

下图即为Kafka的时间轮结构:

Kafka的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务链表(TimerTaskList),或者称之为任务槽。TimerTaskList是一个环形的双向链表,链表中的每一项表示的均是定时任务(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成, 每个时间格代表当前时间轮的基本时间跨度(tickMs) 。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式tickMs × wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。

下面我们通过Kafka的源代码来具体讲解一下时间轮算法。

任务添加
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
	val expiration = timerTaskEntry.expirationMs

	if (timerTaskEntry.cancelled) {
	  // Cancelled
	  false
	} else if (expiration < currentTime + tickMs) {
	  // Already expired
	  false
	} else if (expiration < currentTime + interval) {
	  // Put in its own bucket
	  val virtualId = expiration / tickMs
	  val bucket = buckets((virtualId % wheelSize.toLong).toInt)
	  bucket.add(timerTaskEntry)

	  // Set the bucket expiration time
	  if (bucket.setExpiration(virtualId * tickMs)) {
		// The bucket needs to be enqueued because it was an expired bucket
		// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
		// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
		// will pass in the same value and hence return false, thus the bucket with the same expiration will not
		// be enqueued multiple times.
		queue.offer(bucket)
	  }
	  true
	} else {
	  // Out of the interval. Put it into the parent timer
	  if (overflowWheel == null) addOverflowWheel()
	  overflowWheel.add(timerTaskEntry)
	}
}

以222任务为例,讲解一下任务添加到时间轮的过程:

SystemTimer的addTimerTaskEntry方法调用的是TimeingWheel的add方法,若任务添加失败,则证明当前任务已到期,直接将该任务交给工作线程来执行;

TimeingWheel的add方法首先获取任务的过期时间expiration,这里为222;下面走到判断逻辑:

若expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行;

假设SystemTimer的创建时间为0,则SystemTimer创建的TimeingWheel的currentTime也为0,由于222 > 0+1,所以不符合第1个判断,进入第2个判断。

若expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽;

由于222 > 0+10,所以不符合第2个判断,进入第3个判断。

若expiration >= currentTime + interval,证明该层次的时间轮不可以容纳该任务,需要往上尝试上一层时间轮;

获取到上一层时间轮后,直接在上一层时间轮上继续执行add方法。

第2层时间轮的tick=10,interval=100,由于222 > 0+100,所以还是进入到第3个判断,继续获取上一层时间轮。

第3层时间轮的tick=100,interval=1000,由于222 < 0+1000,所以进入到第2个判断,执行任务的添加过程。

下面接着看任务的添加过程:

(1) 首先是计算槽位;

val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)

virtualId = 222 / 100 = 2
bucket = 2 % 10 = 2

即第2个槽位,对应[200-300]的范围

(2) 获取到该槽位上的任务链表,并将任务添加到链表里;

bucket.add(timerTaskEntry)

(3) 若该链表是首次添加任务,则需要设置链表的过期时间expiration,并将该链表添加到SystemTimer的DelayQueue中。

// Set the bucket expiration time
if (bucket.setExpiration(virtualId * tickMs)) {
	queue.offer(bucket)
}

过期时间为2*100=200

可以看一下timerTaskList.setExpiration方法:

def setExpiration(expirationMs: Long): Boolean = {
	expiration.getAndSet(expirationMs) != expirationMs
}

可以发现,链表的过期时间若与之前设置的相同,则直接返回False,避免重复将链表添加到Timer的DelayQueue中。

时间轮推动

接下来,我们看一下如何推动时间轮,假设我们创建了1个SystemTimer,并添加了过期时间为9、88、222、520、521、522等6个定时任务,分别编号①到⑥。

任务添加后的时间轮示意图如下:

SystemTimer构造器如下:

@threadsafe
class SystemTimer(executorName: String,
                  tickMs: Long = 1,
                  wheelSize: Int = 20,
                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {

  // timeout timer
  private[this] val taskExecutor = Executors.newFixedThreadPool(1,
    (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))

  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  private[this] val taskCounter = new AtomicInteger(0)
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,
    wheelSize = wheelSize,
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )

SystemTimer是依靠DelayQueue来进行时间轮推进的。

def advanceClock(timeoutMs: Long): Boolean = {
	// 获取DelayQueue首元素(最快过期的任务槽)
	var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
	if (bucket != null) {
	  writeLock.lock()
	  try {
		// 若任务槽不为null,则不断循环(while保证了时间轮的不断推进)
		while (bucket != null) {
		  // 级联更新各层级的时间轮currentTime为时间槽的过期时间
		  timingWheel.advanceClock(bucket.getExpiration)
		  // 删除该槽,并将时间槽中的任务重新添加到时间轮
		  bucket.flush(addTimerTaskEntry)
		  // 继续获取DelayQueue首元素(最快过期的任务槽)
		  bucket = delayQueue.poll()
		}
	  } finally {
		writeLock.unlock()
	  }
	  true
	} else {
	  false
	}
}

任务1所在槽为DelayQueue首元素,其过期时间为9,然后,各级时间轮的currentTime更新为9;对任务1所在槽中的各元素执行flush *** 作;

// Remove all task entries and apply the supplied function to each of them
def flush(f: TimerTaskEntry => Unit): Unit = {
	synchronized {
	  var head = root.next
	  while (head ne root) {
		// 删除槽中的各元素
		remove(head)
		// 执行传入的function
		f(head)
		head = root.next
	  }
	  // 将原有时间槽的过期时间设置为-1
	  expiration.set(-1L)
	}
}

flush传入的函数为SystemTimer的addTimerTaskEntry:

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
	// 尝试往时间轮中添加任务TimerTaskEntry
	if (!timingWheel.add(timerTaskEntry)) {
	  // Already expired or cancelled
	  // 若添加失败,则证明该任务被取消或者已经过期
	  if (!timerTaskEntry.cancelled)
	    // 过期任务,直接提交给工作线程执行
		taskExecutor.submit(timerTaskEntry.timerTask)
	}
}

任务1重新添加到时间轮,此时:

currentTime=9
expiration=9
tick=1
interval=10

expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行

继续执行delayQueue.poll()方法,此时返回任务②所在的槽,其过期时间为80,然后,各级时间轮的currentTime更新为80;

任务②重新添加到时间轮,此时:

currentTime=80
expiration=88
tick=1
interval=10

expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽;

virtualId = expiration / tickMs = 88 / 1 = 88
bucket = virtualId % wheelSize = 88 % 10 = 8
即第8个槽位,将槽位的过期时间设置为88,并添加到延迟队列delayQueue中

继续执行delayQueue.poll()方法,此时返回任务②所在的槽,其过期时间为88,然后,各级时间轮的currentTime更新为88;

任务②重新添加到时间轮,此时:

currentTime=88
expiration=88
tick=1
interval=10

expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行

其他任务以此类推。

重点理解2点:

(1) currentTime是如何演进的;
(2) 任务是如何从时间大轮向小轮降级的。

任务槽

SystemTimer是依靠DelayQueue来进行时间轮推进的,而DelayQueue中的元素则为时间轮中的槽TimerTaskList。

添加到延迟队列的元素必须实现Delayed接口的getDelay和compareTo方法:

def getDelay(unit: TimeUnit): Long = {
	unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
}

def compareTo(d: Delayed): Int = {
	val other = d.asInstanceOf[TimerTaskList]
	java.lang.Long.compare(getExpiration, other.getExpiration)
}

当任务槽的delay<=0时,该任务槽会被从延迟队列中poll出来,然后遍历槽中的元素,依次执行重新添加到时间轮的操作;

将时间槽中的任务重新添加到时间轮时,会发生任务槽降级或者任务直接提交给工作线程执行。

每次重新添加槽,均是从最小的时间轮开始尝试的:

比如任务③,其初始槽位在第3层时间轮的第2个槽位,当其被取出重新添加到时间轮时,首先从第1层时间轮尝试:

currentTime=200
expiration=222
tick=1
interval=10

expiration >= currentTime + interval,证明该层次的时间轮不可以容纳该任务,需要往上尝试上一层时间轮

接着尝试第2层时间轮:

currentTime=200
expiration=222
tick=10
interval=100

expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽;

virtualId = 222 / 10 = 22
bucket = 22 % 10 = 2

即第2个槽位,并设置该槽位的过期时间为virtualId * tickMs = 22*10 = 220

可以发现,任务③从第3层时间轮的第2个时间槽(过期时间为200)降级到第2层时间轮的第2个时间槽(过期时间为220)。

依次推导,下次降级会从第2层时间轮的第2个时间槽(过期时间为220)降级到第1层时间轮的第2个时间槽(过期时间为222)。

接着再次降级,此时,已没有更低精度的时间轮了,expiration < currentTime + tick,表明当前任务已到期,将该任务交给工作线程来执行。

综上,随着时间轮的不断推进,任务会被反复重新添加到时间轮,其槽位会不断降级,且过期时间的精度也会逐步提高,直至精度到达最小时间轮的精度,表明任务真正到期,提交执行。

分析到这里可以发现,Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的 *** 作,而DelayQueue专门负责时间推进的任务。试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue 的队头只需要O(1)的时间复杂度(获取之后DelayQueue内部才会再次切换出新的队头)。如果采用每毫秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓“知人善用”,用TimjngWheel做最擅长的任务添加和删除 *** 作,而用DelayQueue做最擅长的时间推进工作,两者相辅相成。

参考文献:

[1] https://github.com/apache/kafka/tree/trunk/core/src/main/scala/kafka/utils/timer
[2] 《深入理解Kafka:核心设计与实践原理》,作者:朱忠华,出版社:电子工业出版社

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5709229.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存