1、watermark周期性生成,默认是200ms,可以修改为500msexecutionEnvironment.getConfig.setAutoWatermarkInterval(500)
2、WindowAssigner包括TumblingProcessingTimeWindows SlidingProcessingTimeWindows
3、window的API
sum,max,min,reduce(传入reduceFucntion )、aggreation增量计算,来一条数据就进行计算,等到窗口结束时直接输出之前算好的数据结果,效率高
process(传入processWiindowFucntion),appply(传入windowFunction)全窗口计算,先把窗口的数据收集起来,等到窗口结束的时候会遍历所有的数据,效率低,有些场景增量聚合的数没太大用,比如排序、算中位数, 全窗口函数获得信息更多,比如从上下文获得窗口信息,状态信息,更底层更灵活
4、其他可选的API
trigger触发器:定义窗口什么时候关闭,什么时候触发窗口进行计算并输出结果
evictor移除器:定义移出某些数据的逻辑,类似于filter
allowedlateness:传入时间,允许处理迟到的数据的时间,这些迟到数据会进入正常的流计算中
sideoutputlateData:将迟到数据放到侧输出流,如果过了allowedlateness的时间,就会放到侧输出流
getsideoutput:获取侧输出流
package flinkSourse import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows} import org.apache.flink.streaming.api.windowing.time.Time object FlinkWindow { def main(args: Array[String]): Unit = { val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment executionEnvironment.setParallelism(1) executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //watermark周期性生成,默认是200ms // executionEnvironment.getConfig.setAutoWatermarkInterval(500) //watermark周期性生成,默认是200ms,可以修改为500ms val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111) val transforStream: DataStream[SensorReading] = stream2.map(data => { val tmpList: Array[String] = data.split(",") SensorReading(tmpList(0), tmpList(1).toLong, tmpList(2).toDouble) }) //增加watermark配置.punctated代表点状的,periodic代表周期性的(一般用这个) val transforEventStream: DataStream[SensorReading] = transforStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) { override def extractTimestamp(t: SensorReading) = t.timestamp * 1000 }) // WindowAssigner包括TumblingProcessingTimeWindows SlidingProcessingTimeWindows // transforStream.keyBy(_.id).window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // transforStream.keyBy(_.id).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // //简单写法 // transforStream.keyBy(_.id).timeWindow(Time.seconds(10)) // transforStream.keyBy(_.id).timeWindow(Time.seconds(10), Time.seconds(2)) //window的API //sum,max,min,reduce(传入reduceFucntion )、aggreation增量计算,来一条数据就进行计算,等到窗口结束时直接输出之前算好的数据结果,效率高 //process(传入processWiindowFucntion),appply(传入windowFunction)全窗口计算,先把窗口的数据收集起来,等到窗口结束的时候会遍历所有的数据,效率低,有些场景增量聚合的数没太大用,比如排序、算中位数, // 全窗口函数获得信息更多,比如从上下文获得窗口信息,状态信息,更底层更灵活 //其他可选的API //trigger触发器:定义窗口什么时候关闭,什么时候触发窗口进行计算并输出结果 //evictor移除器:定义移出某些数据的逻辑,类似于filter //allowedlateness:传入时间,允许处理迟到的数据的时间,这些迟到数据会进入正常的流计算中 //sideoutputlateData:将迟到数据放到侧输出流,如果过了allowedlateness的时间,就会放到侧输出流 //getsideoutput:获取侧输出流 //统计每15秒每个传感器温度的最小值,时间戳最大值 val lateTag = new OutputTag[SensorReading]("late") val windowStream: DataStream[SensorReading] = transforEventStream .keyBy("id") .timeWindow(Time.seconds(15)) .allowedLateness(Time.minutes(1)) .sideOutputLateData(lateTag) .reduce((curdata, newdata) => { SensorReading(curdata.id, newdata.timestamp, curdata.temperature.min(newdata.temperature)) }) //窗口start的创建公式 timestamp - (timestamp - offset + windowSize) % windowSize,第一条数据是1547718199,窗口为[195-210),[210-225),[225-240) //1547718199000-(1547718199000+15000)%15000 val lateStream = windowStream.getSideOutput(lateTag) windowStream.print("windowSteam") lateStream.print("lateStream") executionEnvironment.execute("flinkWindow ") } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)