11.2.3、flink核心

11.2.3、flink核心,第1张

11.2.3、flink核心 1、flink的时间

(1)事件时间(event time):数据里面自带的时间
(2)摄入时间(Ingestion TIme
):以source的系统时间为准
(3)处理时间(Processing TIm):处理数据的时间(北京时间…)

2、代码详解

水位线:默认等于最大的时间戳
—将60s分成0-5,5-10,达到水位线就执行

—使用处理时间就是直接写代码timeWindow(Time.seconds(5))设置就可以了
—使用事件时间需要手动设置 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
—需要指定哪个字段为时间.assignTimestampsAndWatermarks
—多个分区数据同时被消费,可能数据乱序,导致数据的丢失,设置水位线
—将水位线前移,这样可以保证数据不会丢失
—并行度设置为1,由于每隔task是独立的,所以当某一个task数据达到水位线时 就会执行,但是其他的task中还没有达到,可能导致数据的丢失
—但是在大数据环境中,数据量比较大,水位线基本会一致

object Demo03Event {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //设置flink时间模式为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    
    env.setParallelism(1)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "test1")
    //创建kafka的消费者
    val flinkKafkaCusumor = new FlinkKafkaConsumer[String]("words", new SimpleStringSchema(), properties)

    val kafkaDS: DataStream[String] = env.addSource(flinkKafkaCusumor)

    //每隔5s统计id出现的次数
    val eventDS = kafkaDS.map(line => {
      val splits = line.split(",")
      val id = splits(0)
      val time = splits(1)
      (id, time.toLong)
    })
      //告诉flink哪个字段为事件时间字段
      //.assignAscendingTimestamps(_._2)
      //指定水位线,水位线默认等于时间戳最大的时间
      //数据乱序,将水位线前移5s,这样会等5s才会触发
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
      override def extractTimestamp(element: (String, Long)): Long = {
        element._2
      }
    })

    eventDS.map(kv => (kv._1, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    

    env.execute()
  }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5688025.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存