11.2.6、flink核心

11.2.6、flink核心,第1张

11.2.6、flink核心 1、checkpoink的原理流程

jobmanager定期发起checkpoint,向source task发送触发标记(设置的时间)
—当sourcetask收到这个标记后会在数据流中安装挡板,同时自己会进行checkpoint,会将挡板向下游传递(防止分组前后的checkpoint点不一样)
—下游task收到挡板后进行checkpoint
—当所有的task都处理同一次checkpoint后,一次checkpoint就完成了
—删除旧了的checkpoint,只保留最新一次

2、增量快照导包

    org.apache.flink
    flink-statebackend-rocksdb_2.11
    1.11.2

3、代码实现

开启checkpoint
—任务失败后重新启动需要指定从哪一个checkpoint的路径中恢复任务 hdfs:///master:9000/flink/checkpoint/183606dcf5bcfde823b5a495ca435a04/chk-17
—命令行加一个-s恢复一样 flink run -c com.shujia.state.Demo4Checkpoint -s hdfs://master:9000/flink/checkpoint/8ae8e8bb237063d1c90295b84ae32a17/chk-31
flink-1.0.jar

object Demo04checkpoint {
  def main(args: Array[String]): Unit = {
    //创建flink环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    
    // 每 1000ms 开始一次 checkpoint
    env.enableCheckpointing(10000)

    // 高级选项:

    // 设置模式为精确一次 (这是默认值)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // 确认 checkpoints 之间的时间会进行 500 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // 同一时间只允许一个 checkpoint 进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    val config: CheckpointConfig = env.getCheckpointConfig
    //任务失败后自动保留最新的checkpoint文件
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //设置状态后端,保存状态的位置
    //val stateBackend: StateBackend = new FsStateBackend("hdfs://master:9000/flink/checkpoint", true)
    //增量快照
     val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)
    env.setStateBackend(stateBackend)
  
    //获取数据
    val lineDS: DataStream[String] = env.socketTextStream("master", 8888)
    //将单词切分
    val wordsDS = lineDS.flatMap(_.split(","))
    //将数据存储kv
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
    //将数据分组
    kvDS.keyBy(_._1)
      .sum(1)
      .print()
    env.execute()
    
  }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5696609.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存