Flink checkpoint简介与报错调试方法汇总

Flink checkpoint简介与报错调试方法汇总,第1张

Flink checkpoint简介与报错调试方法汇总

本文主要参考官方社区给出的checkpoint出错类型和种类,以及查找报错的方法。

flink checkpint出错类型

主要分为两种
Checkpoint Decline 与 Checkpint Expire 两种类型 下面分开讨论

Checkpint 流程简介
    第一步,Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint;。第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint。第三步,当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。第四步,下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上,然后 Flink 框架会从中选择没有上传的文件进行持久化备份。同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。task收到上游全部的barrier后,会把barrier向下继续传递,并异步将自己的状态写如到持久化存储中,完成后给jm中的 Checkpoint coordinator 通知已经完成,并将备份数据的地址(state handle)也给过去。Checkpoint coordinator收集全后,会将Checkpoint meta写入到持久化存储中,完。

总结一下 checkpoint分为一下几个 *** 作:

    JM trigger checkpointSource 收到 trigger checkpoint 的 PRC,自己开始做 snapshot,并往下游发送 barrier下游接收 barrier(需要 barrier 都到齐才会开始做 checkpoint)Task 开始同步阶段 snapshotTask 开始异步阶段 snapshot
    Task snapshot 完成,汇报给 JM
    以上任何一个 *** 作失败都会导致checkpoint失败
Checkpoint 异常情况排查



以上几个参数分别是:

    一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack表示该 operator 的所有 subtask 最后 ack 的时间表 示 整 个 operator 的 所 有 subtask 中 完 成 snapshot 的最长时间表示当前 Checkpoint 的 state 大小,增量就是增量的大小

从上图可以知道第4个task *** 作导致整体的checkpoint非常慢,可以根据UI给出物理执行图来有根据的检查任务,但是大部分情况当发现checkpoint报错时,任务已经down掉,那么就需要根据yarn上的日志来具体分析

Checkpoint Decline:

从jm的日志中可以看到

Decline checkpoint 10000 by task ********* container_e119_1640332468237_165586_01_000002 @ hostname01 with allocation id 2872ccdf76d6af3baf9064be9d46fcaa

可以去 container_e119_1640332468237_165586_01_000002 所在的 tm 也就是hostname01 ,可以查看具体的tm日志查看具体的报错信息

Checkpoint Decline 中有一种情况 Checkpoint Cancel,这是由于 较小的 barrier还没有对齐,就已经收到了更大的 barrier,这种情况下就会把小的 checkpoint给取消的掉
在jm.log中会有 当前chk-11还在对齐阶段,但收到了 chk-12的barrier ,所以取消了 chk-11

Received checkpoint barrier for checkpoint  ****** before completing current checkpoint   ** Skipping current checkpoint

下游task收到被取消的barrier时会打印

$taskNameWithSubTaskAndID: Checkpoint chk-11 canceled, aborting alignment.
或
$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint chk-12 before completing current checkpoint chk-11. Skipping current checkpoint
Checkpoint Expire:

上面的Decline 比较少见,更常见的是 Expire 的情况。其中最主要的原因就是因为 checkpoint 做的非常慢,导致超时等各种情况。
出现expire时,jm.log中会有

Checkpoint 157 of job ba02728367ae85bca4d43ab7445251f5 expired before completing.
以及
Received late message for now expired checkpoint attempt 158 from task d11aac4d0b6f4fd9bde0fa4e76240c71 of job ba02728367ae85bca4d43ab7445251f5 at container_e119_1640332468237_165586_01_000002 @ cp-hadoop-hdp-node07 (dataPort=11460).

其中tm具体日志可以参考上述的办法来找到对应的报错日志。
chk很慢的情况主要有一下几种:

Source Trigger 慢

这个一般发生较少,但是也有可能,因为 source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁(这个现在社区正在进行用 mailBox 的方式替代当前抢锁的方式,详情参考[1])。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 taskmanager.log 中找不到开始做 Checkpoint 的 log,则可以考虑是否属于这种情况,可以通过 jstack 进行进一步确认锁的持有情况

State 非常大

这种情况使用增量checkpoint,现在增量checkpoint只支持RocksDBStateBackend 并需要设置开启

数据倾斜或有反压的情况

数据倾斜可以重新设计主键以及数据处理流程来改善,反压可以参考flink UI来查看哪里反压 ,并使用Metrics 来获取关键指标

反压问题处理:

定位节点,加Metrics
我们在监控反压时会用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有关,最为有用的是以下几个:
Metrics: Metris描述
outPoolUsage发送端 Buffer 的使用率
inPoolUsage接收端 Buffer 的使用率
floatingBuffersUsage(1.9 以上)接收端 Floating Buffer 的使用率
exclusiveBuffersUsage (1.9 以上)接收端 Exclusive Buffer 的使用率

barrier对齐的慢

Checkpoint 在 task 端分为 barrier 对齐(收齐所有上游发送过来的 barrier),然后开始同步阶段,再做异步阶段。如果 barrier 一直对不齐的话,就不会开始做 snapshot

这种情况也会导致 State非常大,当先到的barrier到达后,晚的barrier来之前,这之间的数据也会放入到State中一起保存起来。

在Debug日志下,barrier对齐后会有

Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

如果一直没有,注意! 是Debug日志,可以使用 at least once,来观察哪个barrier没有到达,多说一嘴,at least once 与 exectly once 最主要的语义区别就是 ,先到的barrier,是否等后到的barrier对齐才做checkpoint

Received barrier for checkpoint 96508 from channel 5
线程太忙

在 task 端,所有的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,如果主线程在处理太慢(比如使用 RocksDBBackend,state *** 作慢导致整体处理慢),导致 barrier 处理的慢,也会影响整体 Checkpoint 的进度,可能会出现barrier一直对不齐的情况
可以用AsyncProfile生成一份火焰图,查看占用cpu最多的栈,大数据集群中,如果实时离线使用一套集群,凌晨时,离线任务集体调度,就有可能导致node节点上线程不够,无法完成checkpoint导致报错

同步阶段慢

非 RocksDBBackend 我们可以考虑查看是否开启了异步 snapshot,如果开启了异步 snapshot 还是慢,需要看整个 JVM 在干嘛,也可以使用前一节中的工具。

对于 RocksDBBackend 来说,我们可以用 iostate 查看磁盘的压力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的时间总共开销多少

异步阶段慢

这一步主要是,jm将Checkpoint meta写入到持久化存储,

非 RocksDB-Backend ,主要是网络流量的问题,可以使用metirc来监控检查问题

RocksDB 来说,则需要从本地读取文件,写入到远程的持久化存储上,会涉及磁盘IO的瓶颈,如果感觉IO足够,网络也没问题,可以开启多线程上传的功能

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5700853.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存