flinkHadoopBIG DATA-实时计算

Flink部署

2019-03-11  本文已影响21人  零度沸腾_yjz

Flink作为一个分布式流式计算引擎,需要计算资源才可以执行应用程序。Flink能够与目前所有通用的资源管理框架集成,比如Hadoop YARN、Apache Mesos和Kubernetes。除了运行在现有资源管理器上,Flink还能够独立部署运行(stand-alone cluster),也就是自身提供资源管理。

Standalone集群部署

Flink运行在类Unix操作系统之上,所以你可以部署在Linux、Mac OS X和Cygwin上。

环境准备

Java需要配置JAVA_HOME环境变量,并且指向Java安装目录。
ssh需要在集群节点上配置免密码登录,并且Flink安装目录也要在所有节点的相同目录上,这样我们就可以使用Flink的脚本来管理它们了。

Flink集群由一个Master节点和一个或多个Worker节点组成。

主机 节点类型 服务名称
10.0.0.1 master jobmanager
10.0.0.2 worker taskmanager
10.0.0.3 worker taskmanager
flink-cluster

下载安装

使用Flink并不是必须要提前安装Hadoop,如果我们不打算使用Hadoop组件(比如HDFS、YARN、HBase等),我们就可以下载没有将Hadoop预编译到Flink的二进制安装包上。
Flink下载页面:https://flink.apache.org/downloads.html

tar xzf flink-*.tgz
cd flink-*

如果需要与Hadoop集成使用,需要选择预编译好的Hadoop版本,或者自己编译Flink源码来指定Hadoop版本。

配置Flink

我们下载解压缩Flink之后,就需要对Flink进行配置了。Flink配置文件在${FLINK_HOME}/conf/目录中,我们主要的配置文件为flink-conf.yaml
在flink-config.yaml配置文件添加以下基础配置项:

vim conf/flink-conf.yaml

jobmanager.rpc.address: 10.0.0.1
jobmanager.heap.mb: 2048m
taskmanager.heap.mb: 1024m

如果每个worker节点的内存不同,想要为某些特定worker节点多指定一些内存,则可以在特定节点上使用环境变量FLINK_TM_HEAP来覆盖指定。

配置完flink-conf.yaml后,还需要在conf/slaves文件中给出所有worker节点。

vim conf/slaves

10.0.0.2
10.0.0.3

这里给出的是Flink的最简单集群配置,更多配置信息可以查看:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html

节点分发

配置好后,我们需要将Flink目录文件分发到集群中的所有其它节点上。

scp -r flink-1.7.2 root@10.0.0.2:/opt/yjz/flink/flink-1.7.2/
scp -r flink-1.7.2 root@10.0.0.3:/opt/yjz/flink/flink-1.7.2/

特别注意:所有节点的Flink所在目录必须一致。

服务启动

节点分发后我们通过start-cluster.sh脚本在Master节点上来启动Flink集群。由于我们在slaves文件里面指定了所有worker节点,所以master节点会通过ssh免密码登录来启动所有worker节点。

bin/start-cluster.sh

启动成功后我们可以通过UI界面来查看:http://10.0.0.1:8081

flink-ui

停止服务

停止服务直接使用bin/stop-cluster.sh脚本即可。

高可用部署

Flink集群的JobManager同时负责任务调度和资源管理(在Standalone模式下),可想而知它的压力是很大的。在默认情况下Flink集群只会启动一个JobManager,这样就会存在单点故障(single point of failure, SPOF)。
因此,一般在生产环境中我们需要为Standalone模式或YARN模式提供Flink的高可用部署方式(High Availability, HA)。

这里我们给出Standalone部署模式的HA部署方案,关于YARN的可以放到Flink on YARN部署文章中。

Flink高可用部署原理

Flink的HA部署方案和业界的其它大数据处理框架HA基本一致:启动过程中启动多个JobManager,然后通过Zookeeper来选举出一个leader,其它JobManager作为备用主节点(standby)。

下图是描述在JobManager启动时首先选举一个Leader,当Leader节点挂掉后,从standby leader中再重新选举出一个新的Leader节点。

Flink-HA

HA配置

Flink的JobManager使用Zookeeper作为分布式协调服务,所以我们需要独立部署一套Zookeeper,然后在Flink上添加一些必要的配置。

配置masters文件

我们需要将所有JobManager节点配置到conf/masters文件中,配置内容除了主机地址外,还需要配置webUI的端口。

jobManagerAddress1: webUIPort1
[...]
jobManagerAddressN: webUIportN

默认JobManager的通信端口是随机启动的,我们也可以为Flink指定特定的通信端口,但是范围只能是:50010,50011,50020-50025,50050-50075。(该配置在conf/flink-conf.yaml中)

high-availability.jobmanager.port: 50010

配置flink-conf.yaml文件

HA其余配置都在conf/flink-conf.yaml中,以下是基本配置:

#必要配置,通过Zookeeper启动HA模式
high-availability: zookeeper
#必要配置,指定Zookeeper连接地址,可以配置多个broker,防止Zk单节点挂掉
high-availability.zookeeper.quorum: 10.0.0.1:2181,10.0.0.2:2181
#建议配置,存储JobManager集群节点的根目录
high-availability.zookeeper.path.root: /flink
#建议配置,存储协调数据的目录
high-availability.cluster-id: /defaul_ns
#必须配置,JobManager中持久化元数据的存储目录(Zookeeper只存储了状态信息)
high-availability.storageDir: hdfs:///flink/recovery

启动集群

在启动Flink之前,要确保Zookeeper集群已经启动并运行。

bin/start-cluster.sh

Flink自带Zookeeper集群

对于测试集群,如果我们没有Zookeeper实例集群,可以使用Flink自带的Zookeeper集群。在conf/zoo.cfg下有Zookeeper的配置模板。我们可以指定运行Zookeeper服务的节点:

server.1=address1: address1:peerPort1:leaderPort1
[...]
server.X=addressX: addressX:peerPort:leaderPort

然后使用Flink自带的脚本启动Zookeeper集群:bin/start-zookeeper-quorum.sh

对于生产环境,建议使用自己的独立Zookeeper集群。

单机部署

我们在学习或测试的时候可以使用本地单机Flink模式,单机Flink部署方式非常简单,我们只需要将Flink二进制安装包下载下来后,直接使用bin/flink-start.sh来启动即可。

默认conf/flink-conf.yaml中的jobmanager.rpc.address配置项为localhost,slaves文件中也为localhost,所以会在本地即启动jobmanager又启动taskmanager。

Flink CLI

Flink提供了命令行接口(Command-Line Interface,CLI)来运行作业jar包和控制作业执行。CLI位于bin/flink 目录下。
Flink CLI格式如下:

./flink <ACTION> [OPTIONS] [ARGUMENTS]

ACTION包含了一下几类:

ACTION 使用方式 说明
run ./flink run [OPTIONS] <jar-file> <arguments> 编译并运行作业
info ./flink info [OPTIONS] <jar-file> <arguments> 以JSON的形式显示程序的优化执行计划
list ./flink list [OPTION] 列出正在执行和调度(scheduled)的任务
stop ./flink stop [OPTION] <Job ID> 停止运行的程序,注意使用stop只能停止流处理作业。
cancel ./flink cancel [OPTION] <Job ID> 取消运行的程序
savepoint ./flink savepoint [OPTIONS] <Job ID> [<target directory>] 为运行中的作业触发保存点
modify ./flink modify <Job ID> [OPTIONS] 修改运行中的程序

具体OPTIONS参数我们可以通过./flink 命令详细查看使用。

stop和cancel虽然都是停止作业,但是两者实现是不一样的。使用cancel方法作业中的operator会立即接收到停止命令,来取消任务。如果operator没有取消任务,Flink开始定期中断线程,直到它停止。而stop是以一种更优雅的方式来停止作业,使用Stop停止作业,任务数据源需要实现StoppableFunction接口,这样当收到stop命令时,数据源首先停止发送数据,然后等待集群中的作业执行完成,最后正常停止作业。

关注我

欢迎关注我的公众号,会定期推送优质技术文章,让我们一起进步、一起成长!
公众号搜索:data_tc
或直接扫码:🔽


欢迎关注我
上一篇下一篇

猜你喜欢

热点阅读