【深入浅出flink】第16篇:一篇文章搞懂flink中的状态、状态类型、状态缩放(rescale)和Key Group、状态后端(State backend)

【深入浅出flink】第16篇:一篇文章搞懂flink中的状态、状态类型、状态缩放(rescale)和Key Group、状态后端(State backend),第1张

【深入浅出flink】第16篇:一篇文章搞懂flink中的状态、状态类型、状态缩放(rescale)和Key Group、状态后端(State backend)

大家好,我是雷恩Layne,这是《深入浅出flink》系列的第十六篇文章,希望能对您有所收获O(∩_∩)O


文章目录

一、什么是State二、Keyed State和Operator State

2.1 Managed State和Raw State2.2 Keyed State2.3 Operator State2.4 两者区别 三、状态缩放(rescale)四、Key Group的原理五、常见状态使用方法

5.1 KeyedState之ValueState5.2 KeyedState之ListState5.3 KeyedState之MapState5.4 KeyedState之ReducingState5.5 KeyedState之AggregatingState5.6 OperatorState之ListState5.7 OperatorState之BroadCastState 六、状态后端State backend

6.1 MemoryStateBackend6.2 FSStateBackend6.3 RocksDBStateBackend6.4 StateBackend配置方式6.5 StateBackend持久化策略

一、什么是State

我们知道,Flink的一个算子可能会有多个子任务,每个子任务可能分布在不同的实例(即slot)上,我们可以把Flink的状态理解为某个算子的子任务在其当前实例上的一个变量,该变量记录了流过当前实例算子的历史记录产生的结果。当新数据记录流入时,我们需要结合该结果(即状态,State)来进行计算。

实际上,Flink的状态是由算子的子任务来创建和管理的。一个状态的更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内流入的某个整数字段进行求和,那么当算子子任务接收到新元素时,会获取已经存储在状态中的数值(历史记录的求和结果),然后将当前输入加到状态上,并将状态数据更新。

为了保证流式计算的高可用性(容错),子任务的状态除了会暂存在节点内,还需要进行持久化存储(快照),这就是所谓的Checkpoint。当子任务出现故障或重启任务时,可以从持久化的Checkpoint中恢复。

二、Keyed State和Operator State 2.1 Managed State和Raw State

按照状态的管理方式来分,Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。从名称中也能读出两者的区别:Managed State是由Flink直接管理的,由Flink帮忙存储、恢复和优化;Raw State是开发者自己管理的,需要自己序列化。

两者的具体区别有:

从状态管理的方式上来说,Managed State由Flink Runtime托管,状态是自动存储、自动恢复的,Flink在存储管理和持久化上做了一些优化。当我们横向伸缩(即状态缩放),或者说我们修改Flink应用的并行度时,状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。从状态的数据结构上来说,Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。

用表格表示如下:

Managed StateRaw State状态管理方式Flink Runtime托管,状态是自动存储、自动恢复、自动伸缩用户自己管理状态数据结构Flink提供的常用数据结构,如ListState、MapState等字节数组: byte[]使用场景绝大多数Flink算子用户自定义算子

实际上,在绝大多数场景下我们都不需要自行维护状态,所以这里只介绍托管状态。对Managed State继续细分,又可以分为两种类型:Keyed State和Operator State。

2.2 Keyed State

我们首先来看Keyed State。我们知道,env.addSource()方法返回的是一个类型为DataStream的数据流,而这个数据流再按照数据记录中的某个关键字段(比如id字段)为Key进行了keyBy分组 *** 作,得到就是一个类型为KeyedStream的数据流。Keyed State就是这个KeyedStream上的状态。数据流中所有相同id值的的记录共享一个状态(比如数据记录求和的值),可以访问和更新这个状态。以此类推,每个Key对应一个自己的状态。下图展示了Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务1处理了两种Key,两种Key分别对应自己的状态。

Keyed State支持的数据结构如下:

值状态(Value state):将状态表示为单个的值列表状态(List state):将状态表示为一组数据的列表映射状态(Map state):将状态表示为一组 Key-Value 对聚合状态(Reducing state & Aggregating State):将状态表示为一个用于聚合 *** 作的元素

聚合状态(Reducing state & Aggregating State)内部是通过ReduceFunction和AggregateFunction进行聚合的

2.3 Operator State

介绍完Keyed State,我们再来看Operator State。顾名思义,Operator State就是算子上的状态,每个算子子任务管理自己的Operator State。虽然理论上它可以用在所有算子上,但在实际应用中它常常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。每个算子的子任务或者说每个算子实例共享同一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图展示了Operator State,算子子任务1上的所有数据可以共享第一个Operator State,以此类推,每个算子子任务上的数据共享自己的状态。

Operator State支持的数据结构如下:

列表状态(List state):将状态表示为一组数据的列表。联合列表状态(Union list state):也将状态表示为数据的列表,它与常规列表状态的区别在于,状态缩放时状态该如何分配。ListState是将整个状态列表按照round-ribon的模式均匀分布到各个算子子任务上,而Union list state按照广播的模式,将所有状态合并,再分发给每个实例的子任务上。广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。广播状态是固定维护在堆内存中的,不会写入文件系统或者RocksDB。广播流一侧修改广播状态的键值之后,数据流一侧就可以立即感知到变化。在开发过程中,如果遇到下发/广播配置、规则等低吞吐事件流到下游所有task时,就可以使用Broadcast state的特性。 2.4 两者区别

Keyed State和Operator State的区别如下:

Keyed StateOperator State适用算子类型只适用于KeyedStream上的算子可以用于所有算子状态分配每个Key对应一个状态一个算子子任务对应一个状态创建和访问方式重写对应的算子Rich Function,通过里面的RuntimeContext访问实现CheckpointedFunction等接口状态缩放状态随着Key自动在多个算子任务上迁移有多种状态重新分配方式支持的数据结构ValueState、ListState、MapState、Reducing state、Aggregating StateList state、Union list state、Broadcast state 三、状态缩放(rescale)

状态缩放(rescale),即状态的横向扩展问题。该问题主要是指因为一些业务原因,需要修改Flink作业的并行度(比如,发现某个运行中的作业的某个算子的耗时较长,影响了整体的计算速度,需要重新调整该算子的并行度,以提升作业的整体处理速度;又比如,发现某个运行的作业的资源利用率不高,可以减少一些算子的并行度)。对于Flink而言,当某个算子的并行实例数或算子的子任务数发生了变化,应用需要关停或新启动一些算子子任务,某些原来在某个算子子任务上的状态数据需要平滑地更新到新的算子子任务上。

如下图所示,Flink的Checkpoint机制,为状态数据在各算子间迁移提供了保障。Flink定期将分布式节点上的状态数据生成快照(SNAPSHOT),并保存到分布式存储(如rocksDb或hdfs)上。横向伸缩后,算子子任务的个数发生变化,子任务重启,相应的状态从分布式存储上重建即可。

以扩容为例,上图将算子B和C进行了扩容(并行度从2调整到了3)。算子的扩缩容涉及到状态的重新分配。显然,Keyed State和Operator State重新分配机制是不一样的。相对来说,Operator State的重新分配更为简单,有两种常见的状态分配方式:一种是均匀分配(即List state的方式),另一种是将所有状态合并(即Union list state的方式),再分发给每个实例上。下面以Source接入kafka消息为例,先介绍Operator State的重新分配机制。假如接入消息的topic的分区数为5,且Source一开始的并行度为1,扩容后的并行度为2,则扩容前后Operator State的重新分配结果如下图(缩容为反向过程):

我们接着来看Keyed State的重新分配。按照最简单的思路考虑,Flink中的key是按照hash(key) % parallelism的规则分配到各个Sub-Task上去的,那么我们可以在缩放完成后,根据新分配的key集合从hdfs直接取回对应的Keyed State数据。下图示出并行度从3增加到4后,Keyed State中各个key的重新分配过程。

在Checkpoint发生时,状态数据是顺序写入文件系统的。但从上图可以看出,从状态恢复时是随机读的(而不是顺序读),效率非常低下。并且缩放之后各SubTask处理的key有可能大多都不是缩放之前的那些key,无形中降低了本地性。为了解决这两个问题,在Flink-3755对Keyed State专门引入了Key Group,下面具体看看。

四、Key Group的原理

以下引自Flink官方文档:

Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.

翻译一下,Key Group是Keyed State分配的原子单位,且Flink作业内Key Group的数量与最大并行度相同,也就是说Key Group的索引位于[0, maxParallelism-1]的区间内。每个Sub-Task都会处理一个到多个Key Group,在源码中,以KeyGroupRange这一数据结构来表示。即KeyGroupRange实际上是多个连续的Key Group组成的闭区间([startKeyGroup, endKeyGroup])。

我们还有两个问题需要解决:

如何决定一个key该分配到哪个Key Group中?如何决定一个Sub-Task该处理哪些Key Group(即对应的KeyGroupRange)?

对于第一个问题,Flink实际上是对原始的key进行两重哈希(一次取hashCode,一次做MurmurHash)之后,再对最大并行度取余,得到Key Group的索引。源码如下:

public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

而对于第二个问题,由源码可知,SubTask处理哪些Key Group是由并行度、最大并行度和算子实例(即SubTask)的ID共同决定的。源码如下:

public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
    int maxParallelism,
    int parallelism,
    int operatorIndex) {

    checkParallelismPreconditions(parallelism);
    checkParallelismPreconditions(maxParallelism);

    Preconditions.checkArgument(maxParallelism >= parallelism,
                                "Maximum parallelism must not be smaller than parallelism.");

    int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
    int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    return new KeyGroupRange(start, end);
}

简单来说就是,Flink会将[0, maxParallelism-1]的区间内的Key Group尽可能均匀地、连续地分给各SubTask。按照这样的Key Group分配逻辑,上一节中Keyed State重分配的场景就会变成下图所示(设最大并行度为10)。

很明显,将Key Group作为Keyed State的基本分配单元之后,上文所述本地性差和随机读的问题都部分得到了解决。当然还要注意,最大并行度对Key Group分配的影响是显而易见的,因此不要随意修改最大并行度的值。Flink内部确定默认最大并行度的逻辑如下代码所示:

public static int computeDefaultMaxParallelism(int operatorParallelism) {
    checkParallelismPreconditions(operatorParallelism);
    return Math.min(
        Math.max(
            MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
            DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
        UPPER_BOUND_MAX_PARALLELISM);
}

其中,下限值DEFAULT_LOWER_BOUND_MAX_PARALLELISM为128,上限值UPPER_BOUND_MAX_PARALLELISM为32768。

看到了这里你可能有一些疑问,我刚看到这些内容时也有不少疑问,大致如下:

    Flink作业内Key Group的数量与最大并行度相同,每个Sub-Task都会处理一个到多个Key Group。这句话怎么理解?
    Key Group是Keyed State分配的原子单位,一个subTask对应一个keyGroupRange,keyGroupRange(start和end的闭区间)包含多个keyGroup。keyGroupRange的个数跟算子的并行度一样,keyGroup的个数和最大并行度一样。一个keyGroupRange中的多个keyGroup会被分配到一个subTask。

    为什么能改善随机读的问题?

    在Checkpoint发生时,状态数据是顺序写入文件系统的。如果采用之前的方法,从状态恢复时是随机读的(而不是顺序读),效率非常低下。而keyGroupRange是多个连续的Key Group组成的闭区间([startKeyGroup, endKeyGroup]),所以获取数据时是顺序读。

    为什么能改善本地读?
    先来说下为什么之前的方法降低了本地读?我们知道,每个子任务在将状态保存到checkpoint时(比如保存到hdfs),肯定遵循本地性元素,即第一个副本优先保存到本地结点,然后再保存其它副本时才会选择远程结点。所以,虽然状态保存在hdfs中,但是子任务的实例和状态还是在一个结点中的。由于缩放之后并行度发生了改变,如果通过hash(key) % parallelism的方式获取相应的状态,很有可能大多都不是缩放之前的那些key,无形中降低了本地性。而现在hash的计算方式变成MathUtils.murmurHash(keyHash) % maxParallelism,不再依赖于算子的parallelism,而是依赖于更稳定的maxParallelism。另外一个subTask获取的是keyGroupRange所有的连续的Key Group,所以很有可能是从本地获取的,从而改善了本地读。(个人见解)

    需要注意的是,改善本地读的前提一定是程序在不重启的情况下动态改变并行度,如果整个程序重启,那么新生成的subTask很可能和之前的不一样,就没有本地性而言了,反转都是从hdfs获取的。但是,随机读还是能得到保障的。

五、常见状态使用方法

由于实际环境中使用最多的是Keyed State,所以这里先介绍Keyed State的使用方法。Flink提供了几种现成的数据结构供我们使用,他们的继承关系如下图所示。首先,State主要有三种实现,分别为ValueState、MapState和AppendingState,AppendingState又可以细分为ListState、ReducingState和AggregatingState。

5.1 KeyedState之ValueState

ValueState[T]是单一变量的状态,T是某种具体的数据类型,比如Double、String,或我们自己定义的复杂数据结构。我们可以使用value()方法获取状态,使用update(value: T)更新状态。

需求:当接收到的相同 key 的元素个数等于 3 个,就计算这些元素的 value 的平均值。

(1)继承算子的RichFunction,创建状态并编写业务逻辑。

public class CountWindowAverageWithValueState
        extends RichFlatMapFunction, Tuple2> {
    // 用以保存每个 key 出现的次数,以及这个 key 对应的 value 的总值
    //1. ValueState 保存的是对应的一个 key 的一个状态值
    private ValueState> countAndSum;
    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        ValueStateDescriptor> descriptor =
                new ValueStateDescriptor>(
                        "average",  // 状态的名字
                        Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型,防止类型擦除
        countAndSum = getRuntimeContext().getState(descriptor);
    }
    @Override
    public void flatMap(Tuple2 element,
                        Collector> out) throws Exception {
        // 拿到当前的 key 的状态值
        Tuple2 currentState = countAndSum.value();
        // 如果状态值还没有初始化,则初始化
        if (currentState == null) {
            currentState = Tuple2.of(0L, 0L);
        }
        // 更新状态值中的元素的个数
        currentState.f0 += 1;
        // 更新状态值中的总值
        currentState.f1 += element.f1;
        // 更新状态
        countAndSum.update(currentState);
        // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
        if (currentState.f0 >= 3) {
            double avg = (double)currentState.f1 / currentState.f0;
            // 输出 key 及其对应的平均值
            out.collect(Tuple2.of(element.f0, avg));
            //  清空状态值
            countAndSum.clear();
        }
    }
}

(2)Main方法

public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L,
                                5L));
        // 输出:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();
        env.execute();
    }
}

输出:

3> (1,5.0)
4> (2,3.6666666666666665)
5.2 KeyedState之ListState

ListState[T]存储了一个由T类型数据组成的列表。我们可以使用add(value: T)或addAll(values: java.util.List[T])向状态中添加元素,使用get(): java.lang.Iterable[T]获取整个列表,使用update(values: java.util.List[T])来更新列表,新的列表将替换旧的列表。

需求:当接收到的相同 key 的元素个数等于 3 个,就计算这些元素的 value 的平均值。

(1)继承算子的RichFunction,创建状态并编写业务逻辑。

public class CountWindowAverageWithListState
        extends RichFlatMapFunction, Tuple2> {
    //1. ListState 保存的是对应的一个 key 的出现的所有的元素
    private ListState> elementsByKey;
    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        ListStateDescriptor> descriptor =
                new ListStateDescriptor>(
                        "average",  // 状态的名字
                        Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型
        elementsByKey = getRuntimeContext().getListState(descriptor);
    }
    @Override
    public void flatMap(Tuple2 element,
                        Collector> out) throws Exception {
        // 拿到当前的 key 的状态值
        Iterable> currentState = elementsByKey.get();
        // 如果状态值还没有初始化,则初始化
        if (currentState == null) {
            elementsByKey.addAll(Collections.emptyList());
        }
        // 更新状态
        elementsByKey.add(element);
        // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
        List> allElements =
                Lists.newArrayList(elementsByKey.get());
        if (allElements.size() >= 3) {
            long count = 0;
            long sum = 0;
            for (Tuple2 ele : allElements) {
                count++;
                sum += ele.f1;
            }
            double avg = (double) sum / count;
            out.collect(Tuple2.of(element.f0, avg));
            // 清除状态
            elementsByKey.clear();
        }
    }
}

(2)Main方法

将5.1的Main方法中flatMap的Function替换为CountWindowAverageWithListState


public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L,
                                5L));
        // 输出:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithListState())
                .print();
        env.execute();
    }
}
5.3 KeyedState之MapState

MapState[K, V]存储一个Key-Value map,其功能与Java的Map几乎相同。get(key: K)可以获取某个key下的value,put(key: K, value: V)可以对某个key设置value,contains(key: K)判断某个key是否存在,remove(key: K)删除某个key以及对应的value,entries(): java.lang.Iterable[java.util.Map.Entry[K, V]]返回MapState中所有的元素,iterator(): java.util.Iterator[java.util.Map.Entry[K, V]]返回一个迭代器。需要注意的是,MapState中的key和Keyed State的key不是同一个key。

需求:当接收到的相同 key 的元素个数等于 3 个,就计算这些元素的 value 的平均值。

(1)继承算子的RichFunction,创建状态并编写业务逻辑。

public class CountWindowAverageWithMapState
        extends RichFlatMapFunction, Tuple2> {
    // managed keyed state
    //1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值
    private MapState mapState;
    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        MapStateDescriptor descriptor =
                new MapStateDescriptor(
                        "average",  // 状态的名字
                        String.class, Long.class); // 状态存储的数据类型
        mapState = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public void flatMap(Tuple2 element,
                        Collector> out) throws Exception {
        mapState.put(UUID.randomUUID().toString(), element.f1);
        // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
        List allElements = Lists.newArrayList(mapState.values());

        if (allElements.size() >= 3) {
            long count = 0;
            long sum = 0;
            for (Long ele : allElements) {
                count++;
                sum += ele;
            }
            double avg = (double) sum / count;
            out.collect(Tuple2.of(element.f0, avg));
            // 清除状态
            mapState.clear();
        }
    }
}

(2)Main类

将5.1的Main方法中flatMap的Function替换为CountWindowAverageWithMapState

5.4 KeyedState之ReducingState

ReducingState[T]和AggregatingState[IN, OUT]与ListState[T]同属于MergingState[T]。与ListState[T]不同的是,ReducingState[T]只有一个元素,而不是一个列表。它的原理是新元素通过add(value: T)加入后,与已有的状态元素使用ReduceFunction合并为一个元素,并更新到状态里。AggregatingState[IN, OUT]与ReducingState[T]类似,也只有一个元素,只不过AggregatingState[IN, OUT]的输入和输出类型可以不一样。ReducingState[T]和AggregatingState[IN, OUT]与窗口上进行ReduceFunction和AggregateFunction很像,都是将新元素与已有元素做聚合。

需求:求接收到的相同 key 的value的sum。

(1)继承算子的RichFunction,创建状态并编写业务逻辑。

public class SumFunction
        extends RichFlatMapFunction, Tuple2> {
    // 用于保存每一个 key 对应的 value 的总值
    private ReducingState sumState;
    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        ReducingStateDescriptor descriptor =
                new ReducingStateDescriptor(
                        "sum",  // 状态的名字
                        new ReduceFunction() { // 聚合函数
                            @Override
                            public Long reduce(Long value1, Long value2) throws
                                    Exception {
                                return value1 + value2;
                            }
                        }, Long.class); // 状态存储的数据类型
        sumState = getRuntimeContext().getReducingState(descriptor);
    }
    @Override
    public void flatMap(Tuple2 element,
                        Collector> out) throws Exception {
        // 将数据放到状态中
        sumState.add(element.f1);

        out.collect(Tuple2.of(element.f0, sumState.get()));
    }
}

(2)Main类

将5.1的Main方法中flatMap的Function替换为SumFunction

输出:

4> (2,4)
3> (1,3)
3> (1,5)
4> (2,7)
3> (1,7)
5.5 KeyedState之AggregatingState

需求:求接收到的相同 key 的value显示出来。

(1)继承算子的RichFunction,创建状态并编写业务逻辑。

public class ContainsValueFunction
        extends RichFlatMapFunction, Tuple2> {
    private AggregatingState totalStr;
    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        AggregatingStateDescriptor descriptor =
                new AggregatingStateDescriptor(
                        "totalStr",  // 状态的名字
                        new AggregateFunction() {
                            @Override
                            public String createAccumulator() {
                                return "Contains:";
                            }
                            @Override
                            public String add(Long value, String accumulator) {
                                if ("Contains:".equals(accumulator)) {
                                    return accumulator + value;
                                }
                                return accumulator + " and " + value;
                            }
                            @Override
                            public String getResult(String accumulator) {
                                return accumulator;
                            }
                            @Override
                            public String merge(String a, String b) {
                                //return a + " and " + b;
                                return null;
                            }
                        }, String.class); // 状态存储的数据类型
        totalStr = getRuntimeContext().getAggregatingState(descriptor);
    }
    @Override
    public void flatMap(Tuple2 element,
                        Collector> out) throws Exception {
        totalStr.add(element.f1);
        out.collect(Tuple2.of(element.f0, totalStr.get()));
    }
}

(2)Main方法

将5.1的Main方法中flatMap的Function替换为SumFunction。

输出:

3> (1,Contains:3)
4> (2,Contains:4)
4> (2,Contains:4 and 2)
4> (2,Contains:4 and 2 and 5)
3> (1,Contains:3 and 5)
3> (1,Contains:3 and 5 and 7)
5.6 OperatorState之ListState

状态从本质上来说,是Flink算子子任务的一种本地数据,为了保证数据可恢复性,使用Checkpoint机制来将状态数据持久化输出到存储空间上。状态相关的主要逻辑有两项:

一、将算子子任务本地内存数据在Checkpoint时snapshot写入存储;二、初始化或重启应用时,以一定的逻辑从存储中读出并变为算子子任务的本地内存数据。

Keyed State对这两项内容做了更完善的封装,开发者可以开箱即用。对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态,可以访问和修改该状态。Flink的算子子任务上的数据在程序重启、横向伸缩等场景下不能保证百分百的一致性。换句话说,重启Flink应用后,某个数据流元素不一定会和上次一样,还能流入该算子子任务上。因此,我们需要根据自己的业务场景来设计snapshot和restore的逻辑。为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction接口类。

public interface CheckpointedFunction {
  
  // Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化
  void snapshotState(FunctionSnapshotContext context) throws Exception;
  // 初始化时会调用这个方法,向本地状态中填充数据
  void initializeState(FunctionInitializationContext context) throws Exception;
}

在Flink的Checkpoint机制下,当一次snapshot触发后,snapshotState会被调用,将本地状态持久化到存储空间上。这里我们可以先不用关心snapshot是如何被触发的,暂时理解成snapshot是自动触发的,后续文章会介绍Flink的Checkpoint机制。

initializeState在算子子任务初始化时被调用,初始化包括两种场景:

一、整个Flink作业第一次执行,状态数据被初始化为一个默认值;二、Flink作业重启,之前的作业已经将状态输出到存储,通过这个方法将存储上的状态读出并填充到这个本地状态中。

目前Operator State主要有三种,其中ListState和UnionListState在数据结构上都是一种ListState,还有一种BroadcastState。这里我们主要介绍ListState这种列表形式的状态。这种状态以一个列表的形式序列化并存储,以适应横向扩展时状态重分布的问题。每个算子子任务有零到多个状态S,组成一个列表ListState[S]。各个算子子任务将自己状态列表的snapshot到存储,整个状态逻辑上可以理解成是将这些列表连接到一起,组成了一个包含所有状态的大列表。当作业重启或横向扩展时,我们需要将这个包含所有状态的列表重新分布到各个算子子任务上。

ListState和UnionListState的区别在于:

ListState是将整个状态列表按照round-ribon的模式均匀分布到各个算子子任务上,每个算子子任务得到的是整个列表的子集;UnionListState按照广播的模式,将整个列表发送给每个算子子任务。

Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。这里我们来看一个Flink官方提供的Sink案例以了解CheckpointedFunction的工作原理。

需求: 每两条数据打印一次结果 1000

(1)实现SinkFunction和CheckpointedFunction

public class CustomSink
        implements SinkFunction>, CheckpointedFunction {

    // 用于缓存结果数据的
    private List> bufferElements;
    // 表示内存中数据的大小阈值
    private int threshold;
    // 用于保存内存中的状态信息
    private ListState> checkpointState;
    // StateBackend
    // checkpoint
    public CustomSink(int threshold) {
        this.threshold = threshold;
        this.bufferElements = new ArrayList<>();
    }
    
     Sink的核心处理逻辑,将上游数据value输出到外部系统
    @Override
    public void invoke(Tuple2 value, Context context) throws
            Exception {
        // 可以将接收到的每一条数据保存到任何的存储系统中
        bufferElements.add(value);
        if (bufferElements.size() == threshold) {
            // send it to the sink
            // 这里简单打印
            System.out.println("自定义格式:" + bufferElements);
            // 清空本地缓存
            bufferElements.clear();
        }
    }
    
  // 重写CheckpointedFunction中的snapshotState
  // 将本地缓存snapshot保存到存储上
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception
    {
        // 将之前的Checkpoint清理
        checkpointState.clear();
        // 将最新的数据写到状态中
        for (Tuple2 ele : bufferElements) {
            checkpointState.add(ele);
        }
    }
    
  // 重写CheckpointedFunction中的initializeState
  // 初始化状态:用于在程序恢复的时候从状态中恢复数据到内存
    @Override
    public void initializeState(FunctionInitializationContext context) throws
            Exception {
        // 注册ListStateDescriptor
        ListStateDescriptor> descriptor =
                new ListStateDescriptor>(
                        "bufferd -elements",
                        TypeInformation.of(new TypeHint>
                                () {}));
        // 从FunctionInitializationContext中获取OperatorStateStore,进而获取ListState
        checkpointState =
                context.getOperatorStateStore().getListState(descriptor);
        // 如果是作业重启,读取存储中的状态数据并填充到本地缓存中
        if (context.isRestored()) {
            for (Tuple2 ele : checkpointState.get()) { 
                bufferElements.add(ele);
            }
        }
    }
}

(2)Main方法

public class TestOperatorStateMain {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource> dataStreamSource =
                env.fromElements(Tuple2.of("Spark", 3), Tuple2.of("Hadoop", 5),
                        Tuple2.of("Hadoop", 7),
                        Tuple2.of("Spark", 4));

        dataStreamSource
                .addSink(new CustomSink(2)).setParallelism(1);
        env.execute("TestStatefulApi");
    }
}

输出:

自定义格式:[(Spark,3), (Hadoop,5)]
自定义格式:[(Hadoop,7), (Spark,4)]

上面的代码在输出到Sink之前,先将数据放在本地缓存中,并定期进行snapshot,这实现了批量输出的功能,批量输出能够减少网络等开销。同时,程序能够保证数据一定会输出外部系统,因为即使程序崩溃,状态中存储着还未输出的数据,下次启动后还会将这些未输出数据读取到内存,继续输出到外部系统。

注册和使用Operator State的代码和Keyed State相似,也是先注册一个StateDescriptor,并指定状态名字和数据类型,然后从FunctionInitializationContext中获取OperatorStateStore,进而获取ListState。如果是UnionListState,那么代码改为:context.getOperatorStateStore.getUnionListState。

状态的初始化逻辑中,我们用context.isRestored来判断是否为作业重启,这样可以从之前的Checkpoint中恢复并写到本地缓存中。

5.7 OperatorState之BroadCastState

广播状态是固定维护在堆内存中的,不会写入文件系统或者RocksDB。

下面我们通过BroadCastState控制程序的打印输出为例进行介绍。

(1)定义普通数据流,消费数据

DataStreamSource dataStreamSource = env.socketTextStream("localhost", 9999);

(2)定义广播流,用于广播规则,从而控制程序打印输出

DataStreamSource broadStreamSource = env.socketTextStream("localhost", 8888);

(3)解析广播流中的数据,解析为二元组

DataStream> broadStream =
    broadStreamSource.map(new MapFunction>() {
        @Override
        public Tuple2 map(String s) throws Exception {
            String[] strings = s.split(" ");
            return Tuple2.of(strings[0], (strings[1]));
        }
    });

(4)定义需要广播的状态类型,只支持

MapStateDescriptor descriptor = new
    MapStateDescriptor(
    "ControlStream",
    String.class,
    String.class
);

(5)用解析后的广播流将状态广播出去,从而生成BroadcastStream

BroadcastStream> broadcastStream = broadStream.broadcast(descriptor);

(6)通过connect连接两个流,用process分别处理两个流中的数据。连接流时分为两种情况:

noKeyedStream.connect(BroadcastStream).process(new BroadcastProcessFunction<>(…)): 非 KeyedStream 连接 BroadcastStream 的,只能使用 BroadcastProcessFunction 函数处理连接逻辑KeyedStream.connect(BroadcastStream).process(new KeyedBroadcastProcessFunction<>(…)):KeyedStream 连接 BroadcastStream 的,只能使用 KeyedBroadcastProcessFunction 函数处理连接逻辑

KeyedBroadcastProcessFunction 比 BroadcastProcessFunction 多了计时器服务和获取当前 key 接口,当然,这两个功能不一定能用到。

我们这里使用的是 BroadcastProcessFunction,这三个泛型翻译分别代表:

 IN1:数据流(即非广播流)的元素类型
 IN2:广播流的元素类型
 OUT:两个流连接完成后,输出流的元素类型。

BroadcastProcessFunction中定义了两个函数用于处理具体的连接逻辑和业务逻辑。因此主要需要实现以下两个函数:

public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector out) throws Exception;

这里处理广播流的数据,将广播流数据保存到 BroadcastState 中。value 是广播流中的一个元素;ctx 是上下文,提供 BroadcastState 和修改方法;out 是输出流收集器。

public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector out) throws Exception;

这个函数处理数据流的数据,这里之只能获取到 ReadOnlyBroadcastState,因为 Flink 不允许在这里修改 BroadcastState 的状态。value 是数据流中的一个元素;ctx 是上下文,可以提供上下文环境和只读的 BroadcastState;out 是输出流收集器。

注意:KeyedBroadcastProcessFunction中的ReadOnlyContext多了计时器服务和获取当前 key 接口

下面是完整的代码。

需求:通过BroadCastState控制程序的打印输出

public class TestBroadcastState {
    public static void main(String[] args) throws Exception {
        //获取执行环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. 定义普通数据流,消费数据
        DataStreamSource dataStreamSource =
                env.socketTextStream("localhost", 9999);
        // 2. 定义广播流,用于广播规则,从而控制程序打印输出
        DataStreamSource broadStreamSource =
                env.socketTextStream("localhost", 8888);
        // 3. 解析广播流中的数据成二元组
        DataStream> broadStream =
                broadStreamSource.map(new MapFunction>() {
                    @Override
                    public Tuple2 map(String s) throws Exception {
                        String[] strings = s.split(" ");
                        return Tuple2.of(strings[0], (strings[1]));
                    }
                });
        //4. 定义需要广播的状态类型,只支持MapState
        MapStateDescriptor descriptor = new
                MapStateDescriptor(
                "ControlStream",
                String.class,
                String.class
        );
        //5. 用解析后的广播流将状态广播出去,从而生成BroadcastStream
        BroadcastStream> broadcastStream =
                broadStream.broadcast(descriptor);
        //6. 通过connect连接两个流,用process分别处理两个流中的数据
        dataStreamSource
                .connect(broadcastStream)
                .process(new KeyWordsCheckProcessor())
                .print();
        env.execute();
    }

    private static class KeyWordsCheckProcessor
            extends BroadcastProcessFunction,
            String> {
        MapStateDescriptor descriptor =
                new MapStateDescriptor(
                        "ControlStream",
                        String.class,
                        String.class
                );
        @Override
        public void processBroadcastElement(Tuple2 value,
                                            Context ctx, Collector out)
                throws Exception {
            // 将接收到的控制数据放到 broadcast state 中
            ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
            //打印控制信息
            System.out.println(Thread.currentThread().getName() + " 接收到控制信息 :" + value);
        }
        @Override
        public void processElement(String value,
                                   ReadOnlyContext ctx, Collector out)
                throws Exception {
            // 从 broadcast state 中拿到控制信息
            String keywords = ctx.getBroadcastState(descriptor).get("key");
            //获取符合条件的单词
            if (value.contains(keywords)) {
                out.collect(value);
            }
        }
    }
}
六、状态后端State backend

Flink的状态是由算子的子任务来创建和管理的,每传入一条数据,子任务都会读取和更新状态,子任务的状态除了会暂存在节点内,还需要进行持久化存储(快照),也就是所谓的Checkpoint,当子任务出现故障或重启任务时,可以从持久化的Checkpoint中恢复。

也就是说,状态有两部分:一部分是本地的状态,检查点(checkpoint)中的状态。状态是存储在状态后端(State backend)的,它专门负责状态的存储、访问以及维护,主要做两件事:

Local State Management(本地状态管理)Remote State Checkpointing(远程状态备份)

Flink提供了三种类型的状态后端,分别是基于内存的状态后端MemoryStateBackend( 默认的state的类型就是这种)、基于文件系统的状态后端FsStateBackend以及基于RockDB作为存储介质的RocksDB StateBackend。这三种类型的StateBackend都能够有效地存储Flink流式计算过程中产生的状态数据,在默认情况下Flink使用的是MemoryStateBackend,区别见下表。下面分别对每种状态后端的特点进行说明。

6.1 MemoryStateBackend

MemoryStateBackend,运行时所需的 State 数据全部保存在 TaskManager JVM堆上内存中,执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中。

MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。

默认情况下,每一个状态最大为 5 MB。可以通过 MemoryStateBackend 的构造函数增加最大大小。状态的总大小不能超过TaskManager的内存。

特点:快速、低延迟缺点:状态在内存中可能会丢失,只能保存数据量小的状态用于:开发测试 6.2 FSStateBackend

FSStateBackend,运行时所需的 State 数据全部保存在 TaskManager 的内存中(状态的总大小不能超过TaskManager的内存,默认5M), 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中。

可以是分布式或者本地文件系统,路径如:

HDFS 路径:hdfs://namenode:40010/flink/checkpoints本地路径:file://data/flink/checkpoints

默认情况下,FsStateBackend 会配置提供异步快照,以避免在写状态 checkpoint 时阻塞数据流的处理。该特性可以通过在实例化 FsStateBackend 时将布尔标志设置为 false 来禁用,例如:

new FsStateBackend(path, false);

缺点:状态大小受TaskManager内存限制(默认支持5M)优点:状态访问速度很快;状态信息不会丢失用于:因为状态信息不会丢失,所以生成环境下可用 6.3 RocksDBStateBackend

RocksDBStateBackend,运行时所需的 State 数据保存在RocksDB 数据库(key-value 的数据存储服务),不会受限于 TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中。RocksDB 克服了 State 受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

RocksDBStateBackend 的配置同样需要文件系统的 URL(类型,地址,路径)等来配置,如hdfs://namenode:40010/flink/checkpoints。

缺点:状态访问速度有所下降优点:可以存储超大量的状态信息;状态信息不会丢失用于:生产,可以存储超大量的状态信息(受限于磁盘可用空间的大小)

RocksDBStateBackend 是目前唯一支持有状态流处理应用程序增量检查点的状态后端。

6.4 StateBackend配置方式

(1)单任务调整

修改当前任务代码
env.setStateBackend(new
FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】

(2)全局调整(不建议)

修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend),
filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

(3)其它高级配置

// 高级选项
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000L); //超时时间
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); //现在同时进行的checkpoint barrier,设置为1就表示只有前一个checkpoint保存完了,才能进行下一个checkpoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);// 最小暂停时间:前一个checkpoint barrier保存完后,至少要留出100ms才能进行下一个checkpoint barrier的保存,这样做是为了留出一定的时间处理数据,否则可能使得某个任务一直在保存checkpoint ,就不能处理数据了
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); //设置为true,更倾向于用检查点做恢复,而不是用最近的point(这个最近的point可能是checkpoint,也可能是savepoint),设置为false就表示使用最近的point来恢复
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0); //容忍checkpoint失败的次数,设置为0就表示不能容忍checkpoint失败,也就是说只要checkpointB保存失败,任务就挂了,重新恢复后才能执行

// 3. 重启策略配置

// 固定延迟重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
// 失败率重启
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));

6.5 StateBackend持久化策略

Flink 的状态最终都要持久化到第三方存储中,确保集群故障或者作业挂掉后能够恢复。RocksDBStateBackend 持久化策略有两种:

全量持久化策略:RocksFullSnapshotStrategy增量持久化策略:RocksIncementalSnapshotStrategy

全量持久化策略每次将全量的 State 写入到状态存储中(HDFS),上述三种状态后端都支持这种策略。快照保存策略类体系在执行持久化策略的时候,可以使用异步机制,每个算子启动 1 个独立的线程,将自身的状态写入分布式存储可靠存储中。

增量持久化策略就是每次持久化增量的 State,只有 RocksDBStateBackend 支持增量持久化。Flink 增量式的检查点以 RocksDB 为基础的,具体就不展开介绍了。

参考资料(文章第一至四章主要整理自参考资料的1~5)

    https://mp.weixin.qq.com/s/JLl-LMjcnVrIyHCCq7Yv7Ahttps://mp.weixin.qq.com/s/twA5HiVJbTGwVpn-uiVx2ghttps://zhuanlan.zhihu.com/p/104171679https://blog.csdn.net/nazeniwaresakini/article/details/104220138https://mp.weixin.qq.com/s/ggHmSc86mN3I7r6snjqxWQhttps://mp.weixin.qq.com/s/ZVLIuekZQt7hQ8XND6NUSQhttps://blog.csdn.net/u013411339/article/details/112934975

本文仅供学术交流使用,加上自己的思考、实践和摘录,整理出本文,若有部分章节侵权,请联系博主删除。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5720009.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存