目录
watermark生成分为两种
图谱:
第一种:固定间隔时间 也是最常用的一种
第二种: 标记生成
不考虑SourceFunction生成wateramrk的方式,只分析DataStream.assign- TimestampsAndWatermarks方式的watermark生成
watermark生成分为两种- 第一种为固定间隔时间生成,间隔通过env参数设置
- 第二种为标记生成可根据数据规则自定义什么时间生成watermark
测试代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamdc = env.socketTextStream("localhost", 9999,'n',0); dc.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor (Time.minutes(3)) { @Override public long extractTimestamp(String element) { Watermark currentWatermark = getCurrentWatermark(); log.error("当前watermark:"+currentWatermark.getTimestamp()); String[] split = element.split(","); return Long.parseLong(split[1]); } }).print(); env.execute();
分析BoundedOutOfOrdernessTimestampExtractor类的源码,首先调用构造器 判断传入的 时间是不是<0, 然后将这个步长保存到maxOutOfOrderness(这里是3分钟换算为毫秒)中 初始化当前最大时间戳为long的最小值(-9223372036854595808)
public BoundedOutOfOrdernessTimestampExtractor2r(Time maxOutOfOrderness) { log.error("gouzao"); if (maxOutOfOrderness.toMilliseconds() < 0) { throw new RuntimeException( "Tried to set the maximum allowed " + "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative."); } this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();//watermark的步长 this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;//初始化当前最大时间戳为long的最小值+步长 }
接下来调用getCurrentWatermark 来计算watermark 这个方法是循环调用 循环时间可以通过env设置
@Override public final Watermark getCurrentWatermark() { log.error("getCurrentWatermark,currentMaxTimestamp:{},maxOutOfOrderness:{},lastEmittedWatermark:{}",currentMaxTimestamp,maxOutOfOrderness,lastEmittedWatermark); // this guarantees that the watermark never goes backwards. //如果当前最大时间戳-步长>=上次的watermark 那就使用新的lastEmittedWatermark作为当前的 //watermark,所以本质是:这个lastEmittedWatermark就是watermark包装的时间戳 long potentialWM = currentMaxTimestamp - maxOutOfOrderness; if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } return new Watermark(lastEmittedWatermark); }
当来了一条数据时数据后会立即调用extractTimestamp方法,每次来的数据如果 事件时间大于当前 最大时间戳就重新赋值,记录当前最大时间戳
然后,又开始循环调用getCurrentWatermark,此时如果最大时间-步长后的时间 大于了上一次的watermark 则替换
@Override public final long extractTimestamp(T element, long previousElementTimestamp) { log.error("extractTimestamp"); long timestamp = extractTimestamp(element); if (timestamp > currentMaxTimestamp) { currentMaxTimestamp = timestamp;//每次来的数据如果 事件时间大于当前 最大时间戳就重新赋值,记录当前最大时间戳 } return timestamp; }
所以说对于watermark的时间戳而言是一个循环检测并修改的逻辑, getCurrentWatermark()方法一直循环,extractTimestamp方法来了数据才调用
整个流程有关键三个参数
1,当前流时间里数据的最大时间戳,currentMaxTimestamp,每次来数据的时候回调用方法来决定是不是更新这个数据
2,watermark的步长,maxOutOfOrderness,这个是固定的,每次计算当前watermark时要用当前最大的时间戳-步长
3,lastEmittedWatermark 实际上指的是实际的watermark,每次循环调用getCurrentWatermark的时候,都会用currentMaxTimestamp-步长来决定是不是要把上一次的watermark替换掉
测试结果:红色是getCurrentWatermark一直循环调用 100ms一次 不间断 黄色是来了数据后的调用
第二种: 标记生成没有提供实现类,需要继承接口来自定义实现
做个最简单的测试能跑通就行:
static class PunctuatedTest implements AssignerWithPunctuatedWatermarks{ private long waterMark=0L; @Override public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) { log.error("调用checkAndGetNextWatermark,waterMark:{}",waterMark); String[] split = lastElement.split(","); if(split[0].equals("1")){ long end = Long.parseLong(split[1]); waterMark=end>waterMark?end:waterMark; } return new Watermark(waterMark);//当满足上述条件的时候生成watermark,也就是说自定义逻辑中可以包含步长的规则来实现自定义生成watermark } public long getWaterMarkTm (){ return waterMark; } @Override public long extractTimestamp(String element, long recordTimestamp) { log.error("调用extractTimestamp"); return 0; } }
测试结果是checkAndGetNextWatermark方法和extractTimestamp方法是在数据每来一条都调用一次来判断是否要生成watermark,具体两个方法要怎么实现就看具体要实现什么逻辑了
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)