package cn.itcast.streaming.watermark;
import cn.itcast.entity.ItcastDataObj;
import org.apache.flink.streaming.api.functions.*;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
public class TripDriveWatermark implements AssignerWithPeriodicWatermarks
//定义当前窗口最大的水印时间戳
Long currentMaxTimestamp = 0L;
//定义允许最大乱序的事件:30s
Long maxOutOfOrderness = 1000*30L;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(ItcastDataObj itcastDataObj, long l) {
//获取到当前窗口最大的事件时间
currentMaxTimestamp = Math.max(currentMaxTimestamp, itcastDataObj.getTerminalTimeStamp());
System.out.println("当前数据时间:"+itcastDataObj.getTerminalTimeStamp());
return currentMaxTimestamp;
}
}
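// Scraped page footer (not part of the program): "Feel free to share; when reposting, please credit the source: 内存溢出 (Memory Overflow)."
// Scraped page footer (not part of the program): "Comment list (0 entries)."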