使用过WindowedStream的同学一定对WindowAssigner、trigger和evictor不陌生,可以说这三者共同定义了一个window流的核心,为什么这么说呢,我们打开WindowedStream类瞅一瞅
可以看到,WindowedStream类一共就只有六个属性,最核心的就是红线划出来的铁三角
其中:
- WindowAssigner定义了进入窗口流的element进入哪些窗口,创建窗口流时必须由用户传入
- 当Trigger被流中的事件触发时,Trigger会决定当前窗口是否进行计算
- Evictor是流中的过滤器,可以选择是再窗口聚合之前过滤还是在窗口聚合之后过滤
三者在flink程序中的常见使用方法如下:
keyedStream.window(WindowAssigner).trigger(Trigger).evictor(Evictor).aggregate();
本文将按WindowAssigner、trigger、Evictor、聚合函数 四个小章节依次展开
WindowAssigner我们先来看看WA有哪些方法和属性吧
可以看到,最重要的是assignWindows方法,它为每一个传入的element返回它应该进入的window集合。我们再来看看这个方法在 SlidingEventTimeWindows 这个WindowAssigner子类中的实现(该类对应了滑动事件时间窗口)。老规矩,主要看⭐标注的地方
@Override public CollectionassignWindows(Object element, long timestamp, WindowAssignerContext context) { //⭐判断timestamp是否有效 if (timestamp > Long.MIN_VALUE) { //⭐根据窗口长度和滑动时间计算数据元素所属窗口的数量 List windows = new ArrayList<>((int) (size / slide)); //⭐找到窗口列表中最晚的时间窗口的开始时间 long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); //⭐循环创建当前数据元素所属的窗口列表 for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } }
需要注意的是,这里的实现是 直接返回了窗口集合 而不是找到已有的窗口然后将element放进去,很容易想到前者是一种更加高效的方法,因为Collection有去重功能,可以根据返回的集合中的TimeWindow的hash值来找到内存中的存储地址,然后把element放进去。如果你背过一点equals和hashcode的八股文,这里应该很容易猜到TimeWindow类重写了hashcode()方法和equals()方法,我们打开TimeWindow类看看到底是不是和我们猜的一样。
果然,和我们猜的一样,这两个方法被重写了,逻辑很简单,就是把window的开始时间和结束时间数学相加作为hash值,开始时间和结束时间同时相等时才判断相等。
上面我们看的是time window,那如果是count window呢,count window的开始和结束时间是不确定的,元素到来的时候是无法把窗口创建出来的,flink是如何实现的呢?
不急,这涉及了铁三角的综合使用,我们先把铁三角都先稍微了解一下再来研究研究count window是如何实现的。
Trigger
我们先来看看trigger的基类有哪些方法
几个重要方法的作用我都写出来了,需要注意的是,这三个箭头标出来的方法都是返回的TriggerResult,这个类是个枚举类,只有下面四种情况
我们再来看看基于事件时间的trigger子类 EventTimeTrigger 是怎么实现这几个定时器方法的吧,请看注释
Evictor
我们照例先看看evictor基类
evictor中只定义了evictorBefore和evcitorAfter两种方法,分别在窗口聚合之前和之后调用,作用都是过滤element,非常易懂
还记得吗,在介绍完WindowAssigner的时候,我留了一个问题,flink是如何基于铁三角实现基于count的window呢?
flink的实现非常巧妙,我们先来看看KeyedStream是如何转化成countwindow的
我们可以看到,WindowAssigner参数使用的是GlobalWindows,trigger传入的是CountTrigger,evictor传入的是CountEvictor,也就是flink自己基于WindowAssigner、Trigger、Evictor三个基类自定义的类,正好就是使用了一套铁三角。
我们来挨个看看这三个自定义类吧
先看看GlobalWindows,首先解释一下,flink中有GlobalWindow和GlobalWindows两个类,不要看花眼,其中一个最后带了s
- GlobalWindow:Window的子类,是一种存储所有数据的Window
- GlobalWindows:WindowAssigner的子类,GlobalWindow对应的assigner,把所有数据都放进同一个GlobalWindow
可以看到,GWs的assignWindows方法不论来的是什么element,都会返回一个只有一个GW元素的集合,而且GlobalWindow.get()方法会返回GW类的同一个静态GW实例,也就是说,不管来的是什么element,都会被放到那个静态GW实例中。
不信可以看下面的GW类,GW类初始化时就创建了一个静态GW,不管什么元素来了,都是放到这个静态实例中,这样就实现了所有元素进一个窗口。
好了,现在所有element都落到同一个窗口中了,我们再来看看CountEvictor干了什么吧
doEvictorAfter是一个boolean值,默认为false,这很容易理解,因为GW中存了那么多数据,窗口聚合之前当然要先清理一下,要把数据量减少到我们指定的窗口大小
evictor方法指定了如何清理窗口,内容也很好理解,就是上面说的删数据,直到窗口中数据量减少到我们指定的窗口大小
好了,CountEvictor也很easy,我们最后来看看CountTrigger
我们看onElement方法,count变量中实际存放了当前窗口中的元素数量,每来一个element都加1,当数量等于窗口大小,触发计算,反之不触发。
ok,我们大概看了一下flink是怎么用window铁三角实现countwindow的,有点晕?没关系,下面我们综合地再解释一遍。
- 首先,assigner把到来的所有数据存到同一个window中
- 其次,当窗口中元素数量大于等于窗口大小时,触发trigger的fire计算
- 再次,窗口聚合前调用evictor,把窗口中元素数量减少到窗口大小对应的数量
- 最后,调用聚合函数,输出数据
以上,我们从WindowAssigner、Evictor、Trigger三个重要类的源码出发,解释了window是怎么定义的,最后通过剖析官方是如何利用铁三角灵活配合,实现了基于count的窗口。
顺带,我们还复习了java基础八股文中equals和hashcode的内容,深刻理解了什么时候我们需要重写这两个方法。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)