在复杂的流处理场景中都需要记录状态,然后在新流入数据的基础上不断更新状态。
一、什么是状态流式计算分为无状态计算和有状态计算两种情况。
- 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。
- 有状态的计算会基于多个事件输出结果。
- 去重:对数据流中的重复数据进行去重;
- 检测:如判断一个温度传感器数据流中的温度是否在持续上升;
- 聚合:对一个事件窗口的数据进行聚合分析,比如分析一个小时内水位的情况;
- 更新机器学习模型:在线机器学习场景下,需要根据新流入数据不断更新模型参数;
根据作用域的不同,状态可以分为两类:算子状态(operator state)、键值分区状态(keyed state)。
从具体使用场景来说,绝大多数的算子都可以通过集成Rich函数类或其它提供好的接口类。
算子状态可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
注意:算子子任务之间的状态不能互相访问。
Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。
Flink为算子状态提供三种基本数据结构:
-
列表状态(List state):将状态表示为一组数据的列表。示例:ListState
-
联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。
-
广播状态(Broadcast state):是一种特殊的算子状态。如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。示例:BrodcastState
键控状态是根据输入数据流中定义的键(key)来维护和访问的。
Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。
Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
键值分区状态支持的数据类型:
-
ValueState
:保存单个值。每个有key有一个状态值,设置使用update(T), 获取使用T value();示例:C_ValueState -
ListState
:保存元素列表。示例:C_ListState - 添加元素:add(T)、addAll(List
) - 获取元素:Iterable
get() - 覆盖所有元素:update(List
)
- 添加元素:add(T)、addAll(List
-
ReducingState
:存储单个值。与ReducingState类似,都是进行聚合。不同的是,AggregatingState的聚合的结果和元素类型可以不一样。示例:C_ReducingState -
AggregatingState
: 存储单个值。与ReducingState类似, 都是进行聚合。不同的是, AggregatingState的聚合的结果和元素类型可以不一样。 示例:C_AggregatingState -
MapState
: 存储键值对列表。示例:C_MapState - 添加键值对: put(UK, UV) or putAll(Map
) - 根据key获取值: get(UK)
- 获取所有: entries(), keys() and values()
- 检测是否为空: isEmpty()
- 添加键值对: put(UK, UV) or putAll(Map
五、状态后端注意:
- 所有的类型都有clear(), 清空当前key的状态
- 这些状态对象仅用于用户与状态进行交互
- 状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
- 从状态获取的值与输入元素的key相关
每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)。状态后端主要负责两件事:
- 本地的状态管理
- 将检查点(checkpoint)状态写入远程存储
状态后端作为一个可插入的组件,没有固定的配置,因此可以根据需要选择一个合适的状态后端。
Flink提供了3中状态后端:
1.1 MemoryStateBackend内存级别的状态后端。
存储方式:本地状态存储在JobManager的内存中, checkpoint 存储在JobManager的内存中
特点: 快速, 低延迟, 但不稳定
使用场景:
- 本地测试
- 几乎无状态的作业(ETL)
- JobManager不容易挂, 或者挂了影响不大
- 不推荐在生产环境下使用
存储方式: 本地状态在JobManager内存, Checkpoint存储在文件系统中
特点: 拥有内存级别的本地访问速度, 和更好的容错保证
使用场景:
- 常规使用状态的作业。例如分钟级别窗口聚合, join等
- 需要开启HA的作业
- 可以应用在生产环境中
将所有的状态序列化之后, 存入本地的RocksDB数据库中(一种NoSql数据库, KV形式存储)。
存储方式:
- 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘)
- Checkpoint在外部文件系统中
使用场景:
- 超大状态的作业, 例如天级的窗口聚合
- 需要开启HA的作业
- 对读写状态性能要求不高的作业
- 可以使用在生产环境
在flink-conf.yaml文件中设置默认的全局后端
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the #2.2 代码中配置. # # state.backend: filesystem
代码中单独为Job设置状态后端。
env.setStateBackend(new MemoryStateBackend()); env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/checkpoints/fs"));
如果要使用RocksDBBackend,需要先引入依赖:
org.apache.flink flink-statebackend-rocksdb_${scala.binary.version}${flink.version} provided
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)