(1)事件时间(event time):数据里面自带的时间
(2)摄入时间(Ingestion TIme
):以source的系统时间为准
(3)处理时间(Processing TIm):处理数据的时间(北京时间…)
水位线:默认等于最大的时间戳
—将60s分成0-5,5-10,达到水位线就执行
—使用处理时间就是直接写代码timeWindow(Time.seconds(5))设置就可以了
—使用事件时间需要手动设置 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
—需要指定哪个字段为时间.assignTimestampsAndWatermarks
—多个分区数据同时被消费,可能数据乱序,导致数据的丢失,设置水位线
—将水位线前移,这样可以保证数据不会丢失
—并行度设置为1,由于每隔task是独立的,所以当某一个task数据达到水位线时 就会执行,但是其他的task中还没有达到,可能导致数据的丢失
—但是在大数据环境中,数据量比较大,水位线基本会一致
object Demo03Event { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //设置flink时间模式为事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val properties = new Properties() properties.setProperty("bootstrap.servers", "master:9092") properties.setProperty("group.id", "test1") //创建kafka的消费者 val flinkKafkaCusumor = new FlinkKafkaConsumer[String]("words", new SimpleStringSchema(), properties) val kafkaDS: DataStream[String] = env.addSource(flinkKafkaCusumor) //每隔5s统计id出现的次数 val eventDS = kafkaDS.map(line => { val splits = line.split(",") val id = splits(0) val time = splits(1) (id, time.toLong) }) //告诉flink哪个字段为事件时间字段 //.assignAscendingTimestamps(_._2) //指定水位线,水位线默认等于时间戳最大的时间 //数据乱序,将水位线前移5s,这样会等5s才会触发 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) { override def extractTimestamp(element: (String, Long)): Long = { element._2 } }) eventDS.map(kv => (kv._1, 1)) .keyBy(_._1) .timeWindow(Time.seconds(5)) .sum(1) .print() env.execute() } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)