Flink-1.12(一)flink-1.12 搭建

2021-04-21  本文已影响0人  _大叔_

概要

Flink 是一个可以帮助我们做实时计算的框架。实时计算分为两种,一种是Stream,即可以一条一条处理;一种是batch,即批处理,待数据存储到一定数量再处理。

Flink 提供了很好的故障恢复能力,当我们的一个流处理程序故障后,一个新的流程服务会自动启动并替代它继续执行。我们并不需要担心故障机当时的状态,因为Flink已经帮我们做了记录(checkPoint)。

Flink 支持(exactly-once)一次而且仅一次地接收数据,保证收到数据而且不重复收到数据。支持(EventTime)把事件实际产生的时间做为处理数据的时间,除了EventTime还要Ingestion Time(数据进入Flink时间)、Processing Time(flink 真正处理数据的时间)。支持有状态的计算。

Flink 的window可以将无限数据流切割为有限数据流,主要分为两类:TimeWingdow(又可分为Tumbling Window滚动出窗口、Sliding Window滑动窗口) 和 CountWindow。

Flink 的Watermark(水位线)用于处理乱序事件,类似于一个延迟触发机制

Flink 组件分为 jobManager(工作管理) 和 TaskManager(任务管理),听起来好像没什么不同,但程序真正运行在 TaskManager,JobManager负责整个Flink集群任务的调度以及资源的管理。

安装

wget https://mirrors.bfsu.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz
tar -xvf flink-1.12.0-bin-scala_2.11.tgz
cd flink-1.12.0-bin-scala_2.11

启动

[root@localhost flink-1.12.2]# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host localhost.localdomain.
Starting taskexecutor daemon on host localhost.localdomain.

测试用例

# 执行一个测试程序
./bin/flink run examples/streaming/WordCount.jar
# 查看测试结果
tail log/flink-*-taskexecutor-*.out

停止运行

./bin/stop-cluster.sh

Flink 默认提供了 webUI,端口为 8081,可以直接在浏览器中访问

单机搭建

环境信息

IP 模式
xx.xxx.xx.103 master
xx.xxx.xx.104 slave

ssh免密登录

在集群的启动的时候 flink 会要求认证对方的登录密码,设置ssh免密登录可以为我们快速启动。

1. 为各服务器生成密钥

一路 enter 即可

# rea是一种加密规则
ssh-keygen -t rsa

查看密钥

[root@localhost /]# cd ~/.ssh/
[root@localhost .ssh]# ll
total 12
-rw-------. 1 root root 1679 Apr 17 14:46 id_rsa
-rw-r--r--. 1 root root  408 Apr 17 14:46 id_rsa.pub

id_rsa : 生成的私钥文件
id_rsa.pub : 生成的公钥文

2. 服务器之间交换密钥

生成的密钥两边文件名字一样,所以交换的时候需要更换文件名称,以防覆盖。

# 这种方式后可以直接测试连接 103 或 104
ssh-copy-id xx.xxx.xx.103
ssh-copy-id xx.xxx.xx.104

# 这种方式需要完整的执行以下步骤
scp id_rsa.pub root@xx.xxx.xx.103:/root/.ssh/104.id_rsa.pub
scp id_rsa.pub root@xx.xxx.xx.104:/root/.ssh/103.id_rsa.pub
3. 创建文件并赋权

在各服务机的~/.ssh目录下创建一个文件authorized_keys

touch authorized_keys

把xxx.id_rsa.pub内容追加到authorized_keys文件末尾(追加后可以把文件xxx.id_rsa.pub删除)

cat 103.id_rsa.pub >> authorized_keys
cat 104.id_rsa.pub >> authorized_keys

把本机生成的公钥也得 追加到 authorized_keys 文件

 cat id_rsa.pub >> authorized_keys 

赋权

chmod 600 authorized_keys 
chmod 700 -R .ssh

测试 连接

ssh root@xx.xxx.xx.103

workermanager

一个Flink 应用会携带JobManager和TaskManager两个组件,我们可以部署两个Flink应用实现以下简单的TaskManager的集群模式,FlinkA可以启动Job和Task两个组件,FlinkB启动Task组件。

启动前每台服务器最好把 Hostname 改掉,并修改 hosts

103 操作
  1. 修改 jobmanager 工作地点
vim flink-conf.yaml
# 加入如下
jobmanager.rpc.address: xx.xxx.xx.103
jobmanager.rpc.port: 6123
  1. 修改 workers文件,指定 workermanager 工作地点
vim workers
# 加入如下
xx.xxx.xx.103
xx.xxx.xx.104
  1. 启动 xx.xxx.xx.103 flink 节点
[root@node103 bin]# ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node103.
Starting taskexecutor daemon on host node103.
Starting taskexecutor daemon on host node104.
  1. 访问 http://xx.xxx.xx.103:8081/#/overview

建议修改一下属性值:
每个 JobManager 的可用内存值(jobmanager.memory.process.size
每个 TaskManager 的可用内存值 (taskmanager.memory.process.size,并检查 内存调优指南
每台机器的可用 CPU 数(taskmanager.numberOfTaskSlots
集群中所有 CPU 数(parallelism.default)和 临时目录(io.tmp.dirs

命令运行jar,可以到 flink/bin 目录下,执行

./flink run -c com.example.demo.SocketStreamWordCount -p 3 /opt/software/flink-1.12.2/examples/xxxxxx.jar --host 10.xxx.xx.103 --port 7777

查看当前运行的所有jobs,-a 可以查看被取消运行的job

./flink list [-a]

取消 JobID 的运行

./flink cancel JobID

jobManager 高可用

  1. 在 flink-1.12.2/conf下的 flink-conf.yaml 中添加 zookeeper 配置
# 1. 配置jobmanager rpc 地址
jobmanager.rpc.address: xx.xxx.xx.103
# 2. 修改taskmanager内存大小,可改可不改
taskmanager.memory.process.size: 2048m
# 3. 修改一个taskmanager中对于的taskslot个数,可改可不改
taskmanager.numberOfTaskSlots: 4
# 4. 修改并行度,可改可不改
parallelism.default: 4

# 5. 配置状态后端存储方式
state.backend:filesystem
# 6. 配置启用检查点,可以将快照保存到HDFS
state.backend.fs.checkpointdir: hdfs://192.168.244.129:9000/flink-checkpoints
# 7. 配置保存点,可以将快照保存到HDFS
state.savepoints.dir: hdfs://192.168.244.129:9000/flink-savepoints

# 8. 使用zookeeper搭建高可用
high-availability: zookeeper
# 9. 配置ZK集群地址
high-availability.zookeeper.quorum: xx.xxx.xx.103:2181
# 10. 存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs://xx.xxx.xx.103:9000/flink/ha/
# 11. 配置zookeeper client默认是 open,如果 zookeeper security 启用了更改成 creator
high-availability.zookeeper.client.acl: open
  1. 将配置过的HA的 flink-conf.yaml 分发到另外节点
  2. 分别到另外两个节点中修改flink-conf.yaml中的配置
  3. 在 masters 配置文件中添加多个节点
  4. 分发masters配置文件到另外两个节点
  5. 配置每个flink 下的 zoo.cfg 文件
# 新建snapshot存放的目录,在flink-1.12.2目录下建
mkdir tmp
cd tmp
mkdir zookeeper
#修改conf下zoo.cfg配置
vim zoo.cfg
#snapshot存放的目录
dataDir=/opt/software/flink-1.12.2/tmp/zookeeper
#配置zookeeper 地址
server.1=xx.xxx.xx.103:2888:3888
  1. 下载hadoop依赖包,每个flink节点都需要
下载地址:https://flink.apache.org/downloads.html#additional-components
将包复制到flink-1.13.2/lib目录下
  1. 启动 flink 集群
./start-cluster.sh

Yarn模式

以Yarn模式部署Flink 任务时,要求Flink是有Hadoop支持的版本,Hadoop 环境需要保证版本在2.2以上,并且集中安装有HDFS服务以及配置了相关flink、hadoop的环境变量。yarn 部署有两种方式,如下图介绍:

实际工作过程中推荐第二种方式进行部署,比较方便管理。

方式一

  1. 检查 hadoop 环境变量是否配置 HADOOP_CLASSPATH
HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
  1. 启动 flink/bin/yarn-session.sh
bin/yarn-session.sh -jm 1024m -tm 1024m -d
  1. 停止集群
yarn application -kill (yarn的flink id)

方式二

直接在flink/bin下运行命令即可,运行完成后会在yarn集群中创建一个临时的 flink 集群,当任务结束后对应的临时集群结束。

bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./要运行任务的jar

bin/flink run ./examples/batch/WordCount.jar 会从本机/tmp.yarn-properties-root 中的节点中找到applicationId 来找到对应的那个Flink集群,提交到对应的Flink集群中

bin/flink run -m hostname:port ./examples/batch/WordCount.jar 通过-m来指定Flink集群里面主节点的主机名,端口号,就是指定 jobmanager的host和port,表示你已知一个Flink集群了,提交到这个集群中

bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar 表示在yarn集群里面去创建一个临时的Flink集群,并且把这个任务提交到临时的Flink集群中执行。

flink-yarn 的好处

  1. 提高大数据集群中机器的利用率
  2. 一套集群,可以执行MR任务,Spark任务,Flink任务等
上一篇 下一篇

猜你喜欢

热点阅读