唯一一次Exactly once,两步提交
在checkpoint之前开启事务,checkpoint之后提交事务
–可能造成数据读取显示两次重复(类似kafka的生产者了)
—读取kafka的数据(6,5,4,| 3,2,1),flink的source端做checkpoint安装挡板,写回kafka
—假设挡板在3和4之间,读取5的时候任务失败了,恢复后数据还是恢复到3这个,但是4,5之前会写入kafka中了,还会写一次
—checkpoint之前开启事务,之后提交事务
—使用两步提交,flink消费kafka数据又checkpoint可以直接恢复
—flink数据写回kafka中,读取需要使用只读提交的数据–isolation-level
read_committed object Demo05flinkonKafkaExactlyOnce { def main(args: Array[String]): Unit = { //创建flink的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(20000) // 高级选项: // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig.setCheckpointTimeout(60000) // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) val config: CheckpointConfig = env.getCheckpointConfig //任务失败后自动保留最新的checkpoint文件 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置状态后端,保存状态的位置 val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true) env.setStateBackend(stateBackend) val properties = new Properties() properties.setProperty("bootstrap.servers", "master:9092") properties.setProperty("group.id", "mdsgfgf") //创建kafka的消费者 val flinkKafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema(), properties) //读取最新的数据 flinkKafkaConsumer.setStartFromLatest() val lineDS: DataStream[String] = env.addSource(flinkKafkaConsumer) //统计单词个数 val wordDS: DataStream[String] = lineDS.flatMap(_.split(",")) // val myProducer = new FlinkKafkaProducer[String]( // "master:9092", //broker集群 // "test2", // 目标 topic // new SimpleStringSchema) val properties1: Properties = new Properties() properties1.setProperty("bootstrap.servers", "master:9092") //事务超时时间 properties1.setProperty("transaction.timeout.ms", 5 * 60 * 1000 + "") val myProducer = new FlinkKafkaProducer[String]( "test2", // 目标 topic new SimpleStringSchema, properties1, null, //分区方法 Semantic.EXACTLY_ONCE, // 唯一一次 5 //produce线程池的大小 ) // 序列化 schema wordDS.addSink(myProducer) env.execute() } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)