Hadoop框架基础(四)
** Hadoop框架基础(四)
上一节虽然大概了解了一下mapreduce,徒手抓了海胆,不对,徒手写了mapreduce代码,也运行了出来。但是没有做更深入的理解和探讨。
那么……
本节目标:
* 深入了解mapreduce过程
* 成功部署Hadoop集群
** mapreduce原理
想要了解mapreduce原理,我们必须搞清楚处理数据时的每一个重要阶段,首先,贴上一张官方的图:
我们依次讨论每一个过程以及该过程对应的作用:
我先在这里假设一个情景,我现在有一个10G大小的words.txt,里面存放的是N多个英文单词。
这10G大小的文件分为若干个128M的文件块block分布存储于若干个服务器。
好,现在我要统计这10G文件中的单词出现频率。
** input split
一个split会对应一个map任务。
一般来讲,split的个数和block的个数相同,当然也可以多个block属于同一个split,但是后者会产生大量的网络和磁盘IO,原因在于一个split对应一个map任务,一个map任务肯定跑在某一台机器上,如果某个split所包含的多个block分布于不同的机器,首先需要做的操作就是把其他机器的block拷贝到运行map任务的机器上,这会耗费一定时间,所以,默认情况下,一个block对应一个split,源码中设定如下:
mapreduce.input.fileinputformat.split.minsize == 0
mapreduce.input.fileinputformat.split.maxsize == 10000
splitSize=max(minSize,min(maxSize, blockSize)),此为默认split大小
如果要修改,则如下方式:
recordSize表示一个记录的大小,分块要保证数据的完整性,所以:
int blockSize = Integer.parseInt(x); //x表示你希望的split大小
int splitSize = blockSize / recordSize * recordSize;
conf.setLong("mapred.max.split.size",splitSize);
conf.setLong("mapred.min.split.size",splitSize);
** map
此时输入的到map中的数据形式大致为:
<0, cat one hadoop element...> ---> 调用一次map
<30, dog two one hadoop....> ---> 调用一次map
……
省略号表示后边还有,其中0,30表示的是偏移量,每次从当前split中读取1行数据,比如第一次读取第一行,偏移量为0~29,第二次是第二行数据,偏移量是30~?,以此类推。每次读取都执行一次map任务,并调用一次map方法。map阶段结束,输出结果形式大致为:
<cat , 1> <one, 1> <hadoop, 1> <element, 1> ……等等
从此进入shuffle阶段
** buffer in memory
这是一个状态描述,表明此刻在内存中开始操作,buffer在这里是内存中的一个环形数组。
之所以用环形数组来存放数据,是因为这样可以最大化的利用存储空间。
这个环形数组中存放数据分为两个类别:
1、元数据区(Kvmeta):
里面存放的每组数据都包含:
** value的起始位置
** key的起始位置
** partition值
** value的长度
2、数据区(Kvbuffer):
里面存放的每组数据都包含:
** key值,例如<cat ,1>中的cat
** value值,例如<cat, 1>中的1
注意:
* 以上两个区域的分界点为0,即0以上存储数据区内容,0以下存储元数据区的内容。
* 这个环形buffer虽然实际为一个字节数组,但抽象为一个IntBuffer,无论哪个区域中的数据,每组数据中的每个元素都占用4个字节,也就是每组中的每个元素的存储,数组下标都将移动4位(因为一个int为4个字节)。
* partition
分区的意义在于把一系列相似的单词分为同一个区。即单词归类处理,这样不同机器上的不同map任务输出的单词可以依据分区递交给相同的reduce做处理。
注意:
* 相关类: HashPartitioner
* 这里的“相似”,指的是:键(此例中为单词)的hash值在某一个范围内
* sort
map排序阶段,在buffer中把数据按照partion和key两个关键字做升序排序,这个排序只需要移动“元数据区”中的每组数据顺序即可。排序结果是“元数据区”中的每组数据按照partition分区聚集在一起,同一个partition分区内的key按照字典顺序排序。
* combine(可选)
结合阶段,可以在map阶段简化数据输出,减少后边spill溢写过程中,spill溢写文件的大小,例如:可以将<cat, 1> <cat, 1>这样的数据在map阶段合并为<cat, 2>这样的数据作为map输出,默认没有开启。
* spill
溢写阶段,当内存中的环形存储结构占用率达到一定程度(默认占用80%时,则开始溢写),则将环形数据区中的所有内容,刷入到当前本地硬盘能够存的下这些数据的目录中,以使内容腾出空间供后边继续使用。
相同的partition分区的数据写入到同一个文件中,类似:“spill10.out”,“spill11.out”这样的文件,每一个partition分区所产生的文件的存放位置和一些相关信息,存放在另一个“元数据”文件中,类似“spill10.out.index”,“spill11.out.index”(注意,这个元数据文件和刚才说的元数据区不是一码事)。
这个元数据文件包含:
** 起始位置
** 原始数据长度
** 压缩之后的数据长度
** crc32的校验数据
该文件的作用是:标记某个partition对应的文件在哪个目录,哪个索引中存放。
注意:
* spill10.out.index这样的文件不一定会产生,如果内存中放得下(针对这个文件数据的存放,内存只提供1M空间可用),就放在内存中。
* 内存占用达到80%,开始溢写,那么此时map任务还在进行,还在往内存里添加数据,新的数据的起始点(0点)为剩余空间的中间部分,然后数据区和元数据区分别往两边递增即可,溢写后释放内存后也不必改变什么,继续写入即可。
** map merge
map融合阶段,将溢写阶段产生的多个文件,根据所属分区,把具有相同partition分区的“元数据(从spill10.out.index这样的文件中读取的)”放置于同一个segment列表中,最后根据segment列表,把数据从spill溢写出来的文件一个一个中读取出来,写入到file.out文件中,同时将这一批段的数据索引(元数据分区等)写入到file.out.index文件中,最终生成两个文件,file.out和file.out.index,其中包含了多段数据,每段数据对应一个分区。
** compress (可选)
map压缩阶段,对map merge阶段产生的文件进行压缩处理,以便于在后边的网络传输过程中减少网络IO压力,提升效率。
至此,map端的shuffle过程结束。
** sort merge
reduce任务会根据分区数据段拉取每个map任务产生的数据,拉取后,因为可能涉及到多个map产生的数据,所以要进行排序,一边copy一边排序,最后把多个map产生的具有相同分区的数据合并为一个分区数据段,这个merge过程和map的merge算法过程一样。
在此完成shuffle阶段
** reduce
对于本例而言,此时产生的某个分区中的某个单词形式大概如下:
<cat, [1, 1, 1, 1, 1, 1]>,在调用reduce方法时,进行values各个元素的叠加操作即可。
** output
统计完成后,输出数据到文件目录,文件格式为part-r-00000这样形式的文件,存放于HDFS中。文件中key和value默认的分隔符为:\t
** Hadoop集群部署
之前我们在yarn框架中运行mapreduce任务,或者操作hdfs,其中的各种节点都是运行在一台虚拟机上的,现在我们要将hadoop部署在一个多台虚拟机构成的完全分布式集群中(全部都在一个机器节点上的叫做伪分布式,比如之前的方式)。部署前,我们先勾画一下各个节点的部署结构,如下图所示:
描述:
3台机器共有进程:HDFS的datanode,yarn的nodemanager
其中,HDFS的namenode开在z01这台机器上,secondarynamenode开在z03这台机器上
YARN的resourcemanager开在z02这台机器上。
注:SecondaryNameNode是用来协助NameNode整合fsimage和edits的。
一、准备系统环境
1、修改主机名
# vi /etc/hostname
2、主机名和ip地址的映射
# vi /etc/hosts,我的机器修改如图,注意,三台机器都要这么设置:
3、关闭防火墙和selinux
请跳转至Linux基础04查看相关方法。
4、创建普通用户
# useradd 用户名,如果已经有普通用户,则无需再次创建
# echo 666666 | passwd --stdin 用户名
5、配置静态IP和DNS
请参看Linux基础01内容
6、把后面两个虚拟机的系统启动级别改成“字符模式”(就是没有桌面,这样可以减少虚拟机负担,加速系统启动和运行)
# cat /etc/inittab,内容如图所示:
根据文件中的提示,可以使用命令:
systemctl set-default multi-user.target,来设置无界面启动linux
systemctl set-default graphical.target,来设置有界面启动linux
7、卸载服务器JDK
请参看Linux基础02中的内容
二、配置NTP时间服务器
对于我们当前这种案例,主要目标是把z01这台服务器设置为时间服务器,剩下的z02,z03这两台机器同步z01的时间,我们需要这样做的原因是因为,整个集群架构中的时间,要保持一致。
** 检查当前系统时区,使用命令:
# date -R,如图:
注意这里,如果显示的时区不是+0800,你可以删除localtime文件夹后,再关联一个正确时区的链接过去,命令如下:
# rm -rf /etc/localtime
# ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
** 同步时间
# ntpdate pool.ntp.org
** 检查软件包
查看ntp软件包是否已安装,使用命令:
# rpm -qa | grep ntp,如图,红框中的内容:
如果没有红框中的内容,则可以使用命令:
# yum -y install ntp,来进行安装
** 修改ntp配置文件
# vi /etc/ntp.conf
去掉下面这行前面的# ,并把网段修改成自己的网段:
restrict 192.168.122.0 mask 255.255.255.0 nomodify notrap
注释掉以下几行:
#server 0.centos.pool.ntp.org
#server 1.centos.pool.ntp.org
#server 2.centos.pool.ntp.org
把下面两行前面的#号去掉,如果没有这两行内容,需要手动添加
server 127.127.1.0 # local clock
fudge 127.127.1.0 stratum 10
最后,如图所示:
** 重启ntp服务
# systemctl start ntpd.service,注意,如果是centOS7以下的版本,使用命令:service ntpd start
# systemctl enable ntpd.service,注意,如果是centOS7以下的版本,使用命令:chkconfig ntpd on
** z03,z03去同步z01这台时间服务器时间
首先需要关闭这两台计算机的ntp服务
# systemctl stop ntpd.service,centOS7以下,则:service ntpd stop
# systemctl disable ntpd.service,centOS7以下,则:chkconfig ntpd off
# systemctl status ntpd,查看ntp服务状态
# pgrep ntpd,查看ntp服务进程id
同步第一台服务器z01的时间:
# ntpdate z01,如图:
** 制定计划任务,周期性同步时间
# crontab -e
*/10 * * * * /usr/sbin/ntpdate z01,如图所示:
重启定时任务:
# systemctl restart crond.service,centOS7以下使用:service crond restart,z03这台机器的配置同理
三、配置无密钥登录
配置hadoop集群,首先需要配置集群中的各个主机的ssh无密钥访问
在z01上,通过如下命令,生成一对公私钥对
$ ssh-keygen -t rsa,一顿回车操作,这条命令执行完毕后(注意使用普通用户执行该命令),会在/home/z/.ssh/目录下生成两个文件:id_rsa 和 id_rsa.pub,如图所示:
生成之后呢,把z01生成的公钥拷贝给z01,z02,z03这三台机器,对,没错,包含当前机器。
$ ssh-copy-id z01
$ ssh-copy-id z02
$ ssh-copy-id z03
完成后,z02机器如图(z03同理):
以上完成了z01生成私钥,公钥并把公钥拷贝给z01,z02,z03三台机器的过程,z02,z03这两台机器也需要进行如上操作。全部完成后,我们可以在任意一台机器上,无密钥的连接到另外一台机器,比如,我们在z01连接到z02这台机器,使用命令:
$ ssh z02,如图:
这样就成功的在z01的机器登录到z02机器了。
四、安装配置JDK
使用root用户,在后面两台机器上创建/opt/modules文件夹,并使该文件夹的所属改为普通用户。
接着便可以使用远程命令scp,把已经在z01中安装好的jdk目录拷贝给另外两台机器。
$ scp -r /opt/modules/jdk1.7.0_67/ z02:/opt/modules/
$ scp -r /opt/modules/jdk1.7.0_67/ z03:/opt/modules/
注意中间有空格分开。配置完成后,记得去z02,z03修改/etc/profile环境变量
五、安装配置Hadoop
** 首先,需要先删除z01中的/opt/modules/hadoop-2.5.0/data目录,执行命令:
$ rm -rf /opt/modules/hadoop-2.5.0/data
** 在如下文件中,修改JAVA_HOME
hadoop-env.sh yarn-env.sh mapred-env.sh
export JAVA_HOME=/opt/modules/jdk1.8.0_121
** 修改HDFS默认地址、HDFS临时存储路径
涉及文件:core-site.xml
fs.defaultFS:hdfs://z01:8020
hadoop.tmp.dir:/opt/modules/hadoop-2.5.0/data
如图:
** 声明哪些服务器是datanode
涉及文件:slaves
z01
z02
z03
如图:
** 修改数据存放的副本数,SecondaryNameNode节点地址
涉及文件:hdfs-site.xml
dfs.replication:3
dfs.namenode.secondary.http-address:z03:50090
dfs.namenode.http-address:z01:50070
dfs.permissions.enabled:false
如图:
**resourcemanager节点配置,以及一些其他配置
涉及文件:yarn-site.xml
yarn.resourcemanager.hostname:z02
yarn.nodemanager.aux-services:mapreduce_shuffle
yarn.log-aggregation-enable:true
yarn.log-aggregation.retain-seconds:86400
如图:
** jobhistory服务以及其他设置
涉及文件:mapred-site.xml
mapreduce.framework.name:yarn
mapreduce.jobhistory.address:z01:10020
mapreduce.jobhistory.webapp.address:z01:19888
如图:
** 配置好后,拷贝hadoop安装目录给其他服务器
$ rm -rf /opt/modules/hadoop-2.5.0/share/doc/,删除该文档目录,以减少远程拷贝的体积
$ scp -r /opt/modules/hadoop-2.5.0/ z02:/opt/modules/
$ scp -r/opt/modules/ hadoop-2.5.0/ z03:/opt/modules/
全部搞定后,接下来我们就可以启动这个分布式系统了
六、启动Hadoop
** 在z01需要先格式化hdfs的namenode:
$ bin/hdfs namenode -format
** 使用start的脚本启动集群中所有的hdfs服务,包含namenode和datanode节点
$ sbin/start-dfs.sh
** 在z02中启动yarn服务,包含resourcemanager和nodemanager,注意,如果resourcemanger和namenode服务不在同一台机器上,那么启动resourcemanager服务必须在所在的机器启动,这里参看我们之前设定的集群配置图,所以需要在z02机器上执行如下命令:
$ sbin/start-yarn.sh
启动完成后,分别查看z01,z02,z03机器的jps,如下图:
z01:
z02:
z03:
在对比一下之前的集群配置图,是符合我们的期望的。
** 总结
本节主要深入讨论mapreduce的运算原理及过程,以及如何配置一个hadoop完全分布式集群。
IT全栈公众号:
QQ大数据技术交流群(广告勿入):476966007