flink的窗口方法

flink的窗口方法,第1张

flink的窗口方法

1、watermark周期性生成,默认是200ms,可以修改为500msexecutionEnvironment.getConfig.setAutoWatermarkInterval(500)

2、WindowAssigner包括TumblingProcessingTimeWindows SlidingProcessingTimeWindows

3、window的API

sum,max,min,reduce(传入reduceFucntion )、aggreation增量计算,来一条数据就进行计算,等到窗口结束时直接输出之前算好的数据结果,效率高

process(传入processWiindowFucntion),appply(传入windowFunction)全窗口计算,先把窗口的数据收集起来,等到窗口结束的时候会遍历所有的数据,效率低,有些场景增量聚合的数没太大用,比如排序、算中位数, 全窗口函数获得信息更多,比如从上下文获得窗口信息,状态信息,更底层更灵活

4、其他可选的API 

trigger触发器:定义窗口什么时候关闭,什么时候触发窗口进行计算并输出结果

evictor移除器:定义移出某些数据的逻辑,类似于filter

allowedlateness:传入时间,允许处理迟到的数据的时间,这些迟到数据会进入正常的流计算中

sideoutputlateData:将迟到数据放到侧输出流,如果过了allowedlateness的时间,就会放到侧输出流

getsideoutput:获取侧输出流

package flinkSourse

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkWindow {
  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    executionEnvironment.setParallelism(1)
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //watermark周期性生成,默认是200ms
//    executionEnvironment.getConfig.setAutoWatermarkInterval(500) //watermark周期性生成,默认是200ms,可以修改为500ms
    val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111)
    val transforStream: DataStream[SensorReading] = stream2.map(data => {
      val tmpList: Array[String] = data.split(",")
      SensorReading(tmpList(0), tmpList(1).toLong, tmpList(2).toDouble)
    })

    //增加watermark配置.punctated代表点状的,periodic代表周期性的(一般用这个)
    val transforEventStream: DataStream[SensorReading] = transforStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
      override def extractTimestamp(t: SensorReading) = t.timestamp * 1000
    })

    //    WindowAssigner包括TumblingProcessingTimeWindows  SlidingProcessingTimeWindows
    //    transforStream.keyBy(_.id).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    //    transforStream.keyBy(_.id).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)))
    //    //简单写法
    //    transforStream.keyBy(_.id).timeWindow(Time.seconds(10))
    //    transforStream.keyBy(_.id).timeWindow(Time.seconds(10), Time.seconds(2))

    //window的API
    //sum,max,min,reduce(传入reduceFucntion )、aggreation增量计算,来一条数据就进行计算,等到窗口结束时直接输出之前算好的数据结果,效率高
    //process(传入processWiindowFucntion),appply(传入windowFunction)全窗口计算,先把窗口的数据收集起来,等到窗口结束的时候会遍历所有的数据,效率低,有些场景增量聚合的数没太大用,比如排序、算中位数,
    // 全窗口函数获得信息更多,比如从上下文获得窗口信息,状态信息,更底层更灵活
    //其他可选的API
    //trigger触发器:定义窗口什么时候关闭,什么时候触发窗口进行计算并输出结果
    //evictor移除器:定义移出某些数据的逻辑,类似于filter
    //allowedlateness:传入时间,允许处理迟到的数据的时间,这些迟到数据会进入正常的流计算中
    //sideoutputlateData:将迟到数据放到侧输出流,如果过了allowedlateness的时间,就会放到侧输出流
    //getsideoutput:获取侧输出流

    //统计每15秒每个传感器温度的最小值,时间戳最大值
    val lateTag = new OutputTag[SensorReading]("late")

    val windowStream: DataStream[SensorReading] = transforEventStream
      .keyBy("id")
      .timeWindow(Time.seconds(15))
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(lateTag)
      .reduce((curdata, newdata) => {
        SensorReading(curdata.id, newdata.timestamp, curdata.temperature.min(newdata.temperature))
      })
    //窗口start的创建公式 timestamp - (timestamp - offset + windowSize) % windowSize,第一条数据是1547718199,窗口为[195-210),[210-225),[225-240)
    //1547718199000-(1547718199000+15000)%15000


    val lateStream = windowStream.getSideOutput(lateTag)

    windowStream.print("windowSteam")
    lateStream.print("lateStream")


    executionEnvironment.execute("flinkWindow ")

  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5118177.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存