【Flink】Watermark

【Flink】Watermark,第1张

Time
  • EventTime 事件时间: 日志自己记录的生成时间
  • IngestionTIme 摄入时间:Flink接收到日志的时间
  • ProcessingTime 处理时间:Flink某个节点执行某个operation的时间(eg. timeWindow处理数据时的系统时间,默认的时间属性就是Processing Time)
数据延迟产生的问题


WaterMark WaterMark是什么:
  • 是一个时间戳,Flink可以给数据流添加水印,收到一条消息后,额外给这个消息添加一个时间字段。
  • 当数据流添加水印后,会按照水印时间来触发窗口计算。
  • 一般会设置水印时间,比事件时间小几秒钟,表示最大允许数据延迟达到多久。
代码案例
public static void main(String[] args) throws Exception {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		// 创建Flink执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
		// 创建数据源
        DataStreamSource<RawLogGroupList> ds = env.addSource(new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));

        ds.flatMap()  // 日志数据展开 ,输出Tuple5
        .filter(value -> value.f0.equals("2"))
        .assignTimestampsAndWatermarks(new WatermarkStrategy<Tuple5<String, String, String, String, String>>() {
            @Override
            public WatermarkGenerator<Tuple5<String, String, String, String, String>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Tuple5<String, String, String, String, String>>() {
                    private long maxTimeStamp=Long.MIN_VALUE;
                    @Override
                    public void onEvent(Tuple5<String, String, String, String, String> event, long eventTimestamp, WatermarkOutput output) {
                        long eventTime = sdf.parse(event.f2).getTime(); // 取tuple5中第2个元素为event time
                        maxTimeStamp=Math.max(maxTimeStamp,eventTime);
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        long maxOutOfOrderness=3000;  // 设置最大允许延长时间为3000秒
                        output.emitWatermark(new Watermark(maxTimeStamp-maxOutOfOrderness));
                    }
                };
            }
        }.withTimestampAssigner((element, recordTimestamp) -> sdf.parse(element.f2).getTime()))
        .keyBy(value -> value.f1)
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))  // 开30秒滚动窗
        .apply(new WindowFunction<Tuple5<String, String, String, String, String>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple5<String, String, String, String, String>> input, Collector<String> out) throws Exception {
                Iterator<Tuple5<String, String, String, String, String>> iterator = input.iterator();
                ArrayList<Long> list = new ArrayList<>();
                while(iterator.hasNext()){
                    Tuple5<String, String, String, String, String> next = iterator.next();
                    list.add(sdf.parse(next.f2).getTime());
                }

                Collections.sort(list);

                out.collect("key: "+s+", list.size: "+list.size()+", firstTime: "+ sdf.format(list.get(0))+", lastTime: "+sdf.format(list.get(list.size()-1))+", window.start: "+sdf.format(window.getStart())+", window.end: "+sdf.format(window.getEnd()));
            }
        })
        .print();

        env.execute("waterMarkDemo");
     }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/924492.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存