- Time
- Window
- Time Window
- Session Window
- Count Window
- State
- checkPoint
Time的概念: Event Time and Processing Time(事件时间和处理时间)
- 处理时间:处理时间是指正在执行相应 *** 作的机器的系统时间。
当流程序按处理时间运行时,所有基于时间的 *** 作(如时间窗口)将使用运行相应算子的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整小时之间到达特定 *** 作员的所有记录。例如,如果应用程序在上午 9:15 开始运行,则第一个每小时处理时间窗口将包括上午 9:15 至上午 10:00 之间处理的事件,下一个窗口将包括上午 10:00 至上午 11:00 之间处理的事件,依此类推在。
- 事件时间:事件时间是每个单独事件在其产生设备上发生的时间。这个时间通常在记录进入 Flink 之前嵌入在记录中,并且可以从每条记录中提取该事件时间戳。在事件时间中,时间的进度取决于数据,而不是任何挂钟。事件时间程序必须指定如何生成事件时间水印,这是一种表示事件时间进度的机制。
package com.liu.window import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object Demo1TimeWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val lineds = env.socketTextStream("master", 8888) val kvDS = lineds.flatMap(_.split(",")).map((_, 1)) kvDS.keyBy(_._1) //滚动窗口,每隔5秒统计单词数 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //timeWindow(Time.seconds(5))上一行简写 .sum(1) .print() env.execute() } }Window
窗口在滑动和滚动基础上可以概括分为三大类:
Time Window:时间窗口
- SlidingProcessingTimeWindows : 处理时间的滑动窗口
- SlidingEventTimeWindows : 事件时间的滑动窗口
- TumblingProcessingTimeWindows : 处理时间的滚动窗口
- TumblingEventTimeWindows: 事件时间的滚动窗口
Session Window:会话窗口
- ProcessingTimeSessionWindows: 处理时间的会话窗口
- EventTimeSessionWindows : 事件时间的会话窗口
Count Window:计数窗口
Time Windowpackage com.liu.window import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object Demo1TimeWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val lineds = env.socketTextStream("master", 8888) val kvDS = lineds.flatMap(_.split(",")).map((_, 1)) kvDS.keyBy(_._1) //滚动窗口,每隔5秒统计单词数 .window(TumblingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(5))) // .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //timeWindow(Time.seconds(5))上一行简写 .sum(1) .print() env.execute() } }Session Window
package com.liu.window import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows import org.apache.flink.streaming.api.windowing.time.Time object Demo2SessionWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val lineDS = env.socketTextStream("master", 8888) val kvDS = lineDS.map(line=>{ val split = line.split(",") (split(0),split(1).toLong) }) //指定时间字段 .assignAscendingTimestamps(_._2) kvDS.map(kv=>(kv._1,1)) .keyBy(_._1) .window(EventTimeSessionWindows.withGap(Time.seconds(5))) .sum(1) .print() env.execute() } }Count Window
package com.liu.window import org.apache.flink.streaming.api.scala._ object Demo3CountWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val lineds = env.socketTextStream("master", 8888) val kvDS = lineds.flatMap(_.split(",")).map((_, 1)) kvDS.keyBy(_._1) .countWindow(10)//每10调数据算一次 .sum(1) .print() env.execute() } }State
state:状态这个东西确实是抽象,学到这都直接懵,硬着头皮学
state: state 一般指一个具体的 task/operator 的状态:
- state 数据默认保存在 java 的堆内存中,TaskManage 节点的内存中。
- operator 表示一些算子在运行的过程中会产生的一些中间结果。
Flink 中有两种基本类型的 State:
Keyed State 和 Operator State Keyed State 和 Operator State,可以以两种形式存在: - 原始状态(raw state)
- 托管状态(managed state)
托管状态是由 Flink 框架管理的状态。 我们说 operator 算子保存了数据的中间结果,中间结果保存在什么类型中,如果我 们这里是托管状态,则由 flink 框架自行管理 原始状态由用户自行管理状态具体的数据结构,框架在做 checkpoint 的时候, 使用 byte[]来读写状态内容,对其内部数据结构一无所知。 通常在 DataStream 上的状态推荐使用托管的状态,当实现一个用户自定义的 operator 时,会使用到原始状态。
package com.liu.state import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ object Demo2ValueState { def main(args: Array[String]): Unit = { //创建flink的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //设置并行度 env.setParallelism(2) //读取socket数据 //nc -lk 8888 val linesDS: DataStream[String] = env.socketTextStream("master", 8888) //将单词拆分出来 val wordsDS: DataStream[String] = linesDS.flatMap(_.split(",")) val kvDS = wordsDS.map((_, 1)).keyBy(_._1) //使用map算子加上状态统计单词数量 val countDS = kvDS.map(new MyRichMapFunction) countDS.print() env.execute() } } class MyRichMapFunction extends RichMapFunction[(String,Int),(String,Int)] { var valueState: ValueState[Int] = _ override def open(parameters:Configuration):Unit= { //获取flink执行环境 val context = getRuntimeContext //创建状态描述对象 val valueStateDesc = new ValueStateDescriptor[Int]("count", classOf[Int]) //获取或创建状态 valueState = context.getState(valueStateDesc) } override def map(value: (String, Int)): (String, Int) = { //每一个数据更新状态 //获取之前的统计结果 val old = valueState.value() //加上数据 val newCount = old + value._2 //更新状态 valueState.update(newCount) //数据发送到下游 (value._1,newCount) } }
package com.liu.state import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ import java.util object Demo3ListState { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) val lineDS = env.socketTextStream("master", 8888) val kvDS = lineDS.map(line => { val split = line.split(",") (split(4), split(2).toDouble) }) val keyByDS = kvDS.keyBy(_._1) keyByDS.map(new AvgMapFunction) env.execute() } } class AvgMapFunction extends RichMapFunction[(String, Double), (String, Double)] { var listState: ListState[Double]=_ override def open(parameters: Configuration): Unit = { val context = getRuntimeContext val listStateDesc = new ListStateDescriptor[Double]("age", classOf[Double]) listState = context.getListState(listStateDesc) } override def map(value: (String, Double)): (String, Double) ={ val clazz = value._1 val age = value._2 //把年龄保存到状态中 listState.add(age) //获取所有的年龄计算平均值 val ages: util.Iterator[Double] = listState.get().iterator() var count=0 var sum=0.0 while(ages.hasNext){ val a = ages.next() sum+=a count+=1 } val avgAge = sum/count (clazz,avgAge) } }checkPoint
简单来说就是为了持久化数据,当flink出现故障时数据可以恢复。
checkpoint 可以理解为 checkpoint 是把 state 数据定时持久化存储了,
State 可以被记录,在失败的情况下数据还可以恢复。
Flink 的 checkpoint 机制会和持久化存储进行交互,读写流与状态。一般需要:
一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon
Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
Checkpoint n 将包含每个 operator 的 state,这些 state 是对应的 operator 消费了严格在 checkpoint barrier n 之前的所有事件,并且不包含在此(checkpoint barrier n)后的任何事件后而生成的状态。
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。
Flink 的 state backends 利用写时复制(copy-on-write)机制允许当异步生成旧版本的状态快照时,能够不受影响地继续流处理。只有当快照被持久保存后,这些旧版本的状态才会被当做垃圾回收。
package com.liu.state import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.runtime.state.StateBackend import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.environment.CheckpointConfig import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup object Demo4Checkpoint { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(10000) // 高级选项: // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig.setCheckpointTimeout(60000) // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) val config: CheckpointConfig = env.getCheckpointConfig //任务失败后自动保留最新的checkpoint文件 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置状态后端,保存状态的位置 val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true) env.setStateBackend(stateBackend) //读取socker数据 //nc -lk 8888 val linesDS: DataStream[String] = env.socketTextStream("master", 8888) //将单词拆分出来 val wordsDS: DataStream[String] = linesDS.flatMap(_.split(",")) wordsDS .map((_, 1)) .keyBy(_._1) //sum 底层使用的是valueState .sum(1) .print() env.execute() } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)