Flink Checkpoint Configuration

2019-12-19  OzanShareing

Example:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
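
How the checkpoint interval and the minimum pause combine is easy to misread. The plain-Java sketch below (no Flink dependency) models the earliest time at which the next checkpoint could start, assuming at most one checkpoint in progress. The class and method names are hypothetical, and this is a simplified model of the rule described in the comments above, not Flink's actual scheduling code.

```java
public class CheckpointTiming {

    /**
     * Simplified model: earliest time (in ms) at which the next checkpoint may start.
     * The next checkpoint waits for both the periodic interval (counted from the
     * previous start) and the minimum pause (counted from the previous completion).
     */
    public static long nextTriggerMillis(long lastStart, long lastEnd,
                                         long interval, long minPause) {
        long byInterval = lastStart + interval;
        long byMinPause = lastEnd + minPause;
        return Math.max(byInterval, byMinPause);
    }

    public static void main(String[] args) {
        // Fast checkpoint (200 ms): the 1000 ms interval is the limiting factor.
        System.out.println(nextTriggerMillis(0, 200, 1000, 500)); // prints 1000
        // Slow checkpoint (800 ms): the 500 ms minimum pause pushes the next start back.
        System.out.println(nextTriggerMillis(0, 800, 1000, 500)); // prints 1300
    }
}
```

With the values from the example (interval 1000 ms, minimum pause 500 ms), the minimum pause only matters once a checkpoint takes longer than 500 ms to complete.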

Related flink-conf.yaml settings:

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false

Summary:

Java configuration example:

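/**
 * Flag indicating whether the restart strategy is enabled
 */
private static boolean replayFlag = true;

/**
 * Number of restart attempts
 */
private static Integer replayTimes;

/**
 * Delay between restart attempts, in seconds
 */
private static Integer replaySeconds;

/**
 * Checkpoint interval, in milliseconds
 */
private static Long checkPointTime;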

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

if (replayFlag) {
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            replayTimes,
            Time.of(replaySeconds, TimeUnit.SECONDS)
    ));
    CheckpointConfig config = env.getCheckpointConfig();

    //env.setStateBackend(new FsStateBackend(checkPointDir));
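    // Retain checkpoint data when the job is cancelled or fails, so that the job
    // can be restored from a chosen checkpoint as needed
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Checkpoint interval: start a checkpoint every checkPointTime ms (for example, 1000)
    config.setCheckpointInterval(checkPointTime);

    // Set the mode to exactly-once
    config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    // Minimum pause between checkpoints: at least 500 ms
    config.setMinPauseBetweenCheckpoints(500);

    // Checkpoint timeout: a checkpoint must complete within one minute or it is discarded.
    // The original passed checkPointTime here, which would make the timeout equal to the interval.
    config.setCheckpointTimeout(60000);

    // Allow only one checkpoint to be in progress at a time
    config.setMaxConcurrentCheckpoints(1);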
}
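
The fields replayTimes, replaySeconds and checkPointTime in the example are declared but never assigned, so unboxing them would throw a NullPointerException unless they are set elsewhere. One possible way to populate them, sketched here in plain Java, is to read them from a java.util.Properties object with fallback defaults. The class name, property keys and default values are all hypothetical and do not come from the original example.

```java
import java.util.Properties;

public class RestartSettings {
    public final boolean replayFlag;
    public final int replayTimes;
    public final int replaySeconds;
    public final long checkPointTime;

    /** Reads the settings from properties; the keys and defaults are illustrative only. */
    public RestartSettings(Properties p) {
        replayFlag = Boolean.parseBoolean(p.getProperty("job.replay.enabled", "true"));
        replayTimes = Integer.parseInt(p.getProperty("job.replay.times", "3"));
        replaySeconds = Integer.parseInt(p.getProperty("job.replay.seconds", "10"));
        checkPointTime = Long.parseLong(p.getProperty("job.checkpoint.interval.ms", "1000"));

        // Reject values that would make no sense for a restart strategy or checkpoint interval.
        if (replayTimes < 0 || replaySeconds < 0 || checkPointTime <= 0) {
            throw new IllegalArgumentException("Invalid restart or checkpoint settings");
        }
    }
}
```

The resulting values could then be passed to fixedDelayRestart and setCheckpointInterval in place of the unassigned static fields.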