flink生成Watermark之SourceFunction

flink生成Watermark之SourceFunction,第1张

flink生成Watermark之SourceFunction

可以通过addSource方法来自定义SourceFunction,并可指定Timestamp和Watermark生成规则。addSource方法接收一个SourceFunction参数,OUT表示返回的元素类型,并返回一个DataStreamSource类型的对象。
SourceFunction中定义了一个run(SourceContext ctx)方法来启动数据源,而SourceContext对象中定义了:

  1. 数据源发送事件数据并生成Timestamp方法:collectWithTimestamp(T element,long timestamp)
    element代表需发送的元素,timestamp代表这个元素对应的时间戳。该方法只在EventTime时有效,ProcessingTime时设置的timestamp直接忽略。
  2. 生成Watermark的方法:emitWatermark(Watermark mark)
    当发送一个时间戳为T的mark时,表示该数据流上不会再有timestamp<=T的事件记录,一般来说,这个T是基于最大的timestamp来生成的,比如最大timestamp-1。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.Arrays;
//在SourceFunction函数中,指定Timestamp和生成Watermark示例
public class Test {
    public static void main(String[] args) throws Exception{
        //创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置事件时间EventTime语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //并行度为1
        env.setParallelism(1);
        //演示数据
        Tuple3[] input = {
                Tuple3.of("user1", 1000L, 1),
                Tuple3.of("user1", 1999L, 2),
                Tuple3.of("user1", 2000L, 3),
                Tuple3.of("user1", 2100L, 4),
                Tuple3.of("user1", 2130L, 5)
        };
        //通过示例数据生成DataStream
        DataStream> source = env.addSource(
                //SourceFunction中进行时间戳分配和水位线生成
                new SourceFunction>() {
                    @Override
                    public void run(SourceContext> ctx) throws Exception {
                    		//遍历数据集
                            Arrays.asList(input).forEach(tp3 -> {
                                //指定时间戳
                                ctx.collectWithTimestamp(tp3, (long) tp3.f1);
                                System.out.println("collectWithTimestamp:"+ (long) tp3.f1);
                                //发送水位线,当前元素时间戳-1
                                ctx.emitWatermark(new Watermark((long) tp3.f1 - 1));
                                System.out.println("emitWatermark:"+ ((long) tp3.f1 - 1));
                                System.out.println("**************************************");
                            });
                            //代表结束标志
                            ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
                    }
                    @Override
                    public void cancel() {}
                });
        //结果打印
        source.print();
        //执行程序
        env.execute();
    }
}

上面程序每接收到一个事件数据都会调用生成Timestamp和Watermark,Watermark值为当前事件的Timestamp-1

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5573490.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存