今天研究了下SlidingProcessTimeWindow的源码,把TimeWindow的生成和触发计算,大致搞清楚了,写一篇博客记录下。
要点:
- 这里讲的是ProcessTime的滑动窗口
- 每条数据都会触发窗口的分配(创建)
- 一条数据可能分配到多个窗口
- 不同数据触发的,key相同、start相同、end相同的窗口被认为是一个窗口
- 窗口和数据会被存入一个map,key是窗口对象,value是一个list,数据作为element存入list
- 新的窗口会触发注册一个定时器,定时器本质是一个有线程池管理的线程
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator二,SlidingProcessingTimeWindowssocketSource = env.socketTextStream("node1", 9999) .filter(line -> line != null && !"".equals(line)) .map(line -> new PriCh(line)); SingleOutputStreamOperator
显然,窗口是由SlidingProcessingTimeWindows创建的,窗口相关的功能必然应该从这里开始找。
看到一个方法: assignWindows,从方法名也可以看出这个类是干什么的:窗口分配。
看看assignWindows的源码:
public CollectionassignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; }
下面举例子说明窗口是如何分配的。
1timestamp = context.getCurrentProcessingTime(); 你可以理解这行代码是获取系统时间的,假设结果是:
16406973247922
窗口长度size=5,滑动长度slide=2,偏移量offset=0
3TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)涉及到一个简单的方法:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
简单的说这个方法求出的是一个比timestamp小的最大的能被windowSize整除的时间戳。
有点绕口,多读两边就好。
提醒下:timestamp=1640697324792,windowSize=5
比1640697324792小但能被5整除的显然是1640697324790
也就是说lastStart = 1640697324790
4接下来看看for循环生成多少个window。
for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); }
生成了3个窗口,也就是说这个元素被分配到三个窗口。
三,WindowOperator搞清楚assignWindows的实现逻辑后,接下来的问题是谁调用了assignWindows?
通过IDEA工具很容易找到是WindowOperator的processElement方法。
代码很长,就不全部贴出来了。接下来是一个if-else,if是SessionWindow的逻辑直接跳过。
核心代码并不多,如下所示:
for (W window: elementWindows) { // drop if the window is already late if (isWindowLate(window)) { continue; } isSkippedElement = false; windowState.setCurrentNamespace(window); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window = window; TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } emitWindowContents(window, contents); } if (triggerResult.isPurge()) { windowState.clear(); } registerCleanupTimer(window); }1
windowState.setCurrentNamespace(window); windowState.add(element.getValue());
这两行代码非常关键,解决了两个问题:
- window如何保存
- 数据如何保存
话分两头,先看windowState.setCurrentNamespace(window);,这个逻辑其实比较简单,就是将windowState的currentNamespace更新为当前窗口对象.
这招在写代码时常用,类似于写for循环时用一个临时变量记住某个中间值。
int temp = 0; for(int i = 0; i < 10000000000; i++) { temp = i; ...... ...... }
那windowState到底是什么东西,它是HeapListState类对象。别问我为什么知道,用idea打断点。
2windowState.add(element.getValue());
看看这行代码干了什么。
直接定位到HeapListState.add:
public void add(V value) { Preconditions.checkNotNull(value, "You cannot add null to a ListState."); final N namespace = currentNamespace; final StateTable> map = stateTable; List list = map.get(namespace); if (list == null) { list = new ArrayList<>(); map.put(namespace, list); } list.add(value); }
这段代码超级简单:
- 从一个StateTable(你可以理解为一个Map)中获取list对象,key是当前window对象
- 如果获取失败,创建一个list对象,把数据存入list,put到Table中
- 如果获取成功,将数据存入list中
这里有一个隐含逻辑:如何处理重复的窗口:
重复窗口的处理隐含在map.get(namespace),也就是说,会比较当前窗口是否在Table存在,显然从对象的角度来看,肯定是不存在的,因为当前窗口是new出来的。但是查看Table的get方法就会发现,比较是否存在会调用window对象hashcode方法,而TimeWindow的hashcode的源码是这样的:
@Override public int hashCode() { return MathUtils.longToIntWithBitMixing(start + end); }
哦,明白了,只要start+end一样,窗口就是一样的,哪怕不同元素创建的窗口是一样的,保存到map的窗口也不会重复,最终相同的窗口只会被trigger一次。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)