可以通过addSource方法来自定义SourceFunction,并可指定Timestamp和Watermark生成规则。addSource方法接收一个SourceFunction
SourceFunction中定义了一个run(SourceContext
- 数据源发送事件数据并生成Timestamp方法:collectWithTimestamp(T element,long timestamp)
element代表需发送的元素,timestamp代表这个元素对应的时间戳。该方法只在EventTime时有效,ProcessingTime时设置的timestamp直接忽略。 - 生成Watermark的方法:emitWatermark(Watermark mark)
当发送一个时间戳为T的mark时,表示该数据流上不会再有timestamp<=T的事件记录,一般来说,这个T是基于最大的timestamp来生成的,比如最大timestamp-1。
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import java.util.Arrays; //在SourceFunction函数中,指定Timestamp和生成Watermark示例 public class Test { public static void main(String[] args) throws Exception{ //创建流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置事件时间EventTime语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //并行度为1 env.setParallelism(1); //演示数据 Tuple3[] input = { Tuple3.of("user1", 1000L, 1), Tuple3.of("user1", 1999L, 2), Tuple3.of("user1", 2000L, 3), Tuple3.of("user1", 2100L, 4), Tuple3.of("user1", 2130L, 5) }; //通过示例数据生成DataStream DataStream> source = env.addSource( //SourceFunction中进行时间戳分配和水位线生成 new SourceFunction >() { @Override public void run(SourceContext > ctx) throws Exception { //遍历数据集 Arrays.asList(input).forEach(tp3 -> { //指定时间戳 ctx.collectWithTimestamp(tp3, (long) tp3.f1); System.out.println("collectWithTimestamp:"+ (long) tp3.f1); //发送水位线,当前元素时间戳-1 ctx.emitWatermark(new Watermark((long) tp3.f1 - 1)); System.out.println("emitWatermark:"+ ((long) tp3.f1 - 1)); System.out.println("**************************************"); }); //代表结束标志 ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); } @Override public void cancel() {} }); //结果打印 source.print(); //执行程序 env.execute(); } }
上面程序每接收到一个事件数据都会调用生成Timestamp和Watermark,Watermark值为当前事件的Timestamp-1
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)