Flink的Time、Window、State、Checkpoint

Flink的Time、Window、State、Checkpoint,第1张

Flink的Time、Window、State、Checkpoint

文章目录
    • Time
    • Window
        • Time Window
        • Session Window
        • Count Window
    • State
    • checkPoint

Time

Time的概念: Event Time and Processing Time(事件时间和处理时间)

  • 处理时间:处理时间是指正在执行相应 *** 作的机器的系统时间。

当流程序按处理时间运行时,所有基于时间的 *** 作(如时间窗口)将使用运行相应算子的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整小时之间到达特定 *** 作员的所有记录。例如,如果应用程序在上午 9:15 开始运行,则第一个每小时处理时间窗口将包括上午 9:15 至上午 10:00 之间处理的事件,下一个窗口将包括上午 10:00 至上午 11:00 之间处理的事件,依此类推在。

  • 事件时间:事件时间是每个单独事件在其产生设备上发生的时间。这个时间通常在记录进入 Flink 之前嵌入在记录中,并且可以从每条记录中提取该事件时间戳。在事件时间中,时间的进度取决于数据,而不是任何挂钟。事件时间程序必须指定如何生成事件时间水印,这是一种表示事件时间进度的机制。

package com.liu.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lineds = env.socketTextStream("master", 8888)

    val kvDS = lineds.flatMap(_.split(",")).map((_, 1))


    kvDS.keyBy(_._1)
      //滚动窗口,每隔5秒统计单词数
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      //timeWindow(Time.seconds(5))上一行简写
      .sum(1)
      .print()
    env.execute()

}
}

Window

窗口在滑动和滚动基础上可以概括分为三大类:
Time Window:时间窗口

  • SlidingProcessingTimeWindows : 处理时间的滑动窗口
  • SlidingEventTimeWindows : 事件时间的滑动窗口
  • TumblingProcessingTimeWindows : 处理时间的滚动窗口
  • TumblingEventTimeWindows: 事件时间的滚动窗口

Session Window:会话窗口

  • ProcessingTimeSessionWindows: 处理时间的会话窗口
  • EventTimeSessionWindows : 事件时间的会话窗口

Count Window:计数窗口

Time Window
package com.liu.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lineds = env.socketTextStream("master", 8888)
    
    val kvDS = lineds.flatMap(_.split(",")).map((_, 1))


    kvDS.keyBy(_._1)
      //滚动窗口,每隔5秒统计单词数
      .window(TumblingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(5)))
//      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      //timeWindow(Time.seconds(5))上一行简写
      .sum(1)
      .print()
    env.execute()

}
}

Session Window
package com.liu.window

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time


object Demo2SessionWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val lineDS = env.socketTextStream("master", 8888)

    val kvDS = lineDS.map(line=>{
      val split = line.split(",")
      (split(0),split(1).toLong)
    })
    //指定时间字段
      .assignAscendingTimestamps(_._2)
    
    kvDS.map(kv=>(kv._1,1))
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .sum(1)
      .print()
    env.execute()
  }
}

Count Window
package com.liu.window

import org.apache.flink.streaming.api.scala._

object Demo3CountWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lineds = env.socketTextStream("master", 8888)

    val kvDS = lineds.flatMap(_.split(",")).map((_, 1))

    kvDS.keyBy(_._1)
      .countWindow(10)//每10调数据算一次
      .sum(1)
      .print()

    env.execute()
  }
}

State

state:状态这个东西确实是抽象,学到这都直接懵,硬着头皮学

state: state 一般指一个具体的 task/operator 的状态:

  • state 数据默认保存在 java 的堆内存中,TaskManage 节点的内存中。
  • operator 表示一些算子在运行的过程中会产生的一些中间结果。
    Flink 中有两种基本类型的 State:
    Keyed State 和 Operator State Keyed State 和 Operator State,可以以两种形式存在:
  • 原始状态(raw state)
  • 托管状态(managed state)

托管状态是由 Flink 框架管理的状态。 我们说 operator 算子保存了数据的中间结果,中间结果保存在什么类型中,如果我 们这里是托管状态,则由 flink 框架自行管理 原始状态由用户自行管理状态具体的数据结构,框架在做 checkpoint 的时候, 使用 byte[]来读写状态内容,对其内部数据结构一无所知。 通常在 DataStream 上的状态推荐使用托管的状态,当实现一个用户自定义的 operator 时,会使用到原始状态。

package com.liu.state

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._


object Demo2ValueState {
  def main(args: Array[String]): Unit = {
    //创建flink的环境

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    //设置并行度
    env.setParallelism(2)

    //读取socket数据
    //nc -lk 8888

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    //将单词拆分出来
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS = wordsDS.map((_, 1)).keyBy(_._1)

    //使用map算子加上状态统计单词数量
    val countDS = kvDS.map(new MyRichMapFunction)
    countDS.print()
    env.execute()
  }
}

class MyRichMapFunction extends RichMapFunction[(String,Int),(String,Int)] {
  
  var valueState: ValueState[Int] = _
  override def open(parameters:Configuration):Unit= {

  
  //获取flink执行环境
  val context = getRuntimeContext
    //创建状态描述对象
    val valueStateDesc = new ValueStateDescriptor[Int]("count", classOf[Int])
    //获取或创建状态
    valueState = context.getState(valueStateDesc)
 }
  override def map(value: (String, Int)): (String, Int) = {

    //每一个数据更新状态

    //获取之前的统计结果
    val old = valueState.value()
    //加上数据
    val newCount = old + value._2
    //更新状态
    valueState.update(newCount)
    //数据发送到下游
    (value._1,newCount)

  }
}

package com.liu.state

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

import java.util


object Demo3ListState {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val lineDS = env.socketTextStream("master", 8888)

    val kvDS = lineDS.map(line => {
      val split = line.split(",")
      (split(4), split(2).toDouble)
    })
    val keyByDS = kvDS.keyBy(_._1)
    keyByDS.map(new AvgMapFunction)

    env.execute()
  }
}

class AvgMapFunction extends RichMapFunction[(String, Double), (String, Double)] {

  var listState: ListState[Double]=_
  override def open(parameters: Configuration): Unit = {
    val context = getRuntimeContext
    val listStateDesc = new ListStateDescriptor[Double]("age", classOf[Double])
    listState = context.getListState(listStateDesc)
  }

  override def map(value: (String, Double)): (String, Double) ={
    val clazz = value._1
    val age = value._2
    //把年龄保存到状态中
    listState.add(age)
    //获取所有的年龄计算平均值
    val ages: util.Iterator[Double] = listState.get().iterator()
    var count=0
    var sum=0.0
    while(ages.hasNext){
      val a = ages.next()
      sum+=a
      count+=1
    }
    val avgAge = sum/count
    (clazz,avgAge)
}

}
checkPoint

简单来说就是为了持久化数据,当flink出现故障时数据可以恢复。
checkpoint 可以理解为 checkpoint 是把 state 数据定时持久化存储了,
State 可以被记录,在失败的情况下数据还可以恢复。

Flink 的 checkpoint 机制会和持久化存储进行交互,读写流与状态。一般需要:

一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon
Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。


Checkpoint n 将包含每个 operator 的 state,这些 state 是对应的 operator 消费了严格在 checkpoint barrier n 之前的所有事件,并且不包含在此(checkpoint barrier n)后的任何事件后而生成的状态。

当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。

Flink 的 state backends 利用写时复制(copy-on-write)机制允许当异步生成旧版本的状态快照时,能够不受影响地继续流处理。只有当快照被持久保存后,这些旧版本的状态才会被当做垃圾回收。

package com.liu.state


import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

object Demo4Checkpoint {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 每 1000ms 开始一次 checkpoint
    env.enableCheckpointing(10000)

    // 高级选项:

    // 设置模式为精确一次 (这是默认值)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // 确认 checkpoints 之间的时间会进行 500 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // 同一时间只允许一个 checkpoint 进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)


    val config: CheckpointConfig = env.getCheckpointConfig
    //任务失败后自动保留最新的checkpoint文件
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //设置状态后端,保存状态的位置
    val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)
    env.setStateBackend(stateBackend)


    //读取socker数据
    //nc -lk 8888

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    //将单词拆分出来
    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))


    wordsDS
      .map((_, 1))
      .keyBy(_._1)
      //sum 底层使用的是valueState
      .sum(1)
      .print()

    env.execute()


    

  }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5605232.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存