11.2.9、flink核心

11.2.9、flink核心,第1张

11.2.9、flink核心 1、Sink端 --kafka

唯一一次Exactly once,两步提交
在checkpoint之前开启事务,checkpoint之后提交事务

2、flink读取kafka数据,结果直接写回kafka中(非聚合运算)

–可能造成数据读取显示两次重复(类似kafka的生产者了)
—读取kafka的数据(6,5,4,| 3,2,1),flink的source端做checkpoint安装挡板,写回kafka
—假设挡板在3和4之间,读取5的时候任务失败了,恢复后数据还是恢复到3这个,但是4,5之前会写入kafka中了,还会写一次
—checkpoint之前开启事务,之后提交事务
—使用两步提交,flink消费kafka数据又checkpoint可以直接恢复
—flink数据写回kafka中,读取需要使用只读提交的数据–isolation-level

read_committed
object Demo05flinkonKafkaExactlyOnce {
  def main(args: Array[String]): Unit = {
    //创建flink的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    
    // 每 1000ms 开始一次 checkpoint
    env.enableCheckpointing(20000)
    // 高级选项:
    // 设置模式为精确一次 (这是默认值)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 确认 checkpoints 之间的时间会进行 500 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // 同一时间只允许一个 checkpoint 进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    val config: CheckpointConfig = env.getCheckpointConfig
    //任务失败后自动保留最新的checkpoint文件
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //设置状态后端,保存状态的位置
    val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)
    env.setStateBackend(stateBackend)
    
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "mdsgfgf")

    //创建kafka的消费者
    val flinkKafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema(), properties)

    //读取最新的数据
    flinkKafkaConsumer.setStartFromLatest()
    val lineDS: DataStream[String] = env.addSource(flinkKafkaConsumer)

    //统计单词个数
    val wordDS: DataStream[String] = lineDS.flatMap(_.split(","))

    
    //    val myProducer = new FlinkKafkaProducer[String](
    //      "master:9092",  //broker集群
    //      "test2", // 目标 topic
    //      new SimpleStringSchema)

    

    val properties1: Properties = new Properties()
    properties1.setProperty("bootstrap.servers", "master:9092")

    //事务超时时间
    properties1.setProperty("transaction.timeout.ms", 5 * 60 * 1000 + "")

    val myProducer = new FlinkKafkaProducer[String](
      "test2", // 目标 topic
      new SimpleStringSchema,
      properties1,
      null, //分区方法
      Semantic.EXACTLY_ONCE, // 唯一一次
      5 //produce线程池的大小
    ) // 序列化 schema

    wordDS.addSink(myProducer)

    env.execute()

  }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5696033.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存