Spark & Flinkflinkflink

Flink状态(state)管理在代码里配置checkpoint

2019-03-19  本文已影响1人  it_zzy

checkPoint简介

checkPoint配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000); 
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); 
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000); 
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint

State Backend(状态的后端存储)

State Backend使用方式

修改State Backend的两种方式

State backend演示

第一种:单任务调整

启动连接socket zzy:9001的程序

./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
[iknow@data-5-63 flink-1.7.2]$ ./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
2019-03-06 12:03:15,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-03-06 12:03:15,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-03-06 12:03:15,325 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2019-03-06 12:03:15,415 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-06 12:03:15,415 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-06 12:03:15,421 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-03-06 12:03:15,421 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-03-06 12:03:15,511 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-03-06 12:03:15,819 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/zhangzhiyong/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-06 12:03:16,386 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1551789318445_0004
2019-03-06 12:03:16,412 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1551789318445_0004
2019-03-06 12:03:16,412 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-03-06 12:03:16,414 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-03-06 12:03:19,940 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
Starting execution of program

如果zzy上未开启9001端口,到jobManager的web ui上看到会报下面的错

代码里设置了checkpoint

//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认checkpoint功能是disabled的,想要使用的时候需要先启用;每隔10000ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(10000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint


//设置statebackend

//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://zzy:9000/flink/checkpoints"));
//rocksDB需要引入依赖flink-statebackend-rocksdb_2.11
//env.setStateBackend(new RocksDBStateBackend("hdfs://zzy:9000/flink/checkpoints",true));
env.setStateBackend(new FsStateBackend("hdfs://192.168.5.63:9000/flink/checkpoints"));

但是JobManager的web ui上checkpoint并未触发

报错如下,应该是连接不到zzy 9001,识别不了zzy


选择监听50.63上的9001端口,如果没有nc命令,用

yum install -y nc

安装下,用下面的命令启动flink程序,采用flink on yarn的方式

./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
2019-03-06 16:00:24,680 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

如果一直出现Deployment xxx,此时可能是集群上没有资源了,
这里杀掉application_1551789318445_0007和application_1551789318445_0008(这两台是测试机器,资源很紧张)

然后再次重启程序


注意yarn是不是successfully.的状态


Yarn上启动了应用application_1551789318445_0009
点击AM进去jobManager的web ui界面


Checkpoint的UI

可以看到每隔10s进行一次checkpoint

Hdfs上查看checkpoint数据,看到保存了最近10次的checkpoint数据


95d75e802ba1eceefeaf98636e907883跟job ID是对应的


说明flink配置文件conf/flink-conf.yaml里的配置生效了

flink可以保存多个checkpoint,添加如下配置,指定最多需要保存Checkpoint的个数

state.checkpoints.num-retained: 10
上一篇下一篇

猜你喜欢

热点阅读