- 一、FlinkState的概念
- 1、state分类
- 2、state backend 类型
- 二、实际应用如下
- 1、应用场景介绍
- 2、FsStateBackend 和 RocksDBStateBackend 相关配置
- 1)FsStateBackend 的flink-conf.yaml 配置
- 2)RocksDBStateBackend 的flink-conf.yaml 配置
- 3、FsStateBackend 和 RocksDBStateBackend 只是state存储方式的不同,flink代码实现方式无区别,本文采用的mapstate。
- 3、redis、FsStateBackend 和 RocksDBStateBackend测试对比如下(kafka使用的机械硬盘性能较差,限制了处理速度以下对比基于相同环境下做出)
- 1)redis
- 2) state in FsStateBackend
- 3) state in RocksDBStateBackend
flink有两种基本类型的state
operator state
keyed state
每种类型的state都可以以两种形式存在
1)原生状态(raw state)
- 由算子自己管理数据结构,当触发checkpoint *** 作过程中,flink并不知道数据内部的数据结构,只是将数据转换成bytes数据存储在checkpoint中,当checkpoint恢复任务时,算子自己再反序列化出状态的数据结构
2)托管状态(managed state)
- 由Flink Runtime控制和管理状态数据,并将状态数据转换成为内存的Hash tables或 RocksDB的对象存储,然后将这些数据通过内部的接口持久化到checkpoints中,任务异常时可以通过这些状态数据恢复任务。
- 推荐使用ManagedState管理状态数据,ManagedState更好的
以下对目前广泛使用的三类 state backend 做了区分,其中绿色表示所创建的operator/keyed state backend 是 on-heap 的,黄色则表示是 off-heap 的。
一般而言,在生产中,我们会在 FsStateBackend 和 RocksDBStateBackend 间选择:
FsStateBackend:性能更好;日常存储是在堆内存中,面临着 OOM 的风险(以下实际应用中我会举例介绍),不支持增量 checkpoint。
RocksDBStateBackend:无需担心 OOM 风险,是大部分时候的选择。
机器配置:
redis服务器 node4:8c 32g
kafka集群:node1、node3
hadoop flink集群:node1、node2、node3(16c 32g、6c 32g、16c 32g)
上送数据为变化上送,需按主键取出原上送数据进行实时更新重新下发
数据json格式,大小3kb左右,后期随着字段增多后期数据增加到6kb左右,数据量15000条/s
缓存使用:redis或者state,经测试redis,使用稳定性高,资源占用较少,可以单独构建redis集群,但是处理性能较差,且要维护另外一套组件以及集群,因此此次采用flink内置state作为缓存,redis的使用暂时先不介绍。
#开启HA,使用文件系统作为快照存储 state.backend: filesystem #启用检查点,可以将快照保存到HDFS state.backend.fs.checkpointdir: hdfs://nncluster/flink-checkpoints2)RocksDBStateBackend 的flink-conf.yaml 配置
#开启HA,使用文件系统作为快照存储 state.backend: rocksdb # # block大小,默认4KB state.backend.rocksdb.block.blocksize: 100kb # # block cache大小,默认8MB,内存余量充足建议128m或256m,提升读的性能 state.backend.rocksdb.block.cache-size: 512m state.backend.rocksdb.compaction.level.use-dynamic-size: true # # 后台负责 flush 和 compaction 的最大并发线程数,默认为1 state.backend.rocksdb.thread.num: 6 # # 指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见 state.checkpoints.dir: hdfs://nncluster/flink-checkpoints # # 用于指定定时器服务的工厂类实现类,默认为“HEAP” state.backend.rocksdb.timer-service.factory: rocksdb # # 用于指定同时可以 *** 作RocksDBStateBackend的线程数量,默认是1 state.backend.rocksdb.checkpoint.transfer.thread.num: 6 # # 配置任务本地恢复 state.backend.local-recovery: true # # 指定RocksDB存储状态数据的本地文件路径,在每个TaskManager提供该路径节点中的状态存储 state.backend.rocksdb.localdir: /home/fiot/middleinstall/flink-1.11.2/rocksDB # # 设为 false 禁用 RocksDB 内存托管 state.backend.rocksdb.memory.managed: true # # 限制每个slot的RocksDB内存的使用上限,避免了OOM的风险 state.backend.rocksdb.memory.fixed-per-slot: 500mb # # 默认值 0.5,即 50% 的给定内存会分配给写缓冲区使用 state.backend.rocksdb.memory.write-buffer-ratio: 0.9 # # 默认值 0.1,即 10% 的 block cache 内存会优先分配给索引及过滤 state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1 # #state.backend.rocksdb.metrics.estimate-num-keys: true state.backend.rocksdb.metrics.num-running-compactions: true # # 监控当前的实际延迟写入率 state.backend.rocksdb.metrics.background-errors: true #启用检查点,可以将快照保存到HDFS #state.backend.fs.checkpointdir: hdfs://nncluster/flink-checkpoints
以上参数可以根据自己服务器配置进行变更
3、FsStateBackend 和 RocksDBStateBackend 只是state存储方式的不同,flink代码实现方式无区别,本文采用的mapstate。public class TopoKeyedMapFunction extends RichFlatMapFunction3、redis、FsStateBackend 和 RocksDBStateBackend测试对比如下(kafka使用的机械硬盘性能较差,限制了处理速度以下对比基于相同环境下做出){ private static final Logger LOG = LoggerFactory.getLogger(TopoKeyedMapFunction.class); private transient MapState mapState; @Override public void open(Configuration parameters) throws Exception { MapStateDescriptor stateDescription = StateUtils.getStateDescription(); mapState = getRuntimeContext().getMapState(stateDescription); } @Override public void flatMap(String value, Collector out) throws Exception { ParameterTool parameters = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); try { JSonObject jo = JSONObject.parseObject(value); TopoData newApData = parseApData(jo, parameters,mapState); String mac = newApData.getMac(); //中间结果写入mapstate mapState.remove(mac); mapState.put(mac,JSONObject.toJSonString(newApData)); //中间结果写入redis //RedisUtil.set(mac,JSONObject.toJSonString(newApData),SECONDS); out.collect(JSONObject.toJSonString(newApData)); }catch (Exception e){ LOG.error("topo-data flatmap failed--",e); } }
source 6p filter 6p flatmap 6p sink6p
处理速度计算方式:压入1000w左右数据开始消费,至消费完成计算出处理时间
此处为redis为空开始测试的,随着每条数据的增加,后期处理速度维持在3000条/秒左右
taskmanager内存配置为4g(200w条后出现oom)、6g(500w条后出现oom)
taskmanager内存为4g,无oom风险
以上为本人的简单测试,仅作参考,
1、rocksDB可以按照集群实际配置进行相应的优化处理速度会有提升
2、生产环境建议使用rocksDB,redis缓存在数据量较少场景稳定性较好,但是此场景下使用fsbackend性能较高
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)