Linux大数据教程@IT·互联网大数据,机器学习,人工智能

Hadoop框架基础(四)

2017-04-11  本文已影响336人  Z尽际

** Hadoop框架基础(四)

上一节虽然大概了解了一下mapreduce,徒手抓了海胆,不对,徒手写了mapreduce代码,也运行了出来。但是没有做更深入的理解和探讨。

那么……

本节目标:

* 深入了解mapreduce过程

* 成功部署Hadoop集群

** mapreduce原理

想要了解mapreduce原理,我们必须搞清楚处理数据时的每一个重要阶段,首先,贴上一张官方的图:

我们依次讨论每一个过程以及该过程对应的作用:

我先在这里假设一个情景,我现在有一个10G大小的words.txt,里面存放的是N多个英文单词。

这10G大小的文件分为若干个128M的文件块block分布存储于若干个服务器。

好,现在我要统计这10G文件中的单词出现频率。

** input split

一个split会对应一个map任务。

一般来讲,split的个数和block的个数相同,当然也可以多个block属于同一个split,但是后者会产生大量的网络和磁盘IO,原因在于一个split对应一个map任务,一个map任务肯定跑在某一台机器上,如果某个split所包含的多个block分布于不同的机器,首先需要做的操作就是把其他机器的block拷贝到运行map任务的机器上,这会耗费一定时间,所以,默认情况下,一个block对应一个split,源码中设定如下:

mapreduce.input.fileinputformat.split.minsize == 0

mapreduce.input.fileinputformat.split.maxsize == 10000

splitSize=max(minSize,min(maxSize, blockSize)),此为默认split大小

如果要修改,则如下方式:

recordSize表示一个记录的大小,分块要保证数据的完整性,所以:

int blockSize = Integer.parseInt(x); //x表示你希望的split大小

int splitSize = blockSize / recordSize * recordSize;

conf.setLong("mapred.max.split.size",splitSize);

conf.setLong("mapred.min.split.size",splitSize);

** map

此时输入的到map中的数据形式大致为:

<0, cat one hadoop element...>  ---> 调用一次map

<30, dog two one hadoop....>  ---> 调用一次map

……

省略号表示后边还有,其中0,30表示的是偏移量,每次从当前split中读取1行数据,比如第一次读取第一行,偏移量为0~29,第二次是第二行数据,偏移量是30~?,以此类推。每次读取都执行一次map任务,并调用一次map方法。map阶段结束,输出结果形式大致为:

<cat , 1>  <one, 1>  <hadoop, 1>  <element, 1> ……等等


从此进入shuffle阶段

** buffer in memory

这是一个状态描述,表明此刻在内存中开始操作,buffer在这里是内存中的一个环形数组。

之所以用环形数组来存放数据,是因为这样可以最大化的利用存储空间。

这个环形数组中存放数据分为两个类别:

1、元数据区(Kvmeta):

里面存放的每组数据都包含:

** value的起始位置

** key的起始位置

** partition值

** value的长度

2、数据区(Kvbuffer):

里面存放的每组数据都包含:

** key值,例如<cat ,1>中的cat

** value值,例如<cat, 1>中的1

注意:

* 以上两个区域的分界点为0,即0以上存储数据区内容,0以下存储元数据区的内容。

* 这个环形buffer虽然实际为一个字节数组,但抽象为一个IntBuffer,无论哪个区域中的数据,每组数据中的每个元素都占用4个字节,也就是每组中的每个元素的存储,数组下标都将移动4位(因为一个int为4个字节)。

* partition

分区的意义在于把一系列相似的单词分为同一个区。即单词归类处理,这样不同机器上的不同map任务输出的单词可以依据分区递交给相同的reduce做处理。

注意:

* 相关类: HashPartitioner

* 这里的“相似”,指的是:键(此例中为单词)的hash值在某一个范围内

* sort

map排序阶段,在buffer中把数据按照partion和key两个关键字做升序排序,这个排序只需要移动“元数据区”中的每组数据顺序即可。排序结果是“元数据区”中的每组数据按照partition分区聚集在一起,同一个partition分区内的key按照字典顺序排序。

* combine(可选)

结合阶段,可以在map阶段简化数据输出,减少后边spill溢写过程中,spill溢写文件的大小,例如:可以将<cat, 1> <cat, 1>这样的数据在map阶段合并为<cat, 2>这样的数据作为map输出,默认没有开启。

* spill

溢写阶段,当内存中的环形存储结构占用率达到一定程度(默认占用80%时,则开始溢写),则将环形数据区中的所有内容,刷入到当前本地硬盘能够存的下这些数据的目录中,以使内容腾出空间供后边继续使用。

相同的partition分区的数据写入到同一个文件中,类似:“spill10.out”,“spill11.out”这样的文件,每一个partition分区所产生的文件的存放位置和一些相关信息,存放在另一个“元数据”文件中,类似“spill10.out.index”,“spill11.out.index”(注意,这个元数据文件和刚才说的元数据区不是一码事)。

这个元数据文件包含:

** 起始位置

** 原始数据长度

** 压缩之后的数据长度

** crc32的校验数据

该文件的作用是:标记某个partition对应的文件在哪个目录,哪个索引中存放。

注意:

* spill10.out.index这样的文件不一定会产生,如果内存中放得下(针对这个文件数据的存放,内存只提供1M空间可用),就放在内存中。

* 内存占用达到80%,开始溢写,那么此时map任务还在进行,还在往内存里添加数据,新的数据的起始点(0点)为剩余空间的中间部分,然后数据区和元数据区分别往两边递增即可,溢写后释放内存后也不必改变什么,继续写入即可。

** map merge

map融合阶段,将溢写阶段产生的多个文件,根据所属分区,把具有相同partition分区的“元数据(从spill10.out.index这样的文件中读取的)”放置于同一个segment列表中,最后根据segment列表,把数据从spill溢写出来的文件一个一个中读取出来,写入到file.out文件中,同时将这一批段的数据索引(元数据分区等)写入到file.out.index文件中,最终生成两个文件,file.out和file.out.index,其中包含了多段数据,每段数据对应一个分区。

** compress (可选)

map压缩阶段,对map merge阶段产生的文件进行压缩处理,以便于在后边的网络传输过程中减少网络IO压力,提升效率。

至此,map端的shuffle过程结束。

** sort merge

reduce任务会根据分区数据段拉取每个map任务产生的数据,拉取后,因为可能涉及到多个map产生的数据,所以要进行排序,一边copy一边排序,最后把多个map产生的具有相同分区的数据合并为一个分区数据段,这个merge过程和map的merge算法过程一样。

在此完成shuffle阶段


** reduce

对于本例而言,此时产生的某个分区中的某个单词形式大概如下:

<cat, [1, 1, 1, 1, 1, 1]>,在调用reduce方法时,进行values各个元素的叠加操作即可。

** output

统计完成后,输出数据到文件目录,文件格式为part-r-00000这样形式的文件,存放于HDFS中。文件中key和value默认的分隔符为:\t

** Hadoop集群部署

之前我们在yarn框架中运行mapreduce任务,或者操作hdfs,其中的各种节点都是运行在一台虚拟机上的,现在我们要将hadoop部署在一个多台虚拟机构成的完全分布式集群中(全部都在一个机器节点上的叫做伪分布式,比如之前的方式)。部署前,我们先勾画一下各个节点的部署结构,如下图所示:

描述:

3台机器共有进程:HDFS的datanode,yarn的nodemanager

其中,HDFS的namenode开在z01这台机器上,secondarynamenode开在z03这台机器上

YARN的resourcemanager开在z02这台机器上。

注:SecondaryNameNode是用来协助NameNode整合fsimage和edits的。

一、准备系统环境

1、修改主机名

# vi /etc/hostname

2、主机名和ip地址的映射

# vi /etc/hosts,我的机器修改如图,注意,三台机器都要这么设置:

3、关闭防火墙和selinux

请跳转至Linux基础04查看相关方法。

4、创建普通用户

# useradd 用户名,如果已经有普通用户,则无需再次创建

# echo 666666 | passwd --stdin 用户名

5、配置静态IP和DNS

请参看Linux基础01内容

6、把后面两个虚拟机的系统启动级别改成“字符模式”(就是没有桌面,这样可以减少虚拟机负担,加速系统启动和运行)

# cat /etc/inittab,内容如图所示:

根据文件中的提示,可以使用命令:

systemctl set-default multi-user.target,来设置无界面启动linux

systemctl set-default graphical.target,来设置有界面启动linux

7、卸载服务器JDK

请参看Linux基础02中的内容

二、配置NTP时间服务器

对于我们当前这种案例,主要目标是把z01这台服务器设置为时间服务器,剩下的z02,z03这两台机器同步z01的时间,我们需要这样做的原因是因为,整个集群架构中的时间,要保持一致。

** 检查当前系统时区,使用命令:

# date -R,如图:

注意这里,如果显示的时区不是+0800,你可以删除localtime文件夹后,再关联一个正确时区的链接过去,命令如下:

# rm -rf /etc/localtime

# ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

** 同步时间

# ntpdate pool.ntp.org

** 检查软件包

查看ntp软件包是否已安装,使用命令:

# rpm -qa | grep ntp,如图,红框中的内容:

如果没有红框中的内容,则可以使用命令:

# yum -y install ntp,来进行安装

** 修改ntp配置文件

# vi /etc/ntp.conf

去掉下面这行前面的# ,并把网段修改成自己的网段:

restrict 192.168.122.0 mask 255.255.255.0 nomodify notrap

注释掉以下几行:

#server 0.centos.pool.ntp.org

#server 1.centos.pool.ntp.org

#server 2.centos.pool.ntp.org

把下面两行前面的#号去掉,如果没有这两行内容,需要手动添加

server  127.127.1.0    # local clock

fudge  127.127.1.0 stratum 10

最后,如图所示:

** 重启ntp服务

# systemctl start  ntpd.service,注意,如果是centOS7以下的版本,使用命令:service ntpd start

# systemctl enable ntpd.service,注意,如果是centOS7以下的版本,使用命令:chkconfig ntpd on

** z03,z03去同步z01这台时间服务器时间

首先需要关闭这两台计算机的ntp服务

# systemctl stop  ntpd.service,centOS7以下,则:service ntpd stop

# systemctl disable ntpd.service,centOS7以下,则:chkconfig ntpd off

# systemctl status ntpd,查看ntp服务状态

# pgrep ntpd,查看ntp服务进程id

同步第一台服务器z01的时间:

# ntpdate z01,如图:

** 制定计划任务,周期性同步时间

# crontab -e

*/10 * * * * /usr/sbin/ntpdate z01,如图所示:

重启定时任务:

# systemctl restart  crond.service,centOS7以下使用:service crond restart,z03这台机器的配置同理

三、配置无密钥登录

配置hadoop集群,首先需要配置集群中的各个主机的ssh无密钥访问

在z01上,通过如下命令,生成一对公私钥对

$ ssh-keygen -t rsa,一顿回车操作,这条命令执行完毕后(注意使用普通用户执行该命令),会在/home/z/.ssh/目录下生成两个文件:id_rsa 和 id_rsa.pub,如图所示:

生成之后呢,把z01生成的公钥拷贝给z01,z02,z03这三台机器,对,没错,包含当前机器。

$ ssh-copy-id z01

$ ssh-copy-id z02

$ ssh-copy-id z03

完成后,z02机器如图(z03同理):

以上完成了z01生成私钥,公钥并把公钥拷贝给z01,z02,z03三台机器的过程,z02,z03这两台机器也需要进行如上操作。全部完成后,我们可以在任意一台机器上,无密钥的连接到另外一台机器,比如,我们在z01连接到z02这台机器,使用命令:

$ ssh z02,如图:

这样就成功的在z01的机器登录到z02机器了。

四、安装配置JDK

使用root用户,在后面两台机器上创建/opt/modules文件夹,并使该文件夹的所属改为普通用户。

接着便可以使用远程命令scp,把已经在z01中安装好的jdk目录拷贝给另外两台机器。

$ scp -r /opt/modules/jdk1.7.0_67/ z02:/opt/modules/

$ scp -r /opt/modules/jdk1.7.0_67/ z03:/opt/modules/

注意中间有空格分开。配置完成后,记得去z02,z03修改/etc/profile环境变量

五、安装配置Hadoop

** 首先,需要先删除z01中的/opt/modules/hadoop-2.5.0/data目录,执行命令:

$ rm -rf /opt/modules/hadoop-2.5.0/data

** 在如下文件中,修改JAVA_HOME

hadoop-env.sh  yarn-env.sh  mapred-env.sh

export JAVA_HOME=/opt/modules/jdk1.8.0_121

** 修改HDFS默认地址、HDFS临时存储路径

涉及文件:core-site.xml

fs.defaultFS:hdfs://z01:8020

hadoop.tmp.dir:/opt/modules/hadoop-2.5.0/data

如图:

** 声明哪些服务器是datanode

涉及文件:slaves

z01

z02

z03

如图:

** 修改数据存放的副本数,SecondaryNameNode节点地址

涉及文件:hdfs-site.xml

dfs.replication:3

dfs.namenode.secondary.http-address:z03:50090

dfs.namenode.http-address:z01:50070

dfs.permissions.enabled:false

如图:

**resourcemanager节点配置,以及一些其他配置

涉及文件:yarn-site.xml

yarn.resourcemanager.hostname:z02

yarn.nodemanager.aux-services:mapreduce_shuffle

yarn.log-aggregation-enable:true

yarn.log-aggregation.retain-seconds:86400

如图:

** jobhistory服务以及其他设置

涉及文件:mapred-site.xml

mapreduce.framework.name:yarn

mapreduce.jobhistory.address:z01:10020

mapreduce.jobhistory.webapp.address:z01:19888

如图:

** 配置好后,拷贝hadoop安装目录给其他服务器

$ rm -rf /opt/modules/hadoop-2.5.0/share/doc/,删除该文档目录,以减少远程拷贝的体积

$ scp -r /opt/modules/hadoop-2.5.0/ z02:/opt/modules/

$ scp -r/opt/modules/ hadoop-2.5.0/ z03:/opt/modules/

全部搞定后,接下来我们就可以启动这个分布式系统了

六、启动Hadoop

** 在z01需要先格式化hdfs的namenode:

$ bin/hdfs namenode -format

** 使用start的脚本启动集群中所有的hdfs服务,包含namenode和datanode节点

$ sbin/start-dfs.sh

** 在z02中启动yarn服务,包含resourcemanager和nodemanager,注意,如果resourcemanger和namenode服务不在同一台机器上,那么启动resourcemanager服务必须在所在的机器启动,这里参看我们之前设定的集群配置图,所以需要在z02机器上执行如下命令:

$ sbin/start-yarn.sh

启动完成后,分别查看z01,z02,z03机器的jps,如下图:

z01:

z02:

z03:

在对比一下之前的集群配置图,是符合我们的期望的。

** 总结

本节主要深入讨论mapreduce的运算原理及过程,以及如何配置一个hadoop完全分布式集群。


IT全栈公众号:

QQ大数据技术交流群(广告勿入):476966007


下一节:Hadoop框架基础(五)

上一篇下一篇

猜你喜欢

热点阅读