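- EventTime (event time): the creation time recorded in the log entry itself.
- IngestionTime (ingestion time): the time at which Flink receives the log entry.
- ProcessingTime (processing time): the system time of the Flink node when it executes an operation (e.g. the system time at which timeWindow processes the data). In Flink versions before 1.12, ProcessingTime is the default time characteristic.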
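- A watermark is a timestamp that Flink inserts into the data stream as it receives records. It declares that no events with an earlier event time are expected to arrive.
- Once a stream carries watermarks, window computation is triggered by the watermark time rather than by the system clock.
- The watermark is usually set a few seconds behind the largest event time seen so far. That gap is the maximum out-of-order delay the job tolerates.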
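The rule in the last bullet can be sketched in plain Java without any Flink dependency. This is only an illustration of the arithmetic, not Flink's implementation: the class `WatermarkSketch`, its nested `BoundedOutOfOrderness` helper, and the sample timestamps are all made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    /** Hypothetical helper: tracks the largest event time seen and derives the watermark from it. */
    public static final class BoundedOutOfOrderness {
        private final long maxOutOfOrdernessMs;
        private long maxTimestamp;

        public BoundedOutOfOrderness(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Start high enough that the subtraction in currentWatermark() cannot underflow.
            this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
        }

        public void onEvent(long eventTimeMs) {
            // Out-of-order (older) events never move the maximum backwards.
            maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
        }

        public long currentWatermark() {
            return maxTimestamp - maxOutOfOrdernessMs;
        }
    }

    /** Returns the watermark computed after each event, in arrival order. */
    public static List<Long> watermarksFor(long[] eventTimesMs, long maxOutOfOrdernessMs) {
        BoundedOutOfOrderness generator = new BoundedOutOfOrderness(maxOutOfOrdernessMs);
        List<Long> result = new ArrayList<>();
        for (long eventTime : eventTimesMs) {
            generator.onEvent(eventTime);
            result.add(generator.currentWatermark());
        }
        return result;
    }

    public static void main(String[] args) {
        // Event times in milliseconds; the third event arrives out of order.
        long[] arrivals = {10_000L, 12_000L, 11_000L, 16_000L};
        System.out.println(watermarksFor(arrivals, 3_000L)); // [7000, 9000, 9000, 13000]
    }
}
```

The late event at 11000 ms leaves the watermark at 9000 ms, so the watermark never moves backwards, and it still belongs to any window whose end lies beyond 9000 ms.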
public static void main(String[] args) throws Exception {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Create the Flink execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
    // Create the data source
    DataStreamSource<RawLogGroupList> ds = env.addSource(new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));
    ds.flatMap() // expand the log data and emit Tuple5 (the flatMap function is omitted here)
        .filter(value -> value.f0.equals("2"))
        .assignTimestampsAndWatermarks(new WatermarkStrategy<Tuple5<String, String, String, String, String>>() {
            @Override
            public WatermarkGenerator<Tuple5<String, String, String, String, String>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Tuple5<String, String, String, String, String>>() {
                    // Maximum allowed out-of-orderness: 3000 ms (3 seconds)
                    private final long maxOutOfOrderness = 3000;
                    // Start above Long.MIN_VALUE so the subtraction in onPeriodicEmit cannot underflow
                    private long maxTimeStamp = Long.MIN_VALUE + maxOutOfOrderness;
                    @Override
                    public void onEvent(Tuple5<String, String, String, String, String> event, long eventTimestamp, WatermarkOutput output) {
                        // eventTimestamp is the value produced by the timestamp assigner (parsed from field f2)
                        maxTimeStamp = Math.max(maxTimeStamp, eventTimestamp);
                    }
                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        // The watermark trails the largest event time seen so far by maxOutOfOrderness
                        output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
                    }
                };
            }
        }.withTimestampAssigner((element, recordTimestamp) -> {
            try {
                // Use field f2 (the third element of the Tuple5) as the event time
                return sdf.parse(element.f2).getTime();
            } catch (java.text.ParseException e) {
                // parse() throws a checked exception, which the assigner lambda may not propagate
                throw new RuntimeException("Unparseable event time: " + element.f2, e);
            }
        }))
        .keyBy(value -> value.f1)
        .window(TumblingEventTimeWindows.of(Time.seconds(30))) // 30-second tumbling window
        .apply(new WindowFunction<Tuple5<String, String, String, String, String>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple5<String, String, String, String, String>> input, Collector<String> out) throws Exception {
                // Collect and sort the event times of all records in this window
                ArrayList<Long> list = new ArrayList<>();
                for (Tuple5<String, String, String, String, String> next : input) {
                    list.add(sdf.parse(next.f2).getTime());
                }
                Collections.sort(list);
                out.collect("key: " + s + ", list.size: " + list.size()
                    + ", firstTime: " + sdf.format(list.get(0))
                    + ", lastTime: " + sdf.format(list.get(list.size() - 1))
                    + ", window.start: " + sdf.format(window.getStart())
                    + ", window.end: " + sdf.format(window.getEnd()));
            }
        })
        .print();
    env.execute("waterMarkDemo");
}
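To read the `window.start` and `window.end` values that this job prints, it helps to see how a 30-second tumbling event-time window is aligned and when it fires. The sketch below reproduces that arithmetic in plain Java, assuming a zero window offset and zero allowed lateness. The class `TumblingWindowSketch` and its method names are hypothetical, not part of Flink's API.

```java
public class TumblingWindowSketch {

    /** Start of the tumbling window containing the timestamp; windows are aligned to the epoch. */
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of the window containing the timestamp. */
    public static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    /** A window [start, end) fires once the watermark reaches its last covered instant, end - 1. */
    public static boolean fires(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long sizeMs = 30_000L;
        long eventTimeMs = 65_000L;
        long start = windowStart(eventTimeMs, sizeMs);
        long end = windowEnd(eventTimeMs, sizeMs);
        System.out.println("window: [" + start + ", " + end + ")");

        // With 3000 ms of out-of-orderness, the watermark is (max event time - 3000).
        System.out.println(fires(89_000L - 3_000L, end)); // event at 89000 ms: not yet
        System.out.println(fires(92_999L - 3_000L, end)); // event at 92999 ms: fires
    }
}
```

Under these assumptions, the window [60000, 90000) only produces output after some event with a time of at least 92999 ms has arrived, which is why results appear a few seconds of event time after the window's nominal end.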