Flink:从业务实践角度聊聊Checkpoint、Savepoint、容错机制和业务升级

Flink:从业务实践角度聊聊Checkpoint、Savepoint、容错机制和业务升级,第1张

Flink:从业务实践角度聊聊Checkpoint、Savepoint、容错机制和业务升级

接着状态缓存和内存管理后,再聊聊容错机制

上文:Flink:从业务实践角度聊聊状态缓存和内存管理

多说一句,说实话个人之前研究这部分内容时,有时也会百度,但是很烦的就是,不知道是搜索引擎问题还是大家都喜欢粘贴复制,,,,粘贴复制也就算了,标题好歹统一下吧,可是可是,明明就是找别人的文章粘贴复制的或者直接官网文档粘贴复制,甚至连格式错别字都不管不顾,,,标题好像很牛皮,点进去,文档内容一样,再点下一个,内容一样,再点,还一样,,,搜出来的前几页的,文章标题有点区别,但是居然内容一摸一样,都是官方文档的内容,相似度200%铁证,,,,吐槽下搜索引擎,都2022年了,没点过滤去重机制吗!!再吐槽下,明明就是copy别人或者官方文档的一字不差的没点自己理解的,还偏要加点自己理解的标题。。。。标题党

说实话,copy借鉴没啥问题,但是有点自己的见解才好点,至少你搞点自己测试代码或者自己理解呀,我都怀疑你是不是正真看完过整篇内容(如有不适,见谅,主要是搜索引擎的锅!)

主要是真正要查资料时,看到连续几页的一摸一样的内容,真的是想把搜索给干掉,,,这不浪费我时间吗

本文基于V1.11

Flink实现Exactly-once

从flink实现exactly-once一致性语义逐步聊到容错机制

这是一个经典面试问题,基本流计算必问,而且必须得答好。

首先,Flink编程模型:source–>operator–>sink

那么就得满足全链路的Exactly-once

  • Source端(Kafka、Pulsar):状态缓存记录offset --> checkpoint时把offset提交保存到状态后端并同时Commit给Source如Kafka(此时不管job是否真的处理完这几条commit的数据,Kafka已经当作已经处理完了)–> 重启任务时,可以从状态中获取到offset,然后从对应offset重新消费数据

  • Operator:算子依靠状态缓存,中间计算结果都在状态数据里,并checkpoint保存到了状态后端,所以重启任务时,可以从状态后端中读取当时的中间计算结果,重新恢复计算状态

  • Sink:由于Source端有重置到状态缓存记录的Offset,所以会存在部分数据重复计算,所以需要Sink的存储支持幂等或事务即可实现。一般的存储如Hbase、Mysql等都支持,那么Kafka呢?Kafka也支持幂等和事务(0.11及以后版本),Kafka实现Exactly-Once主要依赖事务,并且通过2PC(two phase commit)两阶段提交方式实现

    • 第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
    • jobmanager 触发 checkpoint *** 作,barrier 从 source 开始向下传递,遇到barrier 的算子将状态存入状态后端,并通知 jobmanager
    • sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
    • jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
    • sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
    • 外部kafka关闭事务,提交的数据可以正常消费了。

    核心一点,未提交的数据不能够被下游消费,下游消费者要有个事务隔离级别参数:read_committed

以上就是Flink实现exactly-once的简述了,如上可知,实现exactly-once有个不可或缺的过程:状态缓存–checkpoint–状态后端

关于状态缓存和状态后端已经在上文中聊过了,那么就接着聊checkpoint

Flink的Checkpoint和Savepoint

面试必问,实践中也及其重要的东西. 而且Checkpoint和Savepoint都是一起出现的,所以一起聊

官方文档位置:Checkpoints

几个小结(教科书式的讲解请参考官方文档)

  • Savepoint是手动的Checkpoint

  • RocksDB状态后端时,Checkpoint可以增量,Savepoint不可以

  • 任务手动可以指定从Savepoint和Checkpoint启动,任务自动重启只会从最近一次Checkpoint启动

  • Checkpoint的目的是把数据快照(状态、Offset)保存到状态后端,注意,RocksDB的状态后端,checkpoint也是将RocksDB本地数据保存到配置的文件系统目录下

Flink的Checkpoint与Kafka Offset提交配置

关于开启checkpoint:https://www.bookstack.cn/read/flink-1.11.1-zh/8c2c9d36457a28b4.md

  • Kafka的offset提交有三种情况(开启了checkpoint后,KafkaSource的offset提交方式配置就失效,此时在kafkaSouce处设置提交方式)

    • 开启checkpoint时:在checkpoint完成后提交(FlinkKafkaConsumerbase.notifyCheckpointComplete)

    • 开启 checkpoint,禁用 checkpoint 提交:不依靠Flink提交,需要手动提交

    • 不开启 checkpoint,依赖KafkaSouce配置自动提交或者手动提交

      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers","192.168.31.201:9092");
      properties.setProperty("group.id", "consumer-group");
      properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      properties.setProperty("auto.offset.reset", "latest");
      properties.setProperty("enable.auto.commit", "true");
      
      FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011<>("sensor", new SimpleStringSchema(), properties);
      // 若开启了checkpoint,那properties中的enable.auto.commit参数就会失效
      consumer.setCommitOffsetsOnCheckpoints(true);
      
  • 默认情况,checkpoint是禁用的,但是有状态的编程是一定要开启的

    如下是官方文档中的示例代码

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 每 1000ms 开始一次 checkpoint
        env.enableCheckpointing(1000);
        // 高级选项:
        // 设置模式为精确一次 (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 确认 checkpoints 之间的时间会进行 500 ms
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Checkpoint 必须在一分钟内完成,否则就会被抛弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 开启在 job 中止后仍然保留的 externalized checkpoints
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 允许在有更近 savepoint 时回退到 checkpoint
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
    
    Flink的Checkpoint过程与配置

    有如下几个重要的配置,在图中描述

​ checkpoint详细配置项参看上述给的文档里的说明,这里聊聊业务实践过程中如何配置

首先几个配置参考原则

  • 1.checkpoint不宜过于频繁,因为它的目的是记录状态,如果间隔太小,可能一段时间内状态并没有发生变化,会无意义浪费资源,影响性能

  • 2.checkpoint目的是保存状态和offset,状态过大会导致checkpoint耗时过长,占用网络IO

  • 3.状态大小跟业务的窗口大小,吞吐量,以及无窗口groupby和regular join(与时间间隔无关的join)配置的min和max保留时长有关

几个重要配置项的配置思路参考(其他配置项请参考官方文档按需配置)

  • 1.checkpoint周期:不宜频繁,一般在分钟级别,1~10分钟,不建议超过10分钟。出于的考虑是:如果任务失败(没有savepoint),重新恢复任务肯定只能从checkpoint恢复,那么周期越短,需要重新处理的数据越少,实时性恢复的越快。

  • 2.耗时超时配置项checkpoint.timeout.ms:一般不动,至少大于checkpoint周期,可配置为最大能接受checkpoint周期间隔即可,默认是10分钟

  • 3.between.checkpoints.min.ms:是两个cp最小间隔,目的是防止checkpoint积压:由于状态过大导致checkpoint耗时很长,上一个还没结束,下一个就已经开始这样积压起来。

  • 4.checkpoints.num-retained:checkpoint保留数,默认值1.如果希望任务失败时可以重跑更长时间的数据,那么可以根据需要增加此保留数,无非是增加磁盘存储

  • 5.其中还有个:任务取消后保留Checkpoint目录:这个涉及checkpoint保留策略,生产上按需设置,个人建议是取消作业仍然保留检查点:业务升级时如果改动了算子逻辑,需要重跑一段时间数据时可以指定从前几个checkpoint重启;但是要注意手动清理,尤其配置保留多个checkpoint时,会占用磁盘空间,savepoint同理

    CheckpointConfig config = env.getCheckpointConfig();
    // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作业时保留检查点。请注意,在这种情况下,您必须在取消后手动清理检查点状态。
    // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作业时删除检查点。只有在作业失败时,检查点状态才可用。
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
  • 较为详细的有参考价值的配置示例代码,其中8项都是比较重要的配置项,文章原文:Flink DataStream Checkpoint和Savepoint

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    
    // 1、开启Checkpoint
    // 默认情况下,不开启Checkpoint
    // 设置Checkpoint间隔(单位毫秒)大于0,即开启Checkpoint
    // 如果State比较大,建议增大该值
    checkpointConfig.setCheckpointInterval(10L * 1000);
    
    // 2、设置Checkpoint状态管理器
    // 默认MemoryStateBackend 支持MemoryStateBackend、FsStateBackend、RocksDBStateBackend三种
    // MemoryStateBackend: 基于内存的状态管理器,状态存储在JVM堆内存中。一般不应用于生产。
    // FsStateBackend: 基于文件系统的状态管理器,文件系统可以是本地文件系统,或者是HDFS分布式文件系统。
    // RocksDBStateBackend: 基于RocksDB的状态管理器,需要引入相关依赖才可使用。
    // true: 是否异步
    env.setStateBackend((StateBackend) new FsStateBackend("CheckpointDir", true));
    
    // 3、设置Checkpoint语义
    // EXACTLY_ONCE: 准确一次,结果不丢不重
    // AT_LEAST_ONCE: 至少一次,结果可能会重复
    // 注意: 如果要实现端到端的准确一次性语义(End-To-End EXACTLY_ONCE),除了这里设置EXACTLY_ONCE语义外,也需要Source和Sink支持EXACTLY_ONCE
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    // 4、任务取消后保留Checkpoint策略
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    // 5、设置要保留的Checkpoint数量
    // 在conf/flink-conf.yaml中设置
    // 默认是1,只保留最新的一份Checkpoint
    // 如果需要从历史某个时刻恢复,这个参数很有用,可以根据Checkpoint间隔,设置成多个
    state.checkpoints.num-retained=20
    
    // 6、设置Checkpoint超时时间
    // Checkpoint超时时间,默认10分钟。当Checkpoint执行时间超过该值,Flink会丢弃此次Checkpoint并标记为失败,可从Flink WebUI Checkpoints上看到
    checkpointConfig.setCheckpointTimeout(long checkpointTimeout);
    
    // 7、设置Checkpoint之间的最小间隔
    // 两次Checkpoint之间的最小间隔,默认是0,单位毫秒。State太大,Checkpoint时间太长,而间隔又很短,则会导致大量Checkpoint任务积压,占用大量计算资源,进而影响任务性能
    checkpointConfig.setMinPauseBetweenCheckpoints(30000);
    
    // 8、设置同一时间点最多进行Checkpoint的数量,默认是1个
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    

    flink-conf.yaml全局配置

    // 设置 checkpoint全局设置保存点  
    state.checkpoints.dir: hdfs:///checkpoints/
    // 设置checkpoint 默认保留 数量  
    state.checkpoints.num-retained: 20
    

来,官网文档的checkpoint调优思路:大状态与 Checkpoint 调优

  • 监控状态和checkpoint是通过UI界面,很清晰哈
  • 调优:两个checkpoint之间的最小间隔设置要合理,别叠加和累积checkpoint任务在那里
  • 大状态建议用RocksDB
  • RocksDB的性能跟调优有关:超大状态时开启增量检查点、手动配置RocksDB的各个配置项(这个个人不专业,就不展开了,文档中有链接指导如何调整,不过实际业务中如果没有特别优化RocksDB的需求,一般不会去手动单个RocksDB参数调整,早期版本RocksDB参数是开放了,现在是默认关闭的,统一配置个托管内存而后Flink内部有比较通用的RocksDB参数使用)
  • 关于容量的预设:这个文档中是一大堆讲述合适容量对任务的必要性,业务实践中就是多给一些资源,一是出现某个TM失败时,有富余的TM立即启动去接替;一是适应高峰期。重点还是实时性的保证,如果任务对实时性没过多要求,那么只要做好监控和配置重启计划(这些得监控平台或者跟运维去计划好)即可。越是实时性要求高,越是资源冗余要求多点。并且TableAPI和SQL对于资源使用目前就是无法精细化的,就是冗余的(部分牛皮团队正二次开发意图优化这些,可以关注下每年的Flink-Forward Asia大会 )。
  • 快照压缩,正在持续优化中(我们实际业务正在探索),各位可以试试
  • 配置状态本地恢复state.backend.local-recoveryorCheckpointingOptions.LOCAL_RECOVERY,这项文档中很大篇幅描述了原因和 *** 作思路,就是状态特别大的时候,从网络上加载状态很慢,会影响恢复时间,影响实时性,解决方法就是在本地也保存一份状态,恢复时读本地的
Flink的Savepoint过程与应用

Savepoint一定是手动停止任务时生成,也需要手动清理,作用仍然是保存任务快照,方便重启时从上一次计算时间点开始连续的继续计算

先上官方文档关于savepoint位置(有关于详细的savepoint *** 作示例和用法,之前也查过百度的关于savepoint资料,大部分文章都是copy或者照抄官方文档内容,说明官方文档信息很足):Savepoints

几个业务实践上的注意点

  • Savepoint就是一陀文件,可以移动位置,但,一定要JM和TM可访问并且配置好路径
  • 任何一个任务都可以使用任何一个Savepoint去启动,无论是不是第一次启动,只是不一定能成功从Savepoint恢复里面的状态数据,如果任务的Source一致,那么即使状态数据不能恢复,也可以拿到对应的Offset接续着进行数据处理,checkpoint同理,只是只要是状态数据不能恢复,那么启动会报错,报错信息会提示你需要加个参数--allowNonRestoredState或-n,意思是忽略掉不能恢复的状态
  • Savepoint不会自动清理,需要手动清理策略
  • 业务实践中基本都会配置停止任务自动保存一个Savepoint
  • Savepoint正常启停不会有啥问题,往往在业务升级时会出问题:状态不可用、offset不能恢复等等,后面章节深入聊这部分
Flink的容错机制

Flink一个很亮眼的功能就是容错机制,加上它的分布式扩展特点,Flink+ML也是逐步的在业界探索中

容错容错,就是允许你发生错误,并能够自动恢复。Flink的容错依赖的是Checkpoint和Savepoint。自动恢复依赖的是Checkpoint,发生错误导致任务失败时会根据重启策略进行尝试重启。需要手动重启时就用到Savepoint或指定Checkpoint进行重启

来,上文档位置:Task 故障恢复

重启策略(Restart Strategy):flink-conf.yaml或通过ExecutionEnvironment对象调用setRestartStrategy配置

  • 固定延时重启策略:固定N次尝试重启,每次间隔一定延时时间,超过次数则失败

        restart-strategy.fixed-delay.attempts: 3
        restart-strategy.fixed-delay.delay: 10 s
    
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
          3, // 尝试重启的次数
          Time.of(10, TimeUnit.SECONDS) // 延时
        ));
    
  • 失败率重启策略:固定时间内,失败超过多少次就认定失败

        restart-strategy.failure-rate.max-failures-per-interval: 3
        restart-strategy.failure-rate.failure-rate-interval: 5 min
        restart-strategy.failure-rate.delay: 10 s
    
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
          3, // 每个时间间隔的最大故障次数
          Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
          Time.of(10, TimeUnit.SECONDS) // 延时
        ));
    
  • 直接失败不重启

        restart-strategy: none
    
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.noRestart());
    
  • 备用重启策略(Fallback Restart Strategy):使用集群定义的重启策略,也就是启用了checkpoint但是你又没配置其他重启策略,那么集群默认使用固定延时重启策略,并尝试Integer.MAX_VALUE次重启

  • 以上其实是3种,业务实践上建议配置固定延时重启策略或者失败率重启策略,不要一点都不配置。

  • 以上Flink内部的重启策略,仍然可能失败,业务实践上建议配合运维系统定义好Flink外部的监控报警和调度重启机制,避免意外

  • 流计算是一直无限的运行的,所以一定要监控好日志和Metric,如果运维强大可以配合制定好闲时重启机制的策略,如果有和外部系统交互的比如Kafka、Redis、Hbase等,也要做好相应的读写监控,尤其Kafka的流量峰值、Redis和Hbase的QPS等(个人业务实践时,维表使用的Hbase,高峰流量时,Hbase压力巨大(这个优化后面开文章好好聊,细节很多),曾一度拒绝FlinkJob链接或间断性的读数据慢的异常情况,导致Flink任务背压和挂掉并且自动重启失败,此时挂掉就依赖外部的重启机制了)。

恢复策略(Failover Strategies):flink-conf.yaml中配置jobmanager.execution.failover-strategy,配置值说明如下:

  • 全图重启:full:job的所有task全部重启
  • 基于 Region 的局部重启:region:job的所有task会被flink内部根据某种逻辑依赖,划分成一个个逻辑上的region,当有task故障时,会把task所在的region和下游的region进行重启,如果有依赖上游数据流,则关联的region也会被重启,目的是保证数据一致性。这种形式

业务实践中如何选择呢,一般会优先配置为region,保证实时性,快速恢复处理数据。本地测试时可以full

Flink的状态机制与业务升级

这个话题,个人见解,有不严谨的地方,请指出,可以多加讨论,适用于业务

任务临时暂停,然后重启继续运行,这基本不会有啥问题,很顺畅。但是,比如生产上报错了或者需求更新,我得改,改了业务逻辑,再上线,会不会和checkpoint或者savepoint冲突不兼容?肯定的,那咋搞,怎么解决?

来,贴文档:Savepoints

DataStreamAPI

如果是DataStreamAPI开发的话,一定记得每个算子都设置一个OperatorID,文档里也是强烈建议。

    DataStream stream = env.
      // Stateful source (e.g. Kafka) with ID
      .addSource(new StatefulSource())
      .uid("source-id") // ID for the source operator
      .shuffle()
      // Stateful mapper with ID
      .map(new StatefulMapper())
      .uid("mapper-id") // ID for the mapper
      // Stateless printing sink
      .print(); // Auto-generated ID
  • 这个OperatorID,也叫UID,有啥讲究呢,没啥讲究。随便给个唯一值就行,但不能改。为啥?有用

  • 还记得状态缓存吗,有状态的编程,状态数据在checkpoint或者savepoint后存在状态后端里,那重启时,状态数据怎么给到对应算子呢,到这里肯定就有思路了,就是根据uid给,存的时候也会与uid有个mapping关系。

  • 所以uid一定要设置。如果不设置那怎么状态恢复给对应算子呢?你不设置,系统自动帮你生成一个,而且是根据flink运行的DAG图关系和其他一些信息组合起来生成一个hash值。所以你不动程序,直接重启,肯定能恢复。但是如果动了,导致DAG图也改了,那必然hash值会改,那么所有状态都无法恢复了。。。直接GG

  • 设置了uid,但是状态数据结构改了咋办?也会恢复失败,但是你可以先保留旧的数据结构,然后开新的数据结构用来兼容过渡就可以解决这个问题。所以说,DataStreamAPI不存在无法恢复状态问题,只要好好设计uid,然后兼容好状态的数据结构即可。

TableAPI&SQL

坑就在这里,流批一体流批一体,重SQL的使用。Flink迅速火热,各个大厂都构建自己的FlinkSQL开发平台,实现了几乎不用一行代码,纯SQL,就能解决大部分场景需求。SQL开发迅速易上手,大大提升业务开发效率,这也是Flink后起之秀越来越代替Storm和SparkStreaming的原因。未来SQL肯定会越来越成熟和覆盖更多业务场景的开发(部分场景SQL可能无法处理或者很复杂所以还是会DataStreamAPI去搞),所以,未来Flink的使用,尤其流批一体的架构设计,SQL必须的熟悉使用!

上文说到uid解决状态恢复对应关系,TableAPI和SQL却无法手动设置uid,只能依靠自动生成,所以业务更新升级就无法避免会有状态无法恢复的情况

如下情况肯定可以恢复状态

  • 有状态的计算里增加where条件
  • 无状态的计算里可以随便改动

如下情况肯定无法恢复状态

  • 增减sql流程:比如额外加个sql3,无论sql3是否是有状态的计算,因为算子的执行图会作为uid生成的一部分

  • 有状态的计算里对group by或者聚合指标做了更改:你分组key(group by)都改了或者状态数据结构(聚合指标,比如增加个sum)都改了,肯定无法恢复。不过这种情况只有这个算子无法恢复,但是它无法恢复,是不是最终结果会被影响呢。。。。对吧

    比较重要的一点:状态无法恢复,但是offset可以恢复,只要topic没变

补充下小结:

(1)每个算子都有个uid

(2)savepoint保存的是当前的offset和各个算子对应的state

(3)DataStreamAPI可手动设置算子uid,进行精细化管理state和savepoint恢复

(4)TableAPI&SQL不可以手动设置uid,系统自动生成

(5)uid生成跟jobDAG等等有关,故新增、删除节点都会重新分配

(6)新增、删除SQL节点时,DAG里所有节点都会重新分配uid,savepoint里的state无法恢复,需加参数 --allowNonRestoredState (忽略不能映射到uid的state),Job方可重新启动

(7)修改有状态的计算SQL节点,只要不改group by相关计算,就可以正常从Savepoint恢复,否则状态恢复失败,需加参数 --allowNonRestoredState可忽略失败的状态是的算子继续启动

(8)修改无状态的计算SQL节点,无影响

(9)关于状态能不能恢复总结下:改了DAG必然无法恢复,原因是uid对应不上了;改了有状态计算的状态key(group by部分)和value的数据结构(sum、count这些)部分,只该节点状态无法恢复,不影响其他节点恢复情况。原因是uid能对上,但是新旧状态数据结构不一样导致无法恢复。

(10)只要必须加–allowNonRestoredState才能启动时,一定是存在部分或所有的节点状态不能恢复,可以用这点去检测是否会状态恢复失败

咋个解决呢?下面列举下可行的方案,因业务场景和实时性要求而做不同选择

  • (1)长窗口或累计计算等情况,第三方存储保存中间结果【部分场景可以解决】

  • (2)从较早checkpoint开始启动,允许状态恢复失败,但可以重复跑一段时间段数据来保证不丢失,需要下游支持幂等【推荐】

  • (3)流批混跑(流负责实时+批闲时替换流实时或批闲时重跑),批兜底【T+1报表之类的可以考虑:资源冗余、需要实现流批一体否则两套代码】

  • (4)闲时升级【推荐:重实时性、重数据准确或一致性】

  • (5)由于流计算实时性特点,部分业务实时准确性在升级逻辑时确实难以保证,与下游业务方沟通告知,且保证能接续offset继续计算。【推荐:重实时性、轻数据准确或一致性】

  • (6)重点逻辑DataStreamAPI开发并设置uid来保证此逻辑节点的状态恢复【重实时性、轻数据准确或一致性、但是难以预测业务的变更】

以上,个人愚见,若有表述错误的,还请指出,避免误导!

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5698888.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存