Apache Flink学习

Apache Flink学习,第1张

Apache Flink学习

在复杂的流处理场景中都需要记录状态,然后在新流入数据的基础上不断更新状态。

一、什么是状态

流式计算分为无状态计算和有状态计算两种情况。

  • 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。
  • 有状态的计算会基于多个事件输出结果。
二、状态使用场景
  1. 去重:对数据流中的重复数据进行去重;
  2. 检测:如判断一个温度传感器数据流中的温度是否在持续上升;
  3. 聚合:对一个事件窗口的数据进行聚合分析,比如分析一个小时内水位的情况;
  4. 更新机器学习模型:在线机器学习场景下,需要根据新流入数据不断更新模型参数;
三、Flink中状态的分类

根据作用域的不同,状态可以分为两类:算子状态(operator state)、键值分区状态(keyed state)。

从具体使用场景来说,绝大多数的算子都可以通过集成Rich函数类或其它提供好的接口类。

Operator StateKeyed State使用算子类型所有算子:常用于source只适用于KeyedStream上的算子状态分配一个算子的子任务对应一个状态一个Key对应一个State:一个算子会处理多个Key,则访问相应的多个State创建和访问方式实现CheckpointFunction接口重写RichFunction,通过里面的RuntimeContext访问横向扩展并发改变时有多重重写分配方式可选:均匀分配和合并后每个得到全量并发改变,State随着Key在实例间迁移支持的数据结构ListState、BroadCastStateValueState、ListState、MapState、ReduceState、AggregationState 四、状态的使用 1、算子状态使用

算子状态可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。

注意:算子子任务之间的状态不能互相访问。

Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。

Flink为算子状态提供三种基本数据结构:

  • 列表状态(List state):将状态表示为一组数据的列表。示例:ListState

  • 联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

    一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。

  • 广播状态(Broadcast state):是一种特殊的算子状态。如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。示例:BrodcastState

2、键值分区状态使用

键控状态是根据输入数据流中定义的键(key)来维护和访问的。

Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。

Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。

键值分区状态支持的数据类型:

  • ValueState:保存单个值。每个有key有一个状态值,设置使用update(T), 获取使用T value();示例:C_ValueState

  • ListState:保存元素列表。示例:C_ListState

    • 添加元素:add(T)、addAll(List)
    • 获取元素:Iterable get()
    • 覆盖所有元素:update(List)
  • ReducingState:存储单个值。与ReducingState类似,都是进行聚合。不同的是,AggregatingState的聚合的结果和元素类型可以不一样。示例:C_ReducingState

  • AggregatingState: 存储单个值。与ReducingState类似, 都是进行聚合。不同的是, AggregatingState的聚合的结果和元素类型可以不一样。 示例:C_AggregatingState

  • MapState: 存储键值对列表。示例:C_MapState

    • 添加键值对: put(UK, UV) or putAll(Map)
    • 根据key获取值: get(UK)
    • 获取所有: entries(), keys() and values()
    • 检测是否为空: isEmpty()

注意:

  • 所有的类型都有clear(), 清空当前key的状态
  • 这些状态对象仅用于用户与状态进行交互
  • 状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
  • 从状态获取的值与输入元素的key相关
五、状态后端

每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。

状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)。状态后端主要负责两件事:

  • 本地的状态管理
  • 将检查点(checkpoint)状态写入远程存储
1、状态后端分类

状态后端作为一个可插入的组件,没有固定的配置,因此可以根据需要选择一个合适的状态后端。

Flink提供了3中状态后端:

1.1 MemoryStateBackend

内存级别的状态后端。

存储方式:本地状态存储在JobManager的内存中, checkpoint 存储在JobManager的内存中

特点: 快速, 低延迟, 但不稳定

使用场景:

  1. 本地测试
  2. 几乎无状态的作业(ETL)
  3. JobManager不容易挂, 或者挂了影响不大
  4. 不推荐在生产环境下使用
1.2 FsStateBackend

存储方式: 本地状态在JobManager内存, Checkpoint存储在文件系统中

特点: 拥有内存级别的本地访问速度, 和更好的容错保证

使用场景:

  1. 常规使用状态的作业。例如分钟级别窗口聚合, join等
  2. 需要开启HA的作业
  3. 可以应用在生产环境中
1.3 RocksDBStateBackend

将所有的状态序列化之后, 存入本地的RocksDB数据库中(一种NoSql数据库, KV形式存储)。

存储方式:

  1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘)
  2. Checkpoint在外部文件系统中

使用场景:

  1. 超大状态的作业, 例如天级的窗口聚合
  2. 需要开启HA的作业
  3. 对读写状态性能要求不高的作业
  4. 可以使用在生产环境
2、配置状态后端 2.1 全局配置

在flink-conf.yaml文件中设置默认的全局后端

# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# .
#
# state.backend: filesystem
2.2 代码中配置

代码中单独为Job设置状态后端。

env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/checkpoints/fs"));

如果要使用RocksDBBackend,需要先引入依赖:


    org.apache.flink
    flink-statebackend-rocksdb_${scala.binary.version}
    ${flink.version}
    provided

env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5574223.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存