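The previous post, Flink Study Notes (11): Using Flink KeyedState, showed how to use state to compute a running sum. A streaming job usually runs for a long time, though, so the amount of state it holds keeps growing. How do we deal with that?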
1. Flink State Time-To-Live (TTL)
Flink provides the StateTtlConfig mechanism for this. Let's first look at the policies it offers:
- TTL refresh policy (default: OnCreateAndWrite)
- State visibility (default: NeverReturnExpired)
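To make the two policies concrete, here is a minimal plain-Java sketch of a single TTL-guarded value. It is a toy model, not Flink's implementation: the class `TtlValueSketch`, its enums, and the injected clock are all hypothetical names invented for this illustration, and it simply drops an expired value the first time a read notices it.

```java
import java.util.function.LongSupplier;

/** Toy model of one TTL-guarded value. Hypothetical; not Flink's implementation. */
final class TtlValueSketch<T> {
    enum UpdateType { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }
    enum Visibility { NEVER_RETURN_EXPIRED, RETURN_EXPIRED_IF_NOT_CLEANED_UP }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final Visibility visibility;
    private final LongSupplier clock; // injected so the demo is deterministic
    private T value;
    private long lastAccessMillis;

    TtlValueSketch(long ttlMillis, UpdateType updateType,
                   Visibility visibility, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
        this.clock = clock;
    }

    /** A write refreshes the timestamp under both update types. */
    void update(T newValue) {
        value = newValue;
        lastAccessMillis = clock.getAsLong();
    }

    /** A read checks expiry first, then optionally refreshes the timestamp. */
    T value() {
        if (value == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (now - lastAccessMillis >= ttlMillis) {
            T stale = value;
            value = null; // the toy drops an expired value once a read notices it
            return visibility == Visibility.RETURN_EXPIRED_IF_NOT_CLEANED_UP ? stale : null;
        }
        if (updateType == UpdateType.ON_READ_AND_WRITE) {
            lastAccessMillis = now; // only this update type lets reads extend the lifetime
        }
        return value;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        TtlValueSketch<Integer> state = new TtlValueSketch<>(
                2000L, UpdateType.ON_CREATE_AND_WRITE,
                Visibility.NEVER_RETURN_EXPIRED, () -> now[0]);
        state.update(1);
        now[0] = 1500L;
        System.out.println(state.value()); // 1: still within the 2-second TTL
        now[0] = 2500L;
        System.out.println(state.value()); // null: the read at 1500 ms did not refresh the timestamp
    }
}
```

With ON_READ_AND_WRITE the same sequence would still return 1 at 2500 ms, because the read at 1500 ms resets the timestamp.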
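For details, see the official Flink documentation, which covers this in more depth, including the supported state types, the cleanup strategies, and related examples.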
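2. Example
We reuse an example from the post mentioned above.
A StateTtlConfig is configured on the keyed stream as follows. A state entry that has not been written for two seconds expires, and the sum for that key starts over.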
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(2))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
        new ValueStateDescriptor<>("key-fruit", Types.TUPLE(Types.STRING, Types.INT));
stateDescriptor.enableTimeToLive(ttlConfig);
Expired state can also be cleaned up with cleanupIncrementally, for example:
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(2))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupIncrementally(10, true)
        .build();
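The idea behind incremental cleanup is that each trigger checks only a bounded number of entries for expiry, instead of scanning all state at once. The plain-Java sketch below models that idea for a keyed sum. It is a toy under stated assumptions, not Flink's implementation: `IncrementalCleanupSketch`, its round-robin scan queue, and the choice to trigger a cleanup step on every `add` call are hypothetical and only loosely mirror the two arguments `(10, true)` above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

/** Toy keyed sum with TTL and bounded cleanup per access. Hypothetical; not Flink code. */
final class IncrementalCleanupSketch {
    private final long ttlMillis;
    private final int cleanupSize; // max entries checked per cleanup step
    // key -> {sum, lastWriteMillis}
    private final Map<String, long[]> entries = new HashMap<>();
    // Keys waiting to be checked; always holds exactly the keys of 'entries'.
    private final ArrayDeque<String> scanQueue = new ArrayDeque<>();

    IncrementalCleanupSketch(long ttlMillis, int cleanupSize) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
    }

    /** Adds delta to the key's sum at time 'now', then runs one bounded cleanup step. */
    long add(String key, long delta, long now) {
        long[] entry = entries.get(key);
        if (entry == null) {
            entry = new long[] {0L, now};
            entries.put(key, entry);
            scanQueue.addLast(key);
        } else if (now - entry[1] >= ttlMillis) {
            entry[0] = 0L; // expired but not yet cleaned up: start the sum over
        }
        entry[0] += delta;
        entry[1] = now; // a write refreshes the timestamp
        cleanupStep(now);
        return entry[0];
    }

    /** Checks at most cleanupSize entries and removes the expired ones. */
    private void cleanupStep(long now) {
        int toCheck = Math.min(cleanupSize, scanQueue.size());
        for (int i = 0; i < toCheck; i++) {
            String key = scanQueue.pollFirst();
            long[] entry = entries.get(key);
            if (now - entry[1] >= ttlMillis) {
                entries.remove(key);    // expired: drop it for good
            } else {
                scanQueue.addLast(key); // still live: check again later
            }
        }
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        IncrementalCleanupSketch sums = new IncrementalCleanupSketch(2000L, 10);
        sums.add("apple", 1, 0L);
        sums.add("pear", 1, 500L);
        sums.add("melon", 1, 3500L);     // this access also evicts apple and pear
        System.out.println(sums.size()); // 1
    }
}
```

A smaller `cleanupSize` bounds the work done per access at the cost of expired entries lingering longer before they are removed.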
Here is the complete code:
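import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class TestStateTtlConfig {

    private static final String[] FRUIT = {"苹果", "梨", "西瓜", "葡萄", "火龙果", "橘子", "桃子", "香蕉"};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Emit one random (fruit, 1) pair per second.
        DataStream<Tuple2<String, Integer>> fruit = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(Tuple2.of(FRUIT[random.nextInt(FRUIT.length)], 1));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        fruit.keyBy(0).map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            private ValueState<Tuple2<String, Integer>> valueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // State expires two seconds after its last write.
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(Time.seconds(2))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .cleanupIncrementally(10, true)
                        .build();

                ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
                        new ValueStateDescriptor<>("key-fruit", Types.TUPLE(Types.STRING, Types.INT));
                stateDescriptor.enableTimeToLive(ttlConfig);
                valueState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> tuple2) throws Exception {
                Tuple2<String, Integer> currentState = valueState.value();
                // Initialize the ValueState value (first record for the key, or state expired).
                if (null == currentState) {
                    currentState = new Tuple2<>(tuple2.f0, 0);
                }
                Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + tuple2.f1);
                // Update the ValueState value.
                valueState.update(newState);
                return Tuple2.of(newState.f0, newState.f1);
            }
        }).print();

        env.execute("fruit");
    }
}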
Execution result: [output screenshot not preserved]