Flink State

Flink State,第1张

Flink State

目录

State类型

Keyed State

ValueStateListStateReducingStateAggregatingState

State类型 Keyed State
字面意思,这是一个作用在keyedStream上的state。
所以,首先要对Stream进行 keyBy()
通过下图, 我们可以知道,keydState 包含以下五种。
1. ValueState
2. ListState
3. ReducingState
4. AggregatingState
5. MapState

ValueState
保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
ListState
保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。
ReducingState
保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
AggregatingState
保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
MapState
维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
Demo
public class CountWindowAverage extends RichFlatMapFunction, Tuple2> {

    
    private transient ValueState> sum;

    @Override
    public void flatMap(Tuple2 input, Collector> out) throws Exception {

        // access the state value
        Tuple2 currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

Operator State



Broadcast State

Broadcast State is a special type of Operator State. It was introduced to support use cases where records of one stream need to be broadcasted to all downstream tasks, where they are used to maintain the same state among all subtasks. This state can then be accessed while processing records of a second stream. As an example where broadcast state can emerge as a natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest of operator states in that:

    public BroadcastStream broadcast(
            final MapStateDescriptor... broadcastStateDescriptors) {
        Preconditions.checkNotNull(broadcastStateDescriptors);
        final DataStream broadcastStream = setConnectionType(new BroadcastPartitioner<>());
        return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
    }


    public MapStateDescriptor(String name, Class keyClass, Class valueClass) {
        super(name, new MapTypeInfo<>(keyClass, valueClass), null);
    }

    public MapStateDescriptor(
            String name, TypeInformation keyTypeInfo, TypeInformation valueTypeInfo) {
        super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
    }


        final DataStreamSource dimSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        //广播 mysql dim
        final BroadcastStream dimBc = dimSource.broadcast(new MapStateDescriptor("dim_broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705930.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存