flink版本:flink1.13.1
codeimport org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.concurrent.TimeUnit; public class StateBackendTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1.设置状态后台 env.setStateBackend(new MemoryStateBackend()); // 本地内存状态后台 //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints")); //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true)); // 2. 设置检查点 每秒执行1次checkPoint env.enableCheckpointing(1000); // checkPoint 参数优化 // checkPoint 模式,精确一次 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkPoint超时时间,60s env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 设置两次检查点的最小的时间间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //设置是否优先使用检查点恢复机制。默认为false,即checkpoint和savepoint之间采用就近原则,设为true,则优先使用checkpoint env.getCheckpointConfig().setPreferCheckpointForRecovery(false); // 设置可容忍的检查点失败数量,默认是0,即不允许任何checkpoint检查点失败,如果checkpoint失败则任务失败,直接重启 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0); // 3. 设置重启策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,60000L)); env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); env.execute("StateBackend"); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)