Flink源码漫游指南<肆>Window和它的铁三角

Flink源码漫游指南<肆>Window和它的铁三角,第1张

Flink源码漫游指南<肆>Window和它的铁三角

使用过WindowedStream的同学一定对WindowAssigner、trigger和evictor不陌生,可以说这三者共同定义了一个window流的核心,为什么这么说呢,我们打开WindowedStream类瞅一瞅

 可以看到,WindowedStream类一共就只有六个属性,最核心的就是红线划出来的铁三角

其中:

  • WindowAssigner定义了进入窗口流的element进入哪些窗口,创建窗口流时必须由用户传入
  • 当Trigger被流中的事件触发时,Trigger会决定当前窗口是否进行计算
  • Evictor是流中的过滤器,可以选择是再窗口聚合之前过滤还是在窗口聚合之后过滤

三者在flink程序中的常见使用方法如下:

keyedStream.window(WindowAssigner).trigger(Trigger).evictor(Evictor).aggregate();

本文将按WindowAssigner、trigger、Evictor、聚合函数 四个小章节依次展开

WindowAssigner

我们先来看看WA有哪些方法和属性吧

可以看到,最重要的是assignWindows方法,它为每一个传入的element返回它应该进入的window集合。我们再来看看这个方法在 SlidingEventTimeWindows 这个WindowAssigner子类中的实现(该类对应了滑动事件时间窗口)。老规矩,主要看⭐标注的地方

@Override
	public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		//⭐判断timestamp是否有效
		if (timestamp > Long.MIN_VALUE) {
			//⭐根据窗口长度和滑动时间计算数据元素所属窗口的数量
			List windows = new ArrayList<>((int) (size / slide));
			//⭐找到窗口列表中最晚的时间窗口的开始时间
			long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
			//⭐循环创建当前数据元素所属的窗口列表
			for (long start = lastStart;
				start > timestamp - size;
				start -= slide) {
				windows.add(new TimeWindow(start, start + size));
			}
			return windows;
		} else {
			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
					"'DataStream.assignTimestampsAndWatermarks(...)'?");
		}
	}

需要注意的是,这里的实现是 直接返回了窗口集合 而不是找到已有的窗口然后将element放进去,很容易想到前者是一种更加高效的方法,因为Collection有去重功能,可以根据返回的集合中的TimeWindow的hash值来找到内存中的存储地址,然后把element放进去。如果你背过一点equals和hashcode的八股文,这里应该很容易猜到TimeWindow类重写了hashcode()方法和equals()方法,我们打开TimeWindow类看看到底是不是和我们猜的一样。

果然,和我们猜的一样,这两个方法被重写了,逻辑很简单,就是把window的开始时间和结束时间数学相加作为hash值,开始时间和结束时间同时相等时才判断相等。

上面我们看的是time window,那如果是count window呢,count window的开始和结束时间是不确定的,元素到来的时候是无法把窗口创建出来的,flink是如何实现的呢?

不急,这涉及了铁三角的综合使用,我们先把铁三角都先稍微了解一下再来研究研究count window是如何实现的。

 

Trigger

我们先来看看trigger的基类有哪些方法

 几个重要方法的作用我都写出来了,需要注意的是,这三个箭头标出来的方法都是返回的TriggerResult,这个类是个枚举类,只有下面四种情况

我们再来看看基于事件时间的trigger子类 EventTimeTrigger 是怎么实现这几个定时器方法的吧,请看注释

 

 

Evictor

我们照例先看看evictor基类

evictor中只定义了evictorBefore和evcitorAfter两种方法,分别在窗口聚合之前和之后调用,作用都是过滤element,非常易懂

还记得吗,在介绍完WindowAssigner的时候,我留了一个问题,flink是如何基于铁三角实现基于count的window呢?

 flink的实现非常巧妙,我们先来看看KeyedStream是如何转化成countwindow的

 我们可以看到,WindowAssigner参数使用的是GlobalWindowstrigger传入的是CountTriggerevictor传入的是CountEvictor,也就是flink自己基于WindowAssigner、Trigger、Evictor三个基类自定义的类,正好就是使用了一套铁三角。

我们来挨个看看这三个自定义类吧

先看看GlobalWindows,首先解释一下,flink中有GlobalWindow和GlobalWindows两个类,不要看花眼,其中一个最后带了s

  • GlobalWindow:Window的子类,是一种存储所有数据的Window
  • GlobalWindows:WindowAssigner的子类,GlobalWindow对应的assigner,把所有数据都放进同一个GlobalWindow

可以看到,GWs的assignWindows方法不论来的是什么element,都会返回一个只有一个GW元素的集合,而且GlobalWindow.get()方法会返回GW类的同一个静态GW实例,也就是说,不管来的是什么element,都会被放到那个静态GW实例中。

不信可以看下面的GW类,GW类初始化时就创建了一个静态GW,不管什么元素来了,都是放到这个静态实例中,这样就实现了所有元素进一个窗口。

 

好了,现在所有element都落到同一个窗口中了,我们再来看看CountEvictor干了什么吧

doEvictorAfter是一个boolean值,默认为false,这很容易理解,因为GW中存了那么多数据,窗口聚合之前当然要先清理一下,要把数据量减少到我们指定的窗口大小

evictor方法指定了如何清理窗口,内容也很好理解,就是上面说的删数据,直到窗口中数据量减少到我们指定的窗口大小

好了,CountEvictor也很easy,我们最后来看看CountTrigger

 我们看onElement方法,count变量中实际存放了当前窗口中的元素数量,每来一个element都加1,当数量等于窗口大小,触发计算,反之不触发。

ok,我们大概看了一下flink是怎么用window铁三角实现countwindow的,有点晕?没关系,下面我们综合地再解释一遍。

  1. 首先,assigner把到来的所有数据存到同一个window中
  2. 其次,当窗口中元素数量大于等于窗口大小时,触发trigger的fire计算
  3. 再次,窗口聚合前调用evictor,把窗口中元素数量减少到窗口大小对应的数量
  4. 最后,调用聚合函数,输出数据
结束语

以上,我们从WindowAssigner、Evictor、Trigger三个重要类的源码出发,解释了window是怎么定义的,最后通过剖析官方是如何利用铁三角灵活配合,实现了基于count的窗口。

顺带,我们还复习了java基础八股文中equals和hashcode的内容,深刻理解了什么时候我们需要重写这两个方法。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5574902.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存