flink-1.10.0 on yarn 配置
1.下载
注意不要下载集成hadoop依赖的预编译包,会有兼容性问题
cd /usr/local/
wget https://downloads.apache.org/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz
tar xvf flink-1.10.0-bin-scala_2.12.tgz
rm -rf flink-1.10.0-bin-scala_2.12.tgz
2.环境设置
ln -s flink-1.10.0 flink
export FLINK_HOME=/usr/local/flink
chmod 777 -R /usr/local/flink-1.10.0 ## 保证用户可读可执行
chmod 777 -R /usr/local/flink-1.10.0/log ## 保证用户读写权限
useradd flink
export HADOOP_USER_NAME=hdfs ##临时切换用户到hdfs
hadoop fs -mkdir /user/flink ## 配置flink在hdfs上的家目录
hadoop fs -chown flink:flink /user/flink ## 配置权限
hadoop fs -mkdir -p /flink/completed-jobs ## 配置flink的任务历史存储目录
hadoop fs -chown flink:flink /flink/completed-jobs ##如果hdfs启用ranger,相应的也要配置权限,ranger手动新增flink用户,然后赋权限
hadoop fs -chmod -R 777 /flink/completed-jobs # 保证所有用户都可写
sed -i '2i\export HADOOP_CLASSPATH=`hadoop classpath`' /usr/local/flink/bin/config.sh #这一步是声明本机的hadoop classpath,让flink能够加载到对应的hadoop依赖包,执行hadooo命令获取classpath
否则报错:
Could not build the program from JAR file
echo "export FLINK_HOME=/usr/local/flink" >> /etc/profile ## 保证flink命令全局可用
echo "export PATH=$PATH:$FLINK_HOME/bin
" >> /etc/profile ## 保证flink命令全局可用
export HADOOP_CONF_DIR=/etc/hadoop/conf
source /etc/profile # 生效环境变量
3.配置文件修改 FLINK_HOME/conf/flink-conf.yaml
配置文件修改 FLINK_HOME/conf/flink-conf.yaml
只需要修改下面的historyServer部分
修改历史记录在hdfs上的归档地址,写地址
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
修改历史记录在hdfs上的归档地址,读地址
historyserver.archive.fs.dir: hdfs:///flink/completed-jobs/
启动 historyserver
sudo -u flink $FLINK_HOME/bin/historyserver.sh start
启动命令
sudo -u flink $FLINK_HOME/bin/historyserver.sh stop
停止命令
historyserver
的日志文件在$FLINK_HOME/log
目录下保存
检查 http://0.0.0.0:8082 是否可以访问
客户端安装,参照historyserver的部署方式,只是不用启动historyserver进程
运行
./bin/flink run -m yarn-cluster -p 1 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
问题:之前只用的 -yn 指定container的个数,现在再1.10中使用会报错,那么flink如何决定taskmanager的个数呢?
从1.5版本开始,Flink on YARN时的容器数量——亦即TaskManager数量——将由程序的并行度自动推算,也就是说flink run脚本的-yn/--yarncontainer参数不起作用了。
自动推算:Job的最大并行度除以每个TaskManager分配的任务槽数。