State Types
Keyed State
As the name suggests, keyed state is state that operates on a KeyedStream, so the stream must first be partitioned with keyBy(). As the figure below shows, keyed state comes in the following five types:
1. ValueState
2. ListState
3. ReducingState
4. AggregatingState
5. MapState
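ValueState<T>
Holds a single value that can be updated and retrieved. As described above, each value is scoped to the key of the current input element, so the operator can hold one value for every key it receives. Update the value with update(T) and retrieve it with T value().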
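The following minimal plain-Java model makes the per-key scoping concrete and needs only the JDK. KeyedValueStateModel, setCurrentKey and ValueStateDemo are hypothetical names. setCurrentKey mimics the runtime switching to the key of each incoming element. This is not Flink's implementation, only an illustration of the value()/update(T) semantics.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of keyed ValueState semantics (NOT Flink's implementation).
// setCurrentKey() stands in for the runtime switching to the key of each incoming element.
class KeyedValueStateModel<K, T> {
    private final Map<K, T> valuesByKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Like value(): returns null when nothing has been stored for the current key.
    T value() {
        return valuesByKey.get(currentKey);
    }

    // Like update(T): replaces the value of the current key only.
    void update(T newValue) {
        valuesByKey.put(currentKey, newValue);
    }
}

class ValueStateDemo {
    // Keeps a running count per key and returns the final count seen for each key.
    static Map<String, Long> countPerKey(String... keys) {
        KeyedValueStateModel<String, Long> count = new KeyedValueStateModel<>();
        Map<String, Long> result = new HashMap<>();
        for (String key : keys) {
            count.setCurrentKey(key);
            Long current = count.value();
            long next = (current == null ? 0L : current) + 1;
            count.update(next);
            result.put(key, next);
        }
        return result;
    }
}
```

For example, countPerKey("a", "b", "a") yields 2 for "a" and 1 for "b". Each key reads and updates only its own value.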
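ListState<T>
Holds a list of elements. You can append elements and retrieve an Iterable over all elements currently stored. Add elements with add(T) or addAll(List<T>), and retrieve the whole list with Iterable<T> get(). You can also overwrite the existing list with update(List<T>).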
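A similar hypothetical model for ListState uses only the JDK. KeyedListStateModel and snapshot() are illustrative names, not Flink APIs. The model shows the difference between appending with add/addAll and overwriting with update.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of keyed ListState semantics (NOT Flink's implementation).
class KeyedListStateModel<K, T> {
    private final Map<K, List<T>> listsByKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Like add(T): appends one element to the current key's list.
    void add(T value) {
        listsByKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    // Like addAll(List<T>): appends every element of the given list.
    void addAll(List<T> values) {
        listsByKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).addAll(values);
    }

    // Like get(): read-only view of the current key's list (empty in this model if nothing was added).
    Iterable<T> get() {
        List<T> list = listsByKey.get(currentKey);
        return list == null ? Collections.<T>emptyList() : Collections.unmodifiableList(list);
    }

    // Like update(List<T>): overwrites the current key's list.
    void update(List<T> values) {
        listsByKey.put(currentKey, new ArrayList<>(values));
    }

    // Helper for printing/testing: copies the current key's elements into a new list.
    List<T> snapshot() {
        List<T> out = new ArrayList<>();
        for (T element : get()) {
            out.add(element);
        }
        return out;
    }
}
```

Calling add("x") and then addAll(["y", "z"]) leaves [x, y, z]. A subsequent update(["only"]) replaces the whole list with [only].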
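ReducingState<T>
Holds a single value that represents the aggregate of all values added to the state. The interface is similar to ListState, but each element added with add(T) is combined into that value using the supplied ReduceFunction.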
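ReducingState differs from ListState in that it never keeps the individual elements. In the sketch below, java.util.function.BinaryOperator plays the role of Flink's ReduceFunction, and Map.merge applies it on every add. The class is a hypothetical model, not Flink code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical in-memory model of keyed ReducingState semantics (NOT Flink's implementation).
// BinaryOperator<T> stands in for Flink's ReduceFunction<T>.
class KeyedReducingStateModel<K, T> {
    private final Map<K, T> reducedByKey = new HashMap<>();
    private final BinaryOperator<T> reducer;
    private K currentKey;

    KeyedReducingStateModel(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Like add(T): the first element is stored as-is; later ones are folded in with the
    // reducer right away, so only one value per key is ever kept.
    void add(T value) {
        reducedByKey.merge(currentKey, value, reducer);
    }

    // Like get(): the reduced value, or null in this model if nothing was added for the key.
    T get() {
        return reducedByKey.get(currentKey);
    }
}
```

With Long::sum as the reducer, adding 3, 5 and 7 under one key stores only the single value 15.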
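AggregatingState<IN, OUT>
Holds a single value that represents the aggregate of all values added to the state. Unlike ReducingState, the aggregate type may differ from the type of the elements added. The interface is similar to ListState, but elements added with add(IN) are aggregated using the specified AggregateFunction.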
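The sketch below illustrates that the aggregate type can differ from the input type. It averages Long inputs into a Double result through a long[] {sum, count} accumulator. AggregateFn is a hypothetical, simplified stand-in for Flink's AggregateFunction<IN, ACC, OUT>, with its merge() method left out. The state class is a plain-Java model rather than Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Flink's AggregateFunction<IN, ACC, OUT> (merge() omitted).
interface AggregateFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

// Hypothetical in-memory model of keyed AggregatingState<IN, OUT> (NOT Flink's implementation).
class KeyedAggregatingStateModel<K, IN, ACC, OUT> {
    private final Map<K, ACC> accByKey = new HashMap<>();
    private final AggregateFn<IN, ACC, OUT> fn;
    private K currentKey;

    KeyedAggregatingStateModel(AggregateFn<IN, ACC, OUT> fn) {
        this.fn = fn;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Like add(IN): folds the input into the current key's accumulator.
    void add(IN value) {
        ACC acc = accByKey.get(currentKey);
        if (acc == null) {
            acc = fn.createAccumulator();
        }
        accByKey.put(currentKey, fn.add(value, acc));
    }

    // Like get(): converts the accumulator to the output type (null here if nothing was added).
    OUT get() {
        ACC acc = accByKey.get(currentKey);
        return acc == null ? null : fn.getResult(acc);
    }
}

// Average of Long inputs: IN = Long, ACC = long[] {sum, count}, OUT = Double.
class AverageFn implements AggregateFn<Long, long[], Double> {
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    public long[] add(Long value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }
}
```

Adding 3, 5 and 7 under one key gives 5.0. A ReducingState<Long> could not hold this result directly, because its stored value must have the same type as the elements added.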
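MapState<UK, UV>
Maintains a collection of key-value mappings. You can put key-value pairs into the state and retrieve an Iterable over all current mappings. Add mappings with put(UK, UV) or putAll(Map<UK, UV>), and retrieve the value for a specific user key with get(UK). Iterable views of the mappings, keys and values are available through entries(), keys() and values(), respectively. You can also call isEmpty() to check whether the state contains any mappings.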
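MapState adds a second level of keys: each stream key owns its own map from user keys to user values. The hypothetical plain-Java model below is not Flink's implementation. It keeps click counts per item for whichever user is the current key. MapStateDemo.recordClick is an illustrative helper.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of keyed MapState<UK, UV> (NOT Flink's implementation):
// every stream key K owns a separate user map UK -> UV.
class KeyedMapStateModel<K, UK, UV> {
    private final Map<K, Map<UK, UV>> mapsByKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    private Map<UK, UV> current() {
        return mapsByKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    void put(UK key, UV value) { current().put(key, value); }

    void putAll(Map<UK, UV> mappings) { current().putAll(mappings); }

    UV get(UK key) { return current().get(key); }

    Iterable<Map.Entry<UK, UV>> entries() { return Collections.unmodifiableMap(current()).entrySet(); }

    Iterable<UK> keys() { return Collections.unmodifiableSet(current().keySet()); }

    Iterable<UV> values() { return Collections.unmodifiableCollection(current().values()); }

    boolean isEmpty() { return current().isEmpty(); }
}

class MapStateDemo {
    // Increments the click count of one item for the user that is the current key.
    static void recordClick(KeyedMapStateModel<String, String, Integer> clicks, String item) {
        Integer seen = clicks.get(item);
        clicks.put(item, seen == null ? 1 : seen + 1);
    }
}
```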
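Example using ValueState: CountWindowAverage keeps a (count, running sum) pair per key and emits the average each time two elements have been seen for that key.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // f0 holds the count, f1 the running sum
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();
        // update the count
        currentSum.f0 += 1;
        // add the second field of the input value
        currentSum.f1 += input.f1;
        // update the state
        sum.update(currentSum);
        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)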
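The expected output in the last comment can be checked by hand. The sketch below replays the same flatMap logic for the single key 1 in plain Java, without Flink. The class name CountWindowAverageTrace and the long[] stand-in for the Tuple2 state are hypothetical. The average (3+5)/2 = 4 is emitted after the second element. The average (7+4)/2 = 5 is emitted after the fourth, because Long division truncates 5.5 to 5. The fifth element, 2, stays in state, since the count never reaches 2 again.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replay of CountWindowAverage's logic for a single key (no Flink involved).
// state[0] plays the role of currentSum.f0 (count), state[1] of currentSum.f1 (running sum).
class CountWindowAverageTrace {
    static List<String> run(long key, long... values) {
        List<String> emitted = new ArrayList<>();
        long[] state = {0L, 0L}; // the descriptor's default value Tuple2.of(0L, 0L)
        for (long v : values) {
            state[0] += 1;
            state[1] += v;
            if (state[0] >= 2) {
                // integer division, matching currentSum.f1 / currentSum.f0 on Long values
                emitted.add("(" + key + "," + (state[1] / state[0]) + ")");
                // after sum.clear(), value() falls back to the default (0, 0)
                state[0] = 0L;
                state[1] = 0L;
            }
        }
        return emitted;
    }
}
```

Calling CountWindowAverageTrace.run(1L, 3L, 5L, 7L, 4L, 2L) returns the two records (1,4) and (1,5), which matches the comment.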
Broadcast State
Broadcast State is a special type of Operator State. It was introduced to support use cases where records of one stream need to be broadcast to all downstream tasks, where they are used to maintain the same state across all subtasks. This state can then be accessed while processing records of a second stream. A natural fit for broadcast state is a low-throughput stream that carries a set of rules, which we want to evaluate against all elements coming from another stream. With such use cases in mind, broadcast state differs from the rest of the operator states in that:
1. it has a map format,
2. it is only available to specific operators that take as inputs a broadcast stream and a non-broadcast one, and
3. such an operator can have multiple broadcast states with different names.
// DataStream#broadcast(...) in the Flink source: the stream is repartitioned with a
// BroadcastPartitioner and wrapped, together with its state descriptors, in a BroadcastStream
public BroadcastStream<T> broadcast(
        final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
    Preconditions.checkNotNull(broadcastStateDescriptors);
    final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
    return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}

// MapStateDescriptor constructors: broadcast state is always described as a map
public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
    super(name, new MapTypeInfo<>(keyClass, valueClass), null);
}

public MapStateDescriptor(
        String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
    super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
}

// Usage: mySqlSource is defined elsewhere; String elements are assumed here
final DataStreamSource<String> dimSource =
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
// broadcast the MySQL dimension data
final BroadcastStream<String> dimBc = dimSource.broadcast(
        new MapStateDescriptor<String, String>(
                "dim_broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
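The rule-evaluation use case from the Broadcast State description can be sketched without Flink. In this hypothetical model, every parallel subtask keeps its own copy of a rule map from rule name to threshold. Broadcasting a rule writes it into every copy. Each event is routed to a single subtask and checked only against that subtask's copy. BroadcastRulesModel and its methods are illustrative names. The hash-modulo routing is a simplification and is not how Flink assigns keys to subtasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

// Hypothetical plain-Java model of the broadcast-state pattern (NOT Flink's API).
// Each "subtask" holds its own copy of the rule map, mirroring broadcast state's map format.
class BroadcastRulesModel {
    private final List<Map<String, Long>> ruleMapsPerSubtask = new ArrayList<>();

    BroadcastRulesModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            ruleMapsPerSubtask.add(new HashMap<>());
        }
    }

    // A rule record (name -> threshold) reaches EVERY subtask,
    // like a stream partitioned with BroadcastPartitioner.
    void broadcastRule(String ruleName, long threshold) {
        for (Map<String, Long> rules : ruleMapsPerSubtask) {
            rules.put(ruleName, threshold);
        }
    }

    // An event from the non-broadcast stream goes to ONE subtask (simplified hash routing)
    // and is checked against that subtask's copy of the rules.
    List<String> processEvent(String key, long value) {
        int subtask = Math.floorMod(key.hashCode(), ruleMapsPerSubtask.size());
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, Long> rule : ruleMapsPerSubtask.get(subtask).entrySet()) {
            if (value > rule.getValue()) {
                fired.add(rule.getKey());
            }
        }
        return fired;
    }

    Map<String, Long> rulesOf(int subtask) {
        return ruleMapsPerSubtask.get(subtask);
    }
}
```

Because every subtask receives every rule, an event sees the same rule set whichever subtask it lands on. This is the property that broadcast state guarantees in the real pattern.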
Feel free to share; please credit the source when reposting: 内存溢出