Flink:从业务实践角度聊聊状态缓存和内存管理

Flink:从业务实践角度聊聊状态缓存和内存管理,第1张

Flink:从业务实践角度聊聊状态缓存和内存管理

本文就是记录些实践结论,不会做深入原理和源码级说明,因为这些,不如去看源码和官方文档,至少官方文档介绍的非常详细,比某些博文缺胳膊少腿、粘贴复制强多了,据我所知,市场上很多关于Flink的参考书大多直接翻译的官方文档

官方文档链接附上:https://www.bookstack.cn/read/flink-1.11.1-zh/collapse-2

如上是中文翻译版的文档,也有英文版选择和不同版本选择,自行选用

本文基于Flink V1.11

文章目录

Flink状态缓存State

Flink状态缓存Flink状态缓存的过期机制Flink状态后端Flink状态查询与监控 Flink内存管理

Flink状态缓存State

先简单聊聊什么是状态,flink为啥要设计它,日常开发中用的多不多,怎么用,有哪些注意事项

Flink状态缓存

状态缓存:在flink的世界里,叫做state。顾名思义,他就是flink计算过程中用来存储中间计算结果或存放临时数据的一种缓存。流式计算,数据持续输入并持续计算,那么中间的计算结果,临时数据等都可以放在状态缓存里

设计它的目的:一是计算过程中的状态缓存;二是容错机制。

有2类:keyedState;OperatorState;

KeyedState:每个key自有的缓存。在keyBy之后,用途最广最频繁

ValueState:单值

ListState:列表

MapState:Map

ReduceState:让状态做自动聚合的数据状态,聚合的数据类型必须一致

AggregatingState:让状态做自动聚合的数据状态,聚合的数据类型可以不一致,更灵活

OperatorState:每个算子单个子任务自有的缓存:例如算子有4个并行度,那么会有4个算子实例,对应4个OperatorState,并且这4个OperatorState间不能互相访问

1.ListState 和UnionListState 一般业务开发很少用,更多会用在Source、Sink处,offset保存或sink缓冲等

2.BroadcastState 对于一些配置、规则或者小量的维表数据可以使用,全量下发同步给下游算子,可以避免多余的外部访问

ListState 一个列表UnionListState 也是一个列表,与ListState 有啥区别呢?使用时没啥区别,只有在重分布(重新划分并行度)、容错从保存点恢复数据时有区别,如下图将ListState 、UnionListState 、BroadcastState在重新划分并行度或从保存点恢复时的区别。BroadcastState:广播状态,就是把数据广播给下游所有算子。(广播啥意思:一个喇叭,所有人都能听到喇叭广播的一摸一样的信息),作用在不同算子间的数据分发。

原始状态和托管状态

Raw State:原始状态,用户自己管理,状态数据结构为byte[],一般就是Managed State不能满足使用了才会在自定义算子中使用,用的少。Managed State:托管状态(记住这个名词,在内存管理篇章中有涉及),就是上文描述的那些,托管的意思就是托付给Flink管理我们的状态

小结一下:状态缓存在流计算中用的非常多,其中DataStreamAPI可以通过API灵活的 *** 作状态数据,而TableAPI和SQL是无法主动 *** 作状态缓存的,都是Flink自动解析语句后进行 *** 作。所以会引发一些问题:状态过大OOM;状态恢复如何恢复等等

Flink状态缓存的过期机制

前面有说,状态,只是临时中间数据,是辅助计算用的,那一定需要管理它的有效时间

状态缓存的使用,一定要设计好过期机制,否则可能会造成OOM和性能低下。状态数据的过期,是跟业务相关的,比如我计算近5分钟的UV,那么状态数据实际上不在这个时间窗口内的数据都没用。

DataStreamAPI配置状态过期,通过StateTtlConfig配置,具体参看官方文档,有很详细,每个参数项都有介绍:https://www.bookstack.cn/read/flink-1.11.1-zh/ca935bea1d323f2d.md

重点关注下清理策略,根据不同状态后端和业务需求考量

(1)增量式处理可以动态的持续的释放内存,包括本地内存,但一定程度上会占用一点CPU

(2)全量式可以减少整个状态大小,但是不会清理本地状态,只有容错恢复时会放弃已经过期的状态,考量本地内存(TM内存)大小

(3)RocksDB的压缩清理策略要注意配置压缩处理条数间隔,默认1000条,间隔越小JNI开销越大,但清理越及时,但会影响整体性能。所以,结合内存管理机制(看官方文档或关注我相关flink内存文章),给的托管内存够大,磁盘性能OK,那可以尽可能的加大压缩条数间隔

示例-具体配置方式和详细说明见官方文档(看到的大部分博客都是从官方文档copy-paste来的)

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TableAPI和SQL需要配置minimum idle state retention time和maximum idle state retention time。非活动状态数据的最小和最长保留时间,两值相差必须大于5分钟,避免一小段时间内频繁进行失效状态数据内存回收影响性能,实际上并不是到了最小或最大时间就立马回收,它是内部有逻辑尽可能的在最小最大之间集中回收尽可能多的失效状态数据,避免到时间点就回收频繁影响性能。(有好多博客对这俩配置都做了源码级的解释,如果一定要源码级的话,建议最好自己去看源码,否则理解功能和设计思路就行)实践中,只要是存在非时间段的开窗,都要配置这俩参数,最小值的大小根据业务设置,最大值一般都给minimum + 5分钟

注意,这俩配置只对TableAPI&SQL的状态有效,DataStreamAPI的有单独配置

job代码中

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), Time.seconds(360));

或sql-client-defaults.yml文件

execution:
  # minimum idle state retention in ms
  min-idle-state-retention: 60000
  # maximum idle state retention in ms
  max-idle-state-retention: 360000
Flink状态后端

状态后端StateBackend,说的就是状态我持久化在哪里。为啥要持久化,因为容错机制,我要从状态恢复我的中间计算结果,然后接着数据流继续计算,如果不能恢复中间计算结果,要么重新消费数据计算,要么就丢数据或者重复计算数据了,这里先暂时不深入聊容错机制,后面开文章好好聊,里面内容很多。

官方文档位置:https://www.bookstack.cn/read/flink-1.11.1-zh/718f588f9609ada3.md

有三种状态后端

MemoryStateBackend:内存状态后端,也就是状态存放于运行时内存中,仅测试时使用。重点:数据存放在TM中,而后统一存放在JM中,故测试时也不要超过TM和JM的内存大小

FsStateBackend:文件系统状态后端,也就是状态存放在文件系统中。注意:文件系统可以是任意文件系统,例如本地文件系统和HDFS等。

运行时存放在TM中,少量元数据信息在JM中,所以要确保不会超过TM内存大小,实践中可以调managed memory设为0,为啥可以设置为0,后文内存管理有详细介绍,就是用不到了。适用于状态较大、窗口较长的任务,但是不能超过TM的内存大小,即受限于单个TM的内存大小。在需要容错机制时可用此状态后端

RocksDBStateBackend:将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。

checkpoint保存状态时,仍然需要把RocksDB本地数据存放到文件系统中比如HDFS等。少量元数据信息在JM中与FsStateBackend不同的是,FsStateBackend运行时数据在TM内存,而RocksDBStateBackend运行时数据在本地嵌入的RocksDB数据库中,所以它的状态数据不需要占用TM内存,但是要占用Managed memory内存,但是只是Managed内存消耗而不是全量的状态大小内存使用。调大Managed memory可有效提升job与RocksDB交互性能key和value数值大小有限制,尤其在ListState时要注意,单key单value不能超过 2^31 字节适用于非常大状态的job和容错场景,受限于本地磁盘大小支持增量checkpoint由于存在本地磁盘数据结构读写 *** 作,相比FsStateBackend状态后端性能要差很多,但实际上生产上使用的很多

小结下,实际生产上FsStateBackend和RocksDBStateBackend都有使用,若是有大状态大窗口又要求性能的任务,那么建议使用RocksDBStateBackend+固态盘。一般建设flink流计算平台都会使用RocksDBStateBackend的状态后端

代码

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(...);

或者配置文件flink-conf.yaml

配置参数官网说明:https://www.bookstack.cn/read/flink-1.11.1-zh/32b496b3c89e8f90.md#546nak

如下只是示例

# 用于存储和检查点状态rocksdb、filesystem
state.backend: filesystem
# 存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
# savepoints的默认目标目录(可选)
state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
# 用于启用/禁止增量checkpoints的标志(只有rocksdb状态后端支持)
state.backend.incremental: false
Flink状态查询与监控

job一旦运行起来,状态数值是否可以监控呢,理论上肯定可以,flink仍然在完善这部分接口,在1.11版本中也描述了不能保证现有接口的稳定性,后续版本的query state API会持续更新,所以一定要找到对应版本文档去看详细内容

https://www.bookstack.cn/read/flink-1.11.1-zh/59f9506bbddc90a3.md

这里就聊下能做什么吧,也有部分我们生产上已经在 *** 作的

flink的TM上有个QueryableStateServer运行着供外部进行query状态官方文档有详细使用流程说明和示例,我就不复制粘贴了托管的状态缓存数据结构都可以通过名称进行查询开启状态查询功能会占用TM部分资源可以定时查询,把数据回传给自定义的监控系统未来flink会完善支持通过 Metrics 系统发布状态部分监控和QueryableStateServer的访问监控 Flink内存管理

内存管理也不说太多细节,很多东西开发者文档都有描述,我贴个文档位置

https://www.bookstack.cn/read/flink-1.11.1-zh/collapse-195

几个小结

对象的有效信息被序列化成二进制流,在内存中连续存储,保存在预分配的内存块上,MemorySegment对象的序列化方式,Flink有专门的序列化类型,抽象是InformationType,对于程序的int、long等等有对应的序列化类型定义,具体看文档吧:https://www.bookstack.cn/read/flink-1.11.1-zh/380c958b72879671.mdMemorySegment是内存分配的最小单元,默认32k,提供高效的读写方法,很多计算可以直接 *** 作二进制数据不需要反序列化对于部分数据肯定会超过MemorySegment大小然后存放在多个MemorySegment上,所以Flink又抽象了一层数据访问视图:内存页。方便对MemorySegment的数据访问,分别有DataInputView和DataOutPutView抽象来对数据进行读写,当然这些都是底层的数据结构,业务访问层无需关心堆外内存和堆内内存:堆外内存扩展无限,但是内存分配开销大,部分逻辑堆外内存比堆内慢;堆内内存有JVM限制,但是内存分配快

关于Slots数量

1.slots:slot目的是隔离内存空间,但共享CPU和堆外内存。

2.slots数量配置要考虑内存和CPU核,划分过多slot可能单slot内存太小不够用导致OOM,且超过CPU核数时并不会增加整个任务的吞吐量

3.slots数量决定整个任务的并行度

这里就聊聊TM内存吧,JM内存,流计算调的很少,消耗的也不多,默认值跑着没什么问题,特殊业务情况除外

关于TM内存:TM内存是生产上调节的重点

如下从官方文档搬过来的

组成部分配置参数描述框架堆内存(framework Heap Memory)taskmanager.memory.framework.heap.size用于 Flink 框架的 JVM 堆内存(进阶配置)。任务堆内存(Task Heap Memory)taskmanager.memory.task.heap.size用于 Flink 应用的算子及用户代码的 JVM 堆内存。托管内存(Managed memory)taskmanager.memory.managed.size taskmanager.memory.managed.fraction由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。框架堆外内存(framework Off-heap Memory)taskmanager.memory.framework.off-heap.size用于 Flink 框架的堆外内存(直接内存或本地内存)(进阶配置)。任务堆外内存(Task Off-heap Memory)taskmanager.memory.task.off-heap.size用于 Flink 应用的算计及用户代码的堆外内存(直接内存或本地内存)。网络内存(Network Memory)taskmanager.memory.network.min taskmanager.memory.network.max taskmanager.memory.network.fraction用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存的受限的等比内存部分。JVM metaspacetaskmanager.memory.jvm-metaspace.sizeFlink JVM 进程的 metaspace。JVM 开销taskmanager.memory.jvm-overhead.min taskmanager.memory.jvm-overhead.max taskmanager.memory.jvm-overhead.fraction用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。

一般flink框架堆内堆外内存不好评估,一般不会去调节,而且和任务内存没有完全隔离,个人实践中没有去动它(文档是说后续还在持续优化此部分,可以关注新版本内存管理相关部分)

JVMmetaspace和JVM开销这一般也不会去主动调整

重点关注的是任务堆内存、任务堆外内存、托管内存、网络内存,这四个占用着TM所在机器的内存大部分,少部分给到flink框架堆内堆外内存和JVM元空间以及JVM开销。

任务堆内存:开窗计算的流数据和自定义算子内对象数据等开销,是算子运行时计算使用,尤其使用文件系统的状态后端或本地运行测试时消耗更大,因为还有状态数据的占用。使用rocksDB状态后端时则可以去掉状态数据的占用

任务堆外内存:没有特殊业务,没有超大对象(超大对象直接进入老年代,会有FullGC),一般不配置任务堆外内存,默认也是0。因为一个是看你TM节点所在机器资源,现在一般K8s管理,单节点4G,你的任务内堆内存和托管内存和网络缓冲都分不到多少;还一个,如果没有GC压力,也就是超大对象这种,使用堆内存会更快性能更好

堆外内存不好管理,OOM问题难以排查,Flink针对堆外内存的使用有自有管理优化,更多是应用在托管内存和网络缓冲区上,注意托管内存和网络缓冲内存属于堆外内存,具体可以看上图

托管内存(Managed Memory):流计算里给到RocksDB状态后端使用,用在数据读写和缓存上,所以托管内存的占比影响着RocksDB读写性能,默认是0.4。如果不是用的RocksDB,此内存可以给很小比例。批计算里给到数据聚合计算、排序等等。

网络缓冲区(Network Buff):了解过NIO就比较好理解此部分。网络缓冲区使用的是堆外内存的直接内存部分,默认占总内存的0.1比例,最小64M,最大1G。它会提前预分配好Memory Segment,所以你去webUI看到直接内存已使用完了,但网络缓冲区内存片段剩余多少可以看到。此部分内存是供算子上下游数据缓冲用的,相当于上下游算子间的内存队列,下游缓冲队列满了就不接收了,然后告诉上游,从而进行反压。网络缓冲区内存配的多大多小跟下游计算能力有关,下游越快,缓冲区可以稍微大一点,可以让下游算子更大吞吐量;但是如果下游计算很慢,你给再大缓冲区队列,也只会积压在队列中,白白浪费缓冲区资源。

如果你精细化配置这些个内存占比或大小,那就不要配置进程总内存或任务总内存,会产生配置冲突

官方文档调优指南(如上叙述也参考了):https://www.bookstack.cn/read/flink-1.11.1-zh/64ab98d8bb816306.md

本文聊了状态缓存和内存管理,后面会继续聊聊容错机制细节和业务开发需要注意的地方,感谢!

以上,个人愚见,如有叙述问题请及时指出,避免误导,万分感谢

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5701643.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存