20190721_flink安装

2019-07-22  本文已影响0人  行走的数据智能

查看系统版本号

cat /etc/redhat-release

CentOS Linux release 7.6.1810 (Core)

查看系统磁盘挂载情况

df -h

/home 目录下有 873G 磁盘存储空间

故把程序应该安装在 /home 目录下

一、Java JDK 安装

上传jdk安装包 jdk-8u191-linux-x64.tar.gz 至 /home/bigdata 目录下

cd /home/bigdata

ls

sudo tar -zxvf jdk-8u191-linux-x64.tar.gz

创建软链接

ln -s jdk1.8.0_191 jdk

配置 java 环境变量

在 vi 编辑中, 使用 Shift + g 快速定位至最后一行

sudo vi /etc/profile

export JAVA_HOME=/home/bigdata/jdk

export PATH=$JAVA_HOME/bin:$PATH

source /etc/profile

验证 java jdk 是否安装成功

java -version

二、Scala安装

上传scala安装包 scala-2.11.11.tgz 至 /home/bigdata 目录下

cd /home/bigdata

ls

sudo tar -zxvf scala-2.11.11.tgz

创建软链接

ln -s scala-2.11.11 scala

配置 scala 环境变量

在 vi 编辑中, 使用 Shift + g 快速定位至最后一行

sudo vi /etc/profile

export SCALA_HOME=/home/bigdata/scala

export PATH=$SCALA_HOME/bin:$PATH

source /etc/profile

验证 scala 是否安装成功

scala -version

测试scala环境

scala

1+1

:q  退出

三、ssh 免密登录设置

cd /home/nifi

pwd

ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa

cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

sudo chmod 600 ~/.ssh/authorized_keys

验证 ssh 免密登录是否设置成功

ssh singlecluster

备注:配置 ssh 免密登录应注意

配置 ssh 免密登录时,应设置 /home/nifi 目录权限为 755,

/home/nifi/.ssh/authorized_keys 目录权限为 600,

否则可能 ssh 免密登录设置不成功!!

四、hadoop安装

4.1、上传hadoop安装包 hadoop-2.7.3.tar.gz 至 /home/bigdata 目录下

cd /home/bigdata

sudo tar -zxvf hadoop-2.7.3.tar.gz

创建软链接

ln -s hadoop-2.7.3 hadoop

ll

4.2、在/home/bigdata/hadoop目录下,建立tmp、hdfs/name、hdfs/data目录,执行如下命令

cd /home/bigdata/hadoop

sudo chmod -R 777 /home

mkdir -p tmp hdfs/name hdfs/data

4.3、配置 hadoop 环境变量

在 vi 编辑中, 使用 Shift + g 快速定位至最后一行

sudo vi /etc/profile

export HADOOP_HOME=/home/bigdata/hadoop

export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

source /etc/profile

验证 hadoop 变量是否配置成功

hadoop version

4.4、Hadoop配置文件配置

进入/home/bigdata/hadoop/etc/hadoop目录,配置 hadoop-env.sh等。涉及的配置文件如下:

hadoop-env.sh

yarn-env.sh

mapred-env.sh

core-site.xml

hdfs-site.xml

mapred-site.xml

yarn-site.xml

1)配置hadoop-env.sh

vi hadoop-env.sh

# The java implementation to use.

#export JAVA_HOME=${JAVA_HOME}

export JAVA_HOME=/home/bigdata/jdk

2)配置yarn-env.sh

vi yarn-env.sh

#export JAVA_HOME=/home/y/libexec/jdk1.7.0/

export JAVA_HOME=/home/bigdata/jdk

3)配置mapred-env.sh

vi mapred-env.sh

# export JAVA_HOME=/home/y/libexec/jdk1.6.0/

export JAVA_HOME=/home/bigdata/jdk

4)配置core-site.xml

cd /home/hadoop/etc/hadoop

vi core-site.xml

添加如下配置:

<configuration>

<property>

<name>fs.default.name</name>

<value>hdfs://singlecluster:9000</value>

<description>HDFS的URI,文件系统://namenode标识:端口号</description>

</property>

<property>

<name>hadoop.tmp.dir</name>

<value>/home/bigdata/hadoop/tmp</value>

<description>namenode上本地的hadoop临时文件夹</description>

</property>

</configuration>

5)配置hdfs-site.xml

vi hdfs-site.xml

添加如下配置

<configuration>

<property>

<name>dfs.name.dir</name>

<value>/home/bigdata/hadoop/hdfs/name</value>

<description>namenode上存储hdfs名字空间元数据</description>

</property>

<property>

<name>dfs.data.dir</name>

<value>/home/bigdata/hadoop/hdfs/data</value>

<description>datanode上数据块的物理存储位置</description>

</property>

<property>

<name>dfs.replication</name>

<value>3</value>

<description>副本个数,配置默认是3,应小于datanode机器数量</description>

</property>

<property>

<name>dfs.permissions</name>

<value>false</value>

</property>

</configuration>

6)配置mapred-site.xml

cp mapred-site.xml.template mapred-site.xml

vi mapred-site.xml

添加如下配置:

<configuration>

<property>

<name>mapreduce.framework.name</name>

<value>yarn</value>

</property>

<!-- 开启uber模式(针对小作业的优化) -->

<property>

<name>mapreduce.job.ubertask.enable</name>

<value>true</value>

</property>

<!-- 启动uber模式的最大map数 -->

<property>

<name>mapreduce.job.ubertask.maxmaps</name>

<value>9</value>

</property>

<!-- 启动uber模式的最大reduce数 -->

<property>

<name>mapreduce.job.ubertask.maxreduces</name>

<value>1</value>

</property>

</configuration>

mapred-site.xml文件全部内容

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!--

  Licensed under the Apache License, Version 2.0 (the "License");

  you may not use this file except in compliance with the License.

  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software

  distributed under the License is distributed on an "AS IS" BASIS,

  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

  See the License for the specific language governing permissions and

  limitations under the License. See accompanying LICENSE file.

-->

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

        <name>mapreduce.framework.name</name>

        <value>yarn</value>

</property>

</configuration>

7)配置yarn-site.xml

vi yarn-site.xml

添加如下配置:

<configuration>

<property>

<name>yarn.nodemanager.aux-services</name>

<value>mapreduce_shuffle</value>

</property>

</configuration>

8)修改masters

sudo vi masters

singlecluster

9)修改slaves

sudo vi slaves

singlecluster

10)修改 /etc/hosts

sudo vi /etc/hosts

192.168.1.26 singlecluster

4.5、进入/home/bigdata/hadoop目录,格式化namenode节点

bin/hadoop namenode -format

4.6、进入/home/bigdata/hadoop目录下启动hadoop集群

cd /home/bigdata/hadoop

ls

sbin/start-all.sh

使用jps命令查看进程,检验hadoop集群是否已经启动。

jps

3393 NameNode

4481 Jps

3907 ResourceManager

3543 DataNode

4023 NodeManager

3737 SecondaryNameNode

进入/home/bigdata/hadoop目录下关闭hadoop集群

cd /home/bigdata/hadoop

sbin/stop-all.sh

4.7、打开浏览器查看 HDFS、YARN 的运行状态

查看 HDFS 运行状态

http://192.168.1.26:50070/

查看 YARN 运行状态

http://192.168.1.26:8088

备注:若页面输入对应的ip端口号地址查看不了网页,需验证服务器防火墙是否关闭,应保持防火墙关闭

关闭Centos7防火墙,不然端口无法访问

查看防火墙状态

systemctl status firewalld.service

临时关闭防火墙(下次重启防火墙再次开启)

systemctl stop firewalld.service

永久关闭防火墙

systemctl disable firewalld.service

4.8、Hadoop程序实例 wordcount程序运行

hadoop fs -mkdir -p /user/nifi/jobs/wordcount/input

hadoop fs -put /home/bigdata/hadoop/etc/hadoop/*.xml /user/nifi/jobs/wordcount/input

hadoop jar /home/bigdata/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount /user/nifi/jobs/wordcount/input /user/nifi/jobs/wordcount/output

hadoop fs -ls -R /

hadoop fs -cat /user/nifi/jobs/wordcount/output/*

五、安装 kafka

5.1、上传kafka安装包 kafka_2.11-2.1.1.tgz 至 /home/bigdata 目录下

cd /home/bigdata

ls

sudo tar -zxvf kafka_2.11-2.1.1.tgz

创建软链接

ln -s kafka_2.11-2.1.1 kafka

5.2、修改 zookeeper、kafka 配置文件 zookeeper.properties,server.properties

cd /home/bigdata/kafka/config

sudo vi zookeeper.properties

dataDir=/home/bigdata/kafka/tmp/zookeeper

sudo vi server.properties

log.dirs=/home/bigdata/kafka/tmp/kafka-logs

5.3、启动zookeeper和kafka(这里,zookeeper集成于kafka中)

cd /home/bigdata/kafka

启动zookeeper(后台运行)

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

查看zookeeper是否运行

jps

6740 QuorumPeerMain

关闭zookeeper

bin/zookeeper-server-stop.sh config/zookeeper.properties &

启动kafka(后台运行)

nohup bin/kafka-server-start.sh config/server.properties &

或者

nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

查看kafka是否运行

jps

7587 Kafka

关闭kafka

bin/kafka-server-stop.sh config/server.properties

5.4、使用kafka

1)、创建topic:

  /home/bigdata/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic apache-flink-test

2)、查看topic:

  /home/bigdata/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

3)、生产者

  /home/bigdata/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic apache-flink-test

4)、消费者

/home/bigdata/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic apache-flink-test --from-beginning

该命令中含有过时方法 --zookeeper,该方法在老版本kafka0.90之前使用

/home/bigdata/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:9092 --topic apache-flink-test --from-beginning

备注:消费kafka时遇到的问题:kafka 创建消费者报错 consumer zookeeper is not a recognized option

在做kafka测试的时候,使用命令bin/kafka-console-consumer.sh --zookeeper 192.168.0.140:2181,192.168.0.141:2181 --topic test --from-beginning启动消费者,发现一只报错consumer zookeeper is not a recognized option,搜索了半天,一只没有解决,最后,换了一个低版本的kakfa,发现在启动的时候说使用 --zookeeper是一个过时的方法,此时,才知道原来在最新的版本中,这种启动方式已经被删除了,

最后附上0.90版本之后启动消费者的方法: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

CSDN参考链接:

kafka 创建消费者报错 consumer zookeeper is not a recognized option

https://blog.csdn.net/csdn_sunlighting/article/details/81516646

kafka中消费 kafka topic 后应该关闭消费进程

(1)使用消费命令时,用 Ctrl + C 关闭消费进程

(2)jps -m 查看kafka消费进程号,之后杀死对应的进程

jps -m

kill -9 进程号

5)、删除topic

  /home/bigdata/kafka/bin/kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】

  [server.properties需要 设置delete.topic.enable=true]

六、安装 flink

6.1、上传flink安装包 flink-1.7.2-bin-hadoop27-scala_2.11.tgz 至 /home/bigdata 目录下

cd /home/bigdata

ls

sudo tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz

创建软链接

ln -s flink-1.7.2 flink

6.2、配置 flink 环境变量

在 vi 编辑中, 使用 Shift + g 快速定位至最后一行

sudo vi /etc/profile

export FLINK_HOME=/home/bigdata/flink

export PATH=$FLINK_HOME/bin:$PATH

source /etc/profile

验证 flink 变量是否配置成功

flink --version

或者

flink --v

6.3、启动flink

cd /home/bigdata/flink

./bin/start-cluster.sh

查看flink是否成功启动(新增两个进程)

jps

18469 TaskManagerRunner

18025 StandaloneSessionClusterEntrypoint

6.4、打开浏览器查看Flink任务运行状态

http://192.168.1.26:8081

6.5、flink伪分布式验证

使用 start-scala-shell.sh 来验证

${FLINK_HOME}/bin/start-scala-shell.sh是flink提供的交互式clinet,可以用于代码片段的测试,方便开发工作,

它有两种启动方式,一种是工作在本地,另一种是工作到集群。

本例中因为机器连接非常方便,就直接使用集群进行测试,在开发中,如果集群连接不是非常方便,可以连接到本地,在本地开发测试通过后,再连接到集群进行部署工作。

如果程序有依赖的jar包,则可以使用 -a <path/to/jar.jar> 或 --addclasspath <path/to/jar.jar>参数来添加依赖。

1)本地连接

${FLINK_HOME}/bin/start-scala-shell.sh local

2)集群连接

${FLINK_HOME}/bin/start-scala-shell.sh remote <hostname> <portnumber>

3)带有依赖包的格式

${FLINK_HOME}/bin/start-scala-shell.sh [local|remote<host><port>] --addclasspath<path/to/jar.jar>

4)查看帮助

${FLINK_HOME}/bin/start-scala-shell.sh --help

cd /home/bigdata/flink

bin/start-scala-shell.sh --help

Flink Scala Shell

Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...

Command: local [options]

Starts Flink scala shell with a local Flink cluster

  -a, --addclasspath <path/to/jar>

                           Specifies additional jars to be used in Flink

Command: remote [options] <host> <port>

Starts Flink scala shell connecting to a remote cluster

  <host>                   Remote host name as string

  <port>                   Remote port as integer

  -a, --addclasspath <path/to/jar>

                           Specifies additional jars to be used in Flink

Command: yarn [options]

Starts Flink scala shell connecting to a yarn cluster

  -n, --container arg      Number of YARN container to allocate (= Number of TaskManagers)

  -jm, --jobManagerMemory arg

                           Memory for JobManager container

  -nm, --name <value>      Set a custom name for the application on YARN

  -qu, --queue <arg>       Specifies YARN queue

  -s, --slots <arg>        Number of slots per TaskManager

  -tm, --taskManagerMemory <arg>

                           Memory per TaskManager container

  -a, --addclasspath <path/to/jar>

                           Specifies additional jars to be used in Flink

  --configDir <value>      The configuration directory.

  -h, --help               Prints this usage text

5)使用集群模式去验证

cd /home/bigdata/flink

bin/start-scala-shell.sh remote 192.168.1.26 8081

批处理验证:

val text = benv.fromElements("To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,")

val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)

counts.print()

流处理验证:

val dataStream = senv.fromElements(1, 2, 3, 4)

dataStream.countWindowAll(2).sum(0).print()

senv.execute("My streaming program")

参考CSDN博客链接:

Flink部署-standalone模式 - kwame211的博客 - CSDN博客

https://blog.csdn.net/kwame211/article/details/89332391

上一篇下一篇

猜你喜欢

热点阅读