对于WaterMark设计的位置是否会影响窗口的正常开闭?
下面我模拟了两种情景(source并行度为1,map并行度为2),分别是
1.在source后设置watermark,经过map后开窗
2.在map后设置watermark,然后开窗
ps: 下面的两种代码我都设置了自然增长的watermark,窗口时间都是5秒,只是设置watermark的位置不同
watermark是testWM对象的ts字段*1000
代码一:在Source后添加WaterMarkpublic class WMTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource代码二:在Map后设置WaterMarksource = env.socketTextStream("localhost", 9999); // TODO: 2021/12/1 在source后设置watermark SingleOutputStreamOperator sourceWithWM = source.assignTimestampsAndWatermarks(WatermarkStrategy . forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner () { @Override public long extractTimestamp(String element, long recordTimestamp) { String[] split = element.split(","); return Long.parseLong(split[2]) * 1000; } })); // TODO: 2021/12/1 设置map并行度为2 SingleOutputStreamOperator mapDS = sourceWithWM.map(r -> { String[] split = r.split(","); return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2])); }).setParallelism(2); SingleOutputStreamOperator resultDS = mapDS.keyBy(r -> r.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction () { @Override public void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception { out.collect("我关窗啦"); } }); resultDS.print(); env.execute(); } } @Data @AllArgsConstructor class testWM{ private int id; private int num; private long ts; }
public class WMTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource运行结果:source = env.socketTextStream("localhost", 9999); // TODO: 2021/12/1 设置map并行度为2 SingleOutputStreamOperator mapDS = source.map(r -> { String[] split = r.split(","); return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2])); }).setParallelism(2); // TODO: 2021/12/1 在map后添加watermark SingleOutputStreamOperator mapWithWM = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy . forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner () { @Override public long extractTimestamp(testWM element, long recordTimestamp) { return element.getTs() * 1000; } })); SingleOutputStreamOperator resultDS = mapWithWM.keyBy(r -> r.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction () { @Override public void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception { out.collect("我关窗啦"); } }); resultDS.print(); env.execute(); } } @Data @AllArgsConstructor class testWM{ private int id; private int num; private long ts; }
对于第一种,在source后添加watermark的结果如下:
当1,1,1这条数据进入时,开启了[0,5)这个窗口,当1,1,9这条数据进入时,watermark升至9000(忽略watermark的减1).窗口关闭,没有问题
对于第二种,在map后添加watermark的结果如下:
可以很明显的发现,当第一条1,1,9进入时,[0,5)这个窗口并没有关闭.直到第二条1,1,9进入时,窗口才被关闭,这是为什么?
我针对以上两种情况画了下图来理解.
图一.图二描绘了source之后设置watermark的场景,一般来说,这是我们生产中需要的
我在之前的文章中提到过,WaterMark以广播的形式向下游发送,并且如果同时接收上游多条并行度的WaterMark时,以小的为准
这就导致图三(Map后设置WaterMark)中,我需要发送两条足够[0,5)这个窗口关闭的数据,才能真正关闭窗口,因为数据要经过轮询才能到达每个并行度
拓展:在KafkaSource中,已经做了很好得优化,在生产中我们一般设置并行度与topic分区数相同
如果设置得并行度比topic分区数多,那必然有并行度消费不到数据,就会导致WaterMark一直保持在Long.min_value.
当这种WaterMark向下游广播之后,会导致所有正常并行度的窗口全部无法关闭,因为WaterMark取了各个并行度的最小值
但是当这种状态保持一段时间之后,程序会在计算WaterMark的时候,自动过滤掉迟迟没有数据的并行度传进来的WaterMark,这就是KafkaSource的优化.
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)