Flink源码-SlidingProcessTimeWindow的创建和触发

Flink源码-SlidingProcessTimeWindow的创建和触发,第1张

Flink源码-SlidingProcessTimeWindow的创建和触发

今天研究了下SlidingProcessTimeWindow的源码,把TimeWindow的生成和触发计算,大致搞清楚了,写一篇博客记录下。

要点:

  • 这里讲的是ProcessTime的滑动窗口
  • 每条数据都会触发窗口的分配(创建)
  • 一条数据可能分配到多个窗口
  • 不同数据触发的,key相同、start相同、end相同的窗口被认为是一个窗口
  • 窗口和数据会被存入一个map,key是窗口对象,value是一个list,数据作为element存入list
  • 新的窗口会触发注册一个定时器,定时器本质是一个有线程池管理的线程
一,起点

我们是这样写代码的,注意设置窗口的那行代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator socketSource = env.socketTextStream("node1", 9999)
                .filter(line -> line != null && !"".equals(line))
                .map(line -> new PriCh(line));
        SingleOutputStreamOperator process = socketSource
                .keyBy(priCh -> priCh.name)
                
                // 设置窗口
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
                
                .process(new ProcessWindowFunction() {
                    @Override
                    public void process(String s, ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception {
                        int total = 0;
                        for (PriCh p : iterable) {
                            total += p.pri;
                        }

                        collector.collect(total);
                    }
                });
 
二,SlidingProcessingTimeWindows 

显然,窗口是由SlidingProcessingTimeWindows创建的,窗口相关的功能必然应该从这里开始找。

看到一个方法: assignWindows,从方法名也可以看出这个类是干什么的:窗口分配。

看看assignWindows的源码:

public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		timestamp = context.getCurrentProcessingTime();
		List windows = new ArrayList<>((int) (size / slide));
		long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
		for (long start = lastStart;
			start > timestamp - size;
			start -= slide) {
			windows.add(new TimeWindow(start, start + size));
		}
		return windows;
	}

下面举例子说明窗口是如何分配的。

1

timestamp = context.getCurrentProcessingTime(); 你可以理解这行代码是获取系统时间的,假设结果是:

1640697324792
2

窗口长度size=5,滑动长度slide=2,偏移量offset=0

3

TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)涉及到一个简单的方法:

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
		return timestamp - (timestamp - offset + windowSize) % windowSize;
}

简单的说这个方法求出的是一个比timestamp小的最大的能被windowSize整除的时间戳。

有点绕口,多读两边就好。

提醒下:timestamp=1640697324792,windowSize=5
比1640697324792小但能被5整除的显然是1640697324790

也就是说lastStart = 1640697324790

4

接下来看看for循环生成多少个window。

for (long start = lastStart;
			start > timestamp - size;
			start -= slide) {
			windows.add(new TimeWindow(start, start + size));
}
窗口编号startwindow-startwindow-end116406973247901640697324790164069732479521640697324788164069732478816406973247933164069732478616406973247861640697324791

生成了3个窗口,也就是说这个元素被分配到三个窗口。

三,WindowOperator

搞清楚assignWindows的实现逻辑后,接下来的问题是谁调用了assignWindows?

通过IDEA工具很容易找到是WindowOperator的processElement方法。

代码很长,就不全部贴出来了。接下来是一个if-else,if是SessionWindow的逻辑直接跳过。

核心代码并不多,如下所示:

for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
1
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());

这两行代码非常关键,解决了两个问题:

  • window如何保存
  • 数据如何保存

话分两头,先看windowState.setCurrentNamespace(window);,这个逻辑其实比较简单,就是将windowState的currentNamespace更新为当前窗口对象.

这招在写代码时常用,类似于写for循环时用一个临时变量记住某个中间值。

int temp = 0;
for(int i = 0; i < 10000000000; i++) {
   temp = i;
   ......
   ......
}

那windowState到底是什么东西,它是HeapListState类对象。别问我为什么知道,用idea打断点。

2
windowState.add(element.getValue());

看看这行代码干了什么。

直接定位到HeapListState.add:

public void add(V value) {
		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");

		final N namespace = currentNamespace;

		final StateTable> map = stateTable;
		List list = map.get(namespace);

		if (list == null) {
			list = new ArrayList<>();
			map.put(namespace, list);
		}
		list.add(value);
	}

这段代码超级简单:

  • 从一个StateTable(你可以理解为一个Map)中获取list对象,key是当前window对象
  • 如果获取失败,创建一个list对象,把数据存入list,put到Table中
  • 如果获取成功,将数据存入list中

这里有一个隐含逻辑:如何处理重复的窗口:
重复窗口的处理隐含在map.get(namespace),也就是说,会比较当前窗口是否在Table存在,显然从对象的角度来看,肯定是不存在的,因为当前窗口是new出来的。但是查看Table的get方法就会发现,比较是否存在会调用window对象hashcode方法,而TimeWindow的hashcode的源码是这样的:

@Override
	public int hashCode() {
		return MathUtils.longToIntWithBitMixing(start + end);
	}

哦,明白了,只要start+end一样,窗口就是一样的,哪怕不同元素创建的窗口是一样的,保存到map的窗口也不会重复,最终相同的窗口只会被trigger一次。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5681403.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)