jobmanager定期发起checkpoint,向source task发送触发标记(设置的时间)
—当sourcetask收到这个标记后会在数据流中安装挡板,同时自己会进行checkpoint,会将挡板向下游传递(防止分组前后的checkpoint点不一样)
—下游task收到挡板后进行checkpoint
—当所有的task都处理同一次checkpoint后,一次checkpoint就完成了
—删除旧了的checkpoint,只保留最新一次
3、代码实现org.apache.flink flink-statebackend-rocksdb_2.111.11.2
开启checkpoint
—任务失败后重新启动需要指定从哪一个checkpoint的路径中恢复任务 hdfs:///master:9000/flink/checkpoint/183606dcf5bcfde823b5a495ca435a04/chk-17
—命令行加一个-s恢复一样 flink run -c com.shujia.state.Demo4Checkpoint -s hdfs://master:9000/flink/checkpoint/8ae8e8bb237063d1c90295b84ae32a17/chk-31
flink-1.0.jar
object Demo04checkpoint { def main(args: Array[String]): Unit = { //创建flink环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(10000) // 高级选项: // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig.setCheckpointTimeout(60000) // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) val config: CheckpointConfig = env.getCheckpointConfig //任务失败后自动保留最新的checkpoint文件 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置状态后端,保存状态的位置 //val stateBackend: StateBackend = new FsStateBackend("hdfs://master:9000/flink/checkpoint", true) //增量快照 val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true) env.setStateBackend(stateBackend) //获取数据 val lineDS: DataStream[String] = env.socketTextStream("master", 8888) //将单词切分 val wordsDS = lineDS.flatMap(_.split(",")) //将数据存储kv val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1)) //将数据分组 kvDS.keyBy(_._1) .sum(1) .print() env.execute() } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)