Flink checkPoint容错机制配置

Flink checkPoint容错机制配置,第1张

Flink checkPoint容错机制配置 Flink checkPoint容错机制配置

flink版本:flink1.13.1

code
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class StateBackendTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //  1.设置状态后台
        env.setStateBackend(new MemoryStateBackend()); // 本地内存状态后台
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        // 2. 设置检查点 每秒执行1次checkPoint
        env.enableCheckpointing(1000);

        // checkPoint 参数优化
        // checkPoint 模式,精确一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkPoint超时时间,60s
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只允许进行一个检查点
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 设置两次检查点的最小的时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        //设置是否优先使用检查点恢复机制。默认为false,即checkpoint和savepoint之间采用就近原则,设为true,则优先使用checkpoint
        env.getCheckpointConfig().setPreferCheckpointForRecovery(false);
        // 设置可容忍的检查点失败数量,默认是0,即不允许任何checkpoint检查点失败,如果checkpoint失败则任务失败,直接重启
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);

        // 3. 设置重启策略
        
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,60000L));

        
        env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));


        env.execute("StateBackend");
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5590506.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存