Flink从BucketSink看checkpoint与故障恢复

Flink从BucketSink看checkpoint与故障恢复,第1张

看了 BucketSink 的相关源码。着重看了它的checkpoint以及故障恢复机制。

把大概的理解梳理如下:

BucketSink 大体的工作流程:

1新建一个文件,不断的写入文件中,后缀命名为 in-progress

2判断文件写入完毕,关闭该文件时,后缀名命名为 pending

3checkpoint触发时,将上次ck到这次ck间的所有 pending 文件变为 finish 状态

BucketSink 实现了 CheckpointedFunction 接口

有两个方法

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;�其中:

initializeState 方法在每次新建 BucketSink 或者故障恢复时 会调用。

snapshotState 在每次触发 ck 时会被调用。

下面简单分析下这两个方法的逻辑:

initializeState 方法主要执行一些初始化 *** 作,其中我认为关键的在于

restoredBucketStates = stateStoregetSerializableListState("bucket-states");

该方法获取一个叫做 bucket-states 的状态对象,从名称也可知,该对象用于重启。正常情况下,该对象无内容下面的for语句不会执行。但是若有故障重启的情况,则会从上次的ck中读取出内容,也就是上次ck的状态信息,然后执行回滚 *** 作保证数据的一致性。这一点最后再做介绍。

snapshotState 方法用于触发 ck *** 作。

这个方法做了如下几件事

1获取当前正在写的 pending 文件的大小,以便若下次 ck 前发生故障,可以获知本次ck时,该文件的大小,以便删除本次ck后到故障发生时写入的数据,或者显示该文件的有效数据大小。

2将所有 pending 状态的文件存储到list中,稍后ck结束后,方便修改其状态为 finish

3将当前状态存入 restoredBucketStates 对象,以便若下次 ck 前发生故障,可以从这个状态处进行恢复。

同时,BucketSink也实现了 CheckpointListener 接口

void notifyCheckpointComplete(long checkpointId) throws Exception;

该方法会在 ck 完成后调用。

该方法,将 pending 文件的状态转为 final 状态

并且移除writer已经处于close状态的bucket。

最后详细说一下故障恢复。

当程序因故障自动恢复时,initializeState 方法的 restoredBucketStates 就会从上次 ck 中获取到上次ck时的状态。进而进行恢复。

首先,将 pending 状态的文件名列表清空即可,因为将 pending 状态转为 finish 状态,可以在 notifyCheckpointComplete 方法中完成。故障恢复时,该方法对 pending 的文件的做法是不做处理,等待故障恢复之后,第一次ck触发时,便会自动的将 pending 的文件变为 finish 状态。

而之所以不处理 pending 状态文件,是因为 pending 状态文件说明该文件已经写入完毕,就差ck成功后修改文件状态(也就是文件名)而已,本质上,该文件已经不再写入数据,没有数据的变化。

接下来 handlePendingInProgressFile 就是处理 in-progress 状态的文件。

我们设想一下,故障重启是指在上次成功的ck之后,下次ck之前,发生了故障,然后应用自动重启,使用的是上次成功的ck的状态信息。

这样的话,上次 ck 时状态为 in-progress 的文件,可能在故障发生时,已经处于 pending 状态,也就是写完的状态,也可能仍然处于 in-progress 状态。

flink的做法是,不管处于什么状态 首先全部标注为 finish 状态。然后根据上次ck时状态中存储的文件的大小进行截断,这样,该文件就能回滚到上次ck成功时的状态。若 Hadoop 版本不支持截断 *** 作,则新建一个后缀为 valid-length 的文件,内容为文件的大小,单位 byte。

然后flink就可以从上次ck处重新拉取数据源,继续处理,写入sink。

最后,调用 handlePendingFilesForPreviousCheckpoints 将上次ck成功后,若故障发生的很快,没来得及调用 CheckpointListener 的 notifyCheckpointComplete 方法,则此处将文件状态置为 finish 。

BucketSink 是一个控制类,具体的写入 *** 作可以自己实现 orgapacheflinkstreamingconnectorsfsWriter 接口。

其中 snappy 等压缩文件的追加,可以使用

Fsappend 的方式追加内容到同一文件中

周末了,不想搞长篇大论,就写写这样的流水账吧。

Flink的常见异常众多,不可能面面俱到,所以想到哪儿写到哪儿,有漏掉的之后再补充。

这不是个显式错误,但是JDK版本过低很有可能会导致Flink作业出现各种莫名其妙的问题,因此在生产环境中建议采用JDK 8的较高update(我们使用的是181)。

该信息不甚准确,因为绝大多数情况下都不是JAR包本身有毛病,而是在作业提交过程中出现异常退出了。因此需要查看本次提交产生的客户端日志(默认位于$FLINK_HOME/logs目录下),再根据其中的信息定位并解决问题。

一般都是因为用户依赖第三方包的版本与Flink框架依赖的版本有冲突导致。如果是采用Maven做项目管理的话,可参照我之前写的 这篇文章 来解决冲突。

就是字面意思,YARN集群内没有足够的资源启动Flink作业。检查一下当前YARN集群的状态、正在运行的YARN App以及Flink作业所处的队列,释放一些资源或者加入新的资源。

slot分配请求超时,是因为TaskManager申请资源时无法正常获得,按照上一条的思路检查即可。

TaskManager的Container因为使用资源超限被kill掉了。首先需要保证每个slot分配到的内存量足够,特殊情况下可以手动配置SlotSharingGroup来减少单个slot中共享Task的数量。如果资源没问题,那么多半就是程序内部发生了内存泄露。建议仔细查看TaskManager日志,并按处理JVM OOM问题的常规 *** 作来排查。

TaskManager心跳超时。有可能是TaskManager已经失败,如果没有失败,那么有可能是因为网络不好导致JobManager没能收到心跳信号,或者TaskManager忙于GC,无法发送心跳信号。JobManager会重启心跳超时的TaskManager,如果频繁出现此异常,应该通过日志进一步定位问题所在。

Flink on YARN的其他问题,还可以参考 这篇 ,非常有帮助。

该异常几乎都是由于程序业务逻辑有误,或者数据流里存在未处理好的脏数据导致的,继续向下追溯异常栈一般就可以看到具体的出错原因,比较常见的如POJO内有空字段,或者抽取事件时间的时间戳为null等。

很多童鞋拿着这两条异常信息来求助,但实际上它们只是表示BufferPool、MemoryManager这些Flink运行时组件被销毁,亦即作业已经失败。具体的原因多种多样,根据经验,一般是上一条描述的情况居多(即Could not forward element to next operator错误会伴随出现),其次是JDK版本问题。具体情况还是要根据TaskManager日志具体分析。

Akka超时导致,一般有两种原因:一是集群负载比较大或者网络比较拥塞,二是业务逻辑同步调用耗时的外部服务。如果负载或网络问题无法彻底缓解,需考虑调大 akkaasktimeout 参数的值(默认只有10秒);另外,调用外部服务时尽量异步 *** 作(Async I/O)。

这个异常我们应该都不陌生,首先检查系统 ulimit -n 的文件描述符限制,再注意检查程序内是否有资源(如各种连接池的连接)未及时释放。值得注意的是,Flink使用RocksDB状态后端也有可能会抛出这个异常,此时需修改flink-confyaml中的 statebackendrocksdbfilesopen 参数,如果不限制,可以改为-1。

关于文件描述符的一些有趣知识,可以参见之前我写的 这一篇 。

在Flink内使用Java Lambda表达式时,由于类型擦除造成的副作用(详情见 这篇文章 ),注意调用returns()方法指定被擦除的类型。

在当前检查点还未做完时,收到了更新的检查点的barrier,表示当前检查点不再需要而被取消掉,一般不需要特殊处理。

首先应检查 CheckpointConfigsetCheckpointTimeout() 方法设定的检查点超时,如果设的太短,适当改长一点。另外就是考虑发生了反压或数据倾斜,或者barrier对齐太慢。具体思路不再赘述,看官可以参考 这篇文章 ,非常详细。

我们知道Flink的状态是按key组织并保存的,如果程序逻辑内改了keyBy()逻辑或者key的序列化逻辑,就会导致检查点/保存点的数据无法正确恢复。所以如果必须要改key相关的东西,就弃用之前的状态数据吧。

在19之前的Flink版本中,如果我们使用RocksDB状态后端,并且更改了自用MapState的schema,恢复作业时会抛出此异常,表示不支持更改schema。这个问题已经在 FLINK-11947 解决,升级版本即可。

就酱吧,民那晚安(不是

一、Checkpoint 简介

Flink 的 Checkpoint 机制是其 可靠性 的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport (分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

二、Checkpoint 机制流程详解

1 任务启动

我们假设任务从 Kafka 的某个 Topic 中读取数据,该Topic 有 2 个 Partition,故任务的并行度为 2。根据读取到数据的奇偶性,将数据分发到两个 task 进行求和。

某一时刻,状态如下:

2启动Checkpoint

JobManager 根据 Checkpoint 间隔时间,启动 Checkpoint。此时会给每个 Source 发送一个 barrier 消息,消息中的数值表示 Checkpoint 的序号,每次启动新的 Checkpoint 该值都会递增。

3 Source启动Checkpoint

当Source接收到barrier消息,会将当前的状态(Partition、Offset)保存到 StateBackend,然后向 JobManager 报告Checkpoint 完成。之后Source会将barrier消息广播给下游的每一个 task:

4task 接收 barrier

当task接收到某个上游(如这里的Source1)发送来的barrier,会将该上游barrier之前的数据继续进行处理,而barrier之后发送来的消息不会进行处理,会被缓存起来。

之前对barrier的理解比较模糊,直到看到了下面这幅图。barrier的作用和这里 "欢迎光临" 牌子的作用类似,用于区分流中的数据属于哪一个 Checkpoint:

我们可以理解为:barrier之前的数据属于本次Checkpoint,barrier之后的数据属于下一次Checkpoint,所以下次Checkpoint的数据是不应该在本次Checkpoint过程中被计算的,因此会将数据进行缓存。

5barrier对齐

如果某个task有多个上游输入,如这里的 sum_even 有两个 Source 源,当接收到其中一个 Source 的barrier后,会等待其他 Source 的 barrier 到来。在此期间,接收到 barrier 的 Source 发来的数据不会处理,只会缓存(如下图中的数据4)。而未接收到 barrier 的 Source 发来的数据依然会进行处理,直到接收到该Source 发来的 barrier,这个过程称为 barrier的对齐

barrier是否对齐决定了程序实现的是 Exactly Once 还是 At Least Once:

如果不进行barrier对齐,那么这里 sum_even 在接收 Source2 的 barrier 之前,对于接收到 Source1的 数据4 ,不会进行缓存,而是直接进行计算,sum_even 的状态改为12,当接收到 Source2 的barrier,会将 sum_even 的状态 sum=12 进行持久化。如果本次Checkpoint成功,在进行下次 Checkpoint 前任务崩溃,会根据本次Checkpoint进行恢复。此时状态如下:

从这里我们就可以看出, Source1的数据4被计算了两次 。因此,Exactly Once语义下,必须进行barrier的对齐,而 At Least Once语义下 barrier 可以不对齐。

注意:barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如 reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对齐这个概念,都会实现Exactly Once语义,即使程序中配置了At Least Once 。

6处理缓存数据

当task接收到所有上游发送来的barrier,即可以认为当前task收到了本次 Checkpoint 的所有数据。之后 task 会将 barrier 继续发送给下游,然后处理缓存的数据,比如这里 sum_even 会处理 Source1 发送来的数据4 而且,在这个过程中 Source 会 继续读取数据 发送给下游,并不会中断。

7上报Checkpoint完成

当sink收到barrier后,会向JobManager上报本次Checkpoint完成。至此,本次Checkpoint结束,各阶段的状态均进行了持久化,可以用于后续的故障恢复。

flink state根据是否是有key分为如下两种

keyGroup,状态重分布

为了 dynamically scale Flink operators that use partitoned(key-value) state, 使用key group概念把多个key进行分组

AbstractKeyedStateBackend#

参考

rocksdb概念

>

以上就是关于Flink从BucketSink看checkpoint与故障恢复全部的内容,包括:Flink从BucketSink看checkpoint与故障恢复、Flink常见异常和错误信息小结、Flink | Checkpoint 机制详解等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/web/9602416.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-30
下一篇 2023-04-30

发表评论

登录后才能评论

评论列表(0条)

保存