flink配置

2020-11-05  本文已影响0人  神呐_宽恕我把

脚本

-c,--class <classname> 程序的入口(main method or getplan()).只有在jar程序的manifest中没有指定class

-m,--jobmanager <host:port> 在哪运行yarn-cluster

-C,--classpath <url> 代码路径

-p,--parallelism <parallelism> 并行度

-ynm,--yarnname <arg> 设置application的名字

-yjm,--yarnjobManagerMemory <arg> JobManager Container的内存

-ytm,--yarntaskManagerMemory <arg> TaskManager Container的内存

-s,--fromSavepoint <savepointPath> savepoint保存的地方,路径需写到chk-某个数

-yn,--yarncontainer <arg> Number of YARN container to allocate(=Number of Task Managers)

-ys,--yarnslots <arg> Number of slots per TaskManager

-yq -yD env.java.opts.taskmanager="-Dsun.stdout.encoding=utf-8"

代码中配置

//状态管理器MemoryStateBackend,FsStateBackend,RocksDBStateBackend后俩需要指定路径

env.setStateBackend(stateBackend);

//设置保存间隔,每 1000ms 开始一次 checkpoint

env.enableCheckpointing(1000);

//exactly-ance 和 at-least-once 语义选择,设置模式为精确一次 (这是默认值)

env.enableCheckpointing(10,EXACTLY_ONCE);

//checkpoint最小时间间隔

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

//Checkpoint 超时时间,Checkpoint 必须在一分钟内完成,否则就会被抛弃

env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);

//最大并行执行的检查点数量,默认是一个

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

//开启在 job 中止后仍然保留的 externalized checkpoints

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

//当checkpoint出现错误时是否关闭应用,默认是true,我们可以手动设置为false

env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

//默认的重启策略是:固定延迟无限重启

//此处设置重启策略为:出现异常重启1次,隔5秒一次

bsEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(5)));

//设置任务处理的时间,事件时间,注入时间,处理时间

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

上一篇下一篇

猜你喜欢

热点阅读