这一篇文章写的特别的好。
https://www.jianshu.com/p/4d31d6cddc99
个人用自己的语言在捋一遍。
假设有如下一个程序:
kafka->source->keybyUser->sink(统计PV)
首先简单起见,假设只有一个并行度。
第一步是要开启 checkpoint机制,设置checkpoint的时间间隔,可以当作是某种形式的备份状态数据。
既然要备份,那么就可以选择需要备份的地方,可以是内存,也可以使外存,比如hdfs,rocketdb等。
以上面的例子为例,假设10分钟做一个checkpoint,我们看看是如何实现的。
首先,jobManager 为整个job指定一个 checkpointer coordinator管理者(cdr),有他负责整个备份流程。
cdr 每10分钟发送一个事件,叫做barrier到流数据中。
从source 开始,在我们的例子中,source 备份了什么,主要就是记住,我已经消费了的kafka 的offset,比如
记录下来(partion-1, 1000)
然后,把barrier 转发给下游的算子,下游统计pv的程序,比如此时此刻,统计到
(site1,1000), 在收到 barrier之后,就会停止目前的流计算,然后进行state备份。
备份完之后,发给下游sink,sink 看自己需求是否是无状态,决定是否需要备份。
等这三个阶段都完成之后,cdr 就会决定这个程序完成了一次checkpoint机制。
加入下一个轮回中,有相关的算子出现了异常,整个jobmanager可以从最新的checkpoint中进行恢复。
在上面的例子中,就是从kafka 最最新的offset 读取数据,然后,统计pv的算子,也可以从已有的
(site1,1000)继续统计。这就是整个checkpoint 和异常恢复的机制。
这里涉及到一点,就是在处理快照的时候,整个处理程序是要倍阻塞停顿的,比如(site1,1000)触发快照,
如果不停段,在写入state的时候可能就是(site1,1001)了,造成不准确,exactly-once无法保障。
现在让我们提高复杂度?假设在多个并行度的情况下如何做处理?
这里有一个类似与join的概念, 如果 一个 task,他的上游输入是有多个流的,那么
对于 sub1,sub2 的barrier,task 需要等待这两个barrier都到达之后做一个checkpoint 并且向下游发送barrier。
这个 *** 作在flink里面叫做barrier对齐。 先到barrier,对于后续的流数据,通常会存在缓冲里面,并不做处理。
这样做通常会影响部分性能,但是Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once;
那么exactly once 的case 我们也就大概明白。
对于多个并行度,只有做barrier对齐才能达到exactly once。
项目有临时性的特征。体现在项目有明确的开始结束时间。而一旦项目的目标达成,项目范围内的任务全部完成,亦或是,项目需求不再存在,项目就会结束。《一》项目周期
每个项目都有项目的周期,这是 项目的临时性所决定的。 一般项目周期分为四个阶段:
第一阶段:规划阶段
该阶段主要涉及甲乙双方就需求,问题或者机会的确认。一般由项目主导方提供一份需求文档,该文档包含项目目标,范围,需求可行性分析以及预期结果相关内容。
第二阶段:计划阶段
该阶段对于项目经理进行任命,招投标,并且启动项目。以第一阶段的需求文档为基础,项目经理制定计划以及预算,建立团队。
第三阶段:实施阶段
该阶段主要是按照计划进行实施以及评审。同时对于项目进度,预算,质量进行监控。
第四阶段:完成阶段
主要是项目完成后的验收,移交,相关文档的整理,经验交流与总结以及解散团队。
《二》多变的实施阶段
对于一个项目而言,都会经历以上四个阶段。而 根据项目的不同,实施阶段相对其他三个阶段,会显得比较灵活。 不同的项目,不同行业的项目,对于实施阶段会有不一样的划分。具体以项目计划为准。比如:
软件开发项目,实施阶段可以细分为需求确认,详细设计,编码,集成测试,用户反馈测试,部署等子阶段。
药品研发项目,实施阶段则可以细分为药物寻源,新药调查,一阶段临床测试,二阶段临床测试,三阶段临床测试,新药申请,登记等子阶段。
《三》三项时间规划
具体如何分解实施阶段需要根据行业特征,以及项目特性,由项目经理按照需要具体问题具体分析。但是 无论如何分解,任何阶段都需要设定三项时间的设置:
1. checkpoint 设置
指在规定的时间间隔内对项目进行检查,比较实际与计划之间的差异,并根据差异进行调整。 一般情况是每周一次,如果实际情况和计划偏差较大,需要使用鱼骨图(今后会讲到)找出问题的根本原因,并且制定解决方案加以解决。
2. milestone 设置
对应的就是各个阶段,以及子阶段在项目计划中的开始和结束时间。 是细化项目管理力度的重要手段。 对于整个项目团队而言,里程碑的设立可以明确项目中每个人的目标。保证正确的时间做正确的事情。
3. baseline 设置
指一个(或一组)配置项在项目生命周期的不同时间点上通过正式评审而进入正式受控的一种状态。基线其实是一些重要的里程碑,但 相关交付物要通过正式评审并作为后续工作的基准和出发点。基线一旦建立后变化需要受控制。
当项目进行到某一个baseline 时,检查当前项目状态是否符合该阶段的结束条件,以及是否下一阶段的开始条件。只有当满足条件是,才可以向下一个阶段迁移。
如此做法是为了避免盲目切换到下一个阶段后由于前一阶段的产出物无法满足下一阶段的要求导致的人力资源浪费。
举个例子,软件开发类项目,需要先进行设计,开发,之后才能进行测试。假设整个实施阶段采用瀑布开发的模型,需要等到开发完全完成后才能进行测试。项目计划2周设计,4周开发,4周测试于是:
1. 项目checkpoint:
每周都会作为一个checkpoint检查当前进度和计划是否有出入。
2. 项目milestone:
第一周开始为一个milestone表示开始进行设计,
第三周开始作为一个milestone表示开始开发,
第六周结束为一个milestone,表示开发任务完成。
第十周结束为一个milestone,表示测试任务完成。
3. 项目baseline:
第三周的milestone作为baseline,在此需要验证所有功能都已经设计完毕。
第六周的milestone作为baseline,在此需要验证开发工作是否完全结束。开发的任务是否能够运行起来,达到进入测试阶段的最低要求。(一般在这里会做一个烟雾测试,确保项目最低限度的能够运作)
第十周的milestone作为baseline,在此需要验证是否所有的测试用例都已经跑过并且通过。缺陷跟踪的问题是否都已经解决。
以上就是项目生命周期划分的阶段以及相关注意事项。
Checkpoint 检查点,Flink 定期把 state 缓存数据持久化保存下来的过程 。它的目的是容错和 exactly-once 语义功能。
分布式系统实现一个全局状态保留的功能。
① 传统方案使用一个统一时钟,通过 master 节点广播到每个 slaves 节点。当 slaves 接收到后,记录其状态 。缺点:单点故障、数据不一致(延迟/失败)、系统不稳定
② Flink 采用栅栏 Barrier 作为 Checkpoint 的传递信号,与业务数据一样,无差别的传递下去 。
每一个 Flink 作业都会有一个 JobManager ,JobManager 里面的 checkpoint coordinator 管理整个作业的 checkpoint 过程。用户通过 env 设置 checkpoint 的时间间隔,使得 checkpoint coordinator 定时将 checkpoint 的 barrier 发送给每个 source subtask。
当 source 算子实例收到一个 barrier 时,它会暂停自身的数据处理,然后将自己的当前 缓存数据 state 保存为快照 snapshot,并且持久化到指定的存储,最后算子实例向 checkpoint coordinator 异步发送一个确认信号 ack,同时向所有下游算子广播该 barrier 和恢复自身的数据处理。
以此类推,每个算子不断制作 snapshot 并向下游广播 barrier,直到 barrier 传递到 sink 算子实例,此时确定全局快照完成。
Flink Web UI 有 Checkpoint 监控信息,包括统计信息和每个Checkpoint的详情。如下图所示,红框里面可以看到一共触发了 569K 次 Checkpoint,然后全部都成功完成,没有 fail 的。
如下图所示,点击某次 Checkpoint “+”,可知该Checkpoint 的详情。
① Acknowledged 表示有多少个 subtask 对这个 Checkpoint 进行了 ack,从图中可知,共有3个 operator 分为2个 subtask,这2个 subtask 都完成 ack。
② Latest Acknowledgement 表示所有 subtask 的最后 ack 的时间;
③ End to End Duration 表示所有 subtask 中完成 snapshot 的最长时间;
④ State Size 表示当前 Checkpoint 的 state 大小(如果是增量 checkpoint,则表示增量大小);
⑤ Buffered During Alignment 表示在 barrier 对齐阶段累计多少数据(如果这个数据过大,则间接表示对齐比较慢);
如下图所示,Flink Web UI 的 Checkpoint 界面显示 Checkpoint 10432 失败。点击 Checkpoint 10423 的详情“+”,可知 Acknowledged、Latest Acknowledgement等信息。
查看 JobManager 的日志 jobmanager.log,其中关键日志,如下
解析: 10423 是 checkpointID, 0b60f08bf8984085b59f8d9bc74ce2e1 是 task execution id 即 subtask id, 85d268e6fbc19411185f7e4868a44178 是 job id。
从上述的 jobmanager.log 日志中,可知 subtask id 和 job id,可以确定 taskmanager 和 slot。
从上面的日志,可知 subtask 被调度到 节点 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot ,接着到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失败的具体原因。
如果较小的 Checkpoint 没有对齐的情况,Flink 收到了更大的 Checkpoint,则会把较小的 Checkpoint 给取消,其关键日志如下。
该日志表示当前 Checkpoint 19 还在对齐阶段,同时收到了 Checkpoint 20 的 barrier,接着通知到下游的 task checkpoint 19 被取消了,同时也会通知 JM 当前 Checkpoint 被 decline 掉了。
当下游 task 收到被 cancel barrier 的时候,打印如下的关键日志,表示当前 task 接收到上游发送过来的 barrier cancel 消息,从而取消了对应的 Checkpoint。
如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint 也会失败。例如,如果 Checkpoint 21 由于超时而失败是,jobmanager.log 的关键日志如下。
接着打开 debug 级别的日志, taskmananger.log 的 snapshot 分为三个阶段,开始 snapshot 前,同步阶段,异步阶段:
Checkpoint 慢的场景,例如 Checkpoint interval 1 分钟,超时 10 分钟,Checkpoint 经常需要做 9 分钟,而且实际 state size 比预期的大很多。
简单介绍 Checkpoint Barrier 对齐机制: 算子 Operator 从输入流接收到 barrier n 后,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到 barrier n 。如下图所示,Operator 从数字流中接收到 barrier n 后,接着数字流的数据不会被处理而是放入输入缓冲区。直到字母流的 barrier n 达到 Operator 后,Operator 向下游发送 barrier n 和 缓冲区的数据,同时进行自身的 snapshot。
由于 barrier 对齐机制,算子需要接收到上游全部 barrier n 后,才会进行 snapshot。如果作业存在反压或者数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间。如下图所示,通过Flink Web UI 监控 subtask 数据量 和反压 BackPressure。
介绍Checkpoint Barrier 对齐机制,算子 Operator 收齐上游的 barrier n 才能触发 snapshot。例如,StateBackend 是 RocksDB,snapshot 开始的时候保存数据到 RocksDB,然后 RocksDB 异步持久化到 FS。如果 barrier n 一直对不齐的话,就不会开始做 snapshot。
Checkpoint 有两种模式:全量 Checkpoint 和 增量 Checkpoint 。全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优势。
如果通过日志发现同步阶段比较慢,对于非 RocksDBBackend,可以考虑开启异步 snapshot。如果开启了异步 snapshot 还是慢,需要使用 AsyncProfile 查看整个JVM。
对于 RocksDBBackend,使用 iostate 查看磁盘的压力,同时查看 TaskMananger 的 RocksDB log日志,查看其中 snapshot 时间总开销。
异步阶段,TaskManager 主要将 state 备份到持久化存储 HDFS。对于非 RocksDBBackend,主要瓶颈来自于 网络 ,可以考虑观察网络的 metric,或者使用 iftop 观察对应机器上的网络流量情况。
对于 RocksDB,则需要从本地读取文件,写入到远程的持久化存储上 HDFS,所以不仅需要考虑 网络的瓶颈,还需要考虑本地磁盘的性能 。
该场景出现的概率比较小,source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 taskmanager.log 中找不到开始做 Checkpoint 的 log,则可以考虑是否属于这种情况,可以通过 jstack 进行进一步确认锁的持有情况。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)