大数据 - (二) - Hadoop框架
HADOOP框架
大数据技术解决的是什么问题?
- 解决海量数据的存储和计算
Hadoop的广义和狭义之分
- 狭义的Hadoop:
- Hadoop框架,三部分组成:
- HDFS:分布式文件系统,解决存储问题
- MapReduce:分布式离线计算框架,解决计算问题
- Yarn:资源调度框架
- Hadoop框架,三部分组成:
- 广义的Hadoop:生态圈,包括Hadoop框架以及辅助框架
- Flume:日志采集
- Sqoop:关系型数据库数据的采集,导出
- Hive:深度依赖Hadoop框架完成计算
- Hbase:大数据领域的数据库
- Kafka:高吞吐消息中间件
大数据简介
大数据定义
- 指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合
- 需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产
大数据特点
大量(Volume)
- 1PB = 1024TB
- 1EB = 1024PB
- 1ZB = 1024EB
高速(Velocity)
- 数据的创建、存储、分析都要求被高速处理
- 实时完成个性化推荐
多样(Variety)
- 数据形式和来源多样化
真实(Veracity)
- 确保数据的真实性,才能保证数据分析的正确性
低价值(Value)
- 数据价值密度相对较低
- 结合业务逻辑并通过强大的机器算法来挖掘数据价值
大数据应用场景
- 仓储物流
- 电商零售
- 个性推荐
- 汽车-无人驾驶
- 人工智能
Hadoop简介
Hadoop定义
Hadoop 是一个适合大数据的分布式存储和计算平台。
Hadoop的发行版本
- Apache Hadoop
- 0.x 系列版本:Hadoop当中最早的一个开源版本,在此基础上演变而来的1.x以及2.x的版本
- 1.x 版本系列:Hadoop版本当中的第二代开源版本,主要修复0.x版本的一些bug等
- 2.x 版本系列:架构产生重大变化,引入了yarn平台等许多新特性
- 3.x 版本系列:EC技术、YARN的时间轴服务等新特性
- CDH版本
- HDP版本
Hadoop 优缺点
优点
- Hadoop具有存储和处理数据能力的高可靠性
- 通过可用的计算机集群分配数据,完成存储和计算任务,这些集群可以方便地扩展到数以
- 能够在节点之间进行动态地移动数据,并保证各个节点的动态平衡,处理速度非常快,具有高效性
- 能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配,具有高容错性。
缺点
-
Hadoop
不适用于低延迟数据访问。 -
Hadoop
不能高效存储大量小文件。 -
Hadoop
不支持多用户写入并任意修改文件
Apache Hadoop 重要组成
Hadoop=HDFS(分布式文件系统)+MapReduce(分布式计算框架)+Yarn(资源协调框架)+Common模块
Hadoop HDFS
一个高可靠、高吞吐量的分布式文件系统,Hadoop Distribute File System
-
分而治之,把一个很大的数据存储进行拆分,数据切割成数据块,由节点分别存储
image.png - HDFS是Master/Slave架构:
- 存储过程:把一个很大的数据存储进行拆分,数据切割成数据块
- 获取过程:向
NameNode
请求,获取到之前存入文件的块以及块所在的DateNode
信息,分别下载,合并
- NameNode(nn):存储文件的元数据,元数据记录了文件的块列表以及块所在
DateNode
节点信息 - SecondaryNameNode(2nn):辅助NameNode更好的工作,用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据快照
- DateNode(dn):在本地文件系统存储文件块数据,以及块数据的校验
注意:NN,2NN,DN这些既是角色名称,进程名称,代指电脑节点名称
Hadoop MapReduce
一个分布式的离线并行计算框架
- 拆解任务、分散处理、汇整结果
-
MapReduce
计算 =Map
阶段 +Reduce
阶段-
Map
阶段就是“分”的阶段,并行处理输入数据; -
Reduce
阶段就是“合”的阶段,对Map
阶段结果进行汇总
-
-
每个节点负责一个切片工作,
image.png
Hadoop YARN
作业调度与集群资源管理的框架
image.png
- ResourceManager(rm):处理客户端请求、启动/监控
ApplicationMaster
、监控NodeManager
、资源分配与调度 - NodeManager(nm):单个节点上的资源管理、处理来自
ResourceManager
的命令、处理来自ApplicationMaster
的命令; - ApplicationMaster(am):数据切分、为应用程序申请资源,并分配给内部任务、任务监控与容错
- Container:对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关的信息。
Hadoop Common
支持其他模块的工具模块(Configuration、RPC、序列化机制、日志操作)
Apache Hadoop 完全分布式集群搭建
- Hadoop框架是
Java
编写,需要JVM
环境 - JDK版本:JDK8
- Hadoop搭建方式:
- 单机模式:单节点模式,非集群,生产不会使用这种方式
- 单机伪分布式模式:单节点,多线程模拟集群的效果,生产不会使用这种方式
- 完全分布式模式:多台节点,真正的分布式Hadoop集群的搭建(生产环境建议使用这种方式)
集群规划
框架 | os1 | os2 | os3 |
---|---|---|---|
HDFS | NameNode、DataNode | DataNode | SecondaryNameNode、DataNode |
YARN | NodeManager | ResourceManager、NodeManager | NodeManager |
Hadoop目录
├── bin # 对Hadoop进行操作的相关命令,如hadoop,hdfs等
├── etc # Hadoop的配置文件目录,入hdfs-site.xml,core-site.xml等
├── include
├── lib # Hadoop本地库(解压缩的依赖)
├── libexec
├── LICENSE.txt
├── NOTICE.txt
├── README.txt
├── sbin # 存放的是Hadoop集群启动停止相关脚本,命令
└── share # Hadoop的一些jar,官方案例jar,文档等
集群配置[1]
Hadoop集群配置 = HDFS集群配置 + MapReduce集群配置 + Yarn集群配置
HDFS集群配置
- 配置文件在
hadoop/etc/hadoop
下 - 配置xml时,配置内容放在
<configuration></configuration>
中 - 都先在os1上配置
- 将JDK路径明确配置给HDFS(修改hadoop-env.sh)
vi hadoop-env.sh
export JAVA_HOME=$JAVA_HOME
- 指定NameNode节点以及数据存储目录(修改core-site.xml)
vi core-site.xml
<!-- 指定HDFS中NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://os1:9000</value>
</property>
<!-- 指定Hadoop运行时产生文件的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/servers/hadoop-2.9.2/data/tmp</value>
</property>
- 指定SecondaryNameNode节点(修改hdfs-site.xml)
vi hdfs-site.xml
<!-- 指定Hadoop辅助名称节点主机配置 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>os3:50090</value>
</property>
<!--副本数量 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
- 指定DataNode从节点(修改etc/hadoop/slaves文件,每个节点配置信息占一行)
vi slaves
os1
os2
os3
MapReduce集群配置
- 将JDK路径明确配置给MapReduce(修改mapred-env.sh)
vi mapred-env.sh
export JAVA_HOME=$JAVA_HOME
- 指定MapReduce计算框架运行Yarn资源调度框架(修改mapred-site.xml)
mv mapred-site.xml.template mapred-site.xml
vi mapred-site.xml
<!-- 指定MR运行在Yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
Yarn集群配置
- 将JDK路径明确配置给Yarn(修改yarn-env.sh)
vi yarn-env.sh
export JAVA_HOME=$JAVA_HOME
- 指定ResourceManager的Master节点所在计算机节点(修改yarn-site.xml)
vi yarn-site.xml
<!-- 指定YARN的ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>os3</value>
</property>
<!-- Reducer获取数据的方式 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
- 指定NodeManager节点(会通过slaves文件内容确定)
- 配置
slave
分发配置
使用rsync
远程同步工具
yum install -y rsync
rsync语法
rsync -rvl $pdir/$fname $user@$host:$pdir/$fname
## -r 递归
## -v 显示复制过程
## -l 拷贝符号连接
rsync-script自动化脚本
#!/bin/bash
#1 获取命令输入参数的个数,如果个数为0,直接退出命令
paramnum=$#
if((paramnum==0)); then
echo no params;
exit;
fi
#2 根据传入参数获取文件名称
p1=$1
file_name=`basename $p1`
echo fname=$file_name
#3 获取输入参数的绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
#4 获取用户名称
user=`whoami`
#5 循环执行rsync
for((host=2; host<4; host++)); do
echo ------------------- os$host --------------
rsync -rvl $pdir/$file_name $user@os$host:$pdir
done
使用:rsync-script /opt/servers/hadoop-2.9.2
启动集群
注意:
如果集群是第一次启动,需要在Namenode所在节点格式化NameNode,非第一次不用执行格式化Namenode操作
hadoop namenode -format
单节点启动
- 在os1上启动
NameNode
hadoop-daemon.sh start namenode
- 在os1,os2,os3上启动
DataNode
hadoop-daemon.sh start datanode
Yarn集群单节点启动
- 在os3上启动
ResourceManager
yarn-daemon.sh start resourcemanager
- 在os1,os2,os3启动
NodeManager
yarn-daemon.sh start nodemanager
集群群起
- os1启动dfsh
start-dfs.sh
- os3启动yarn
start-yarn.sh
集群测试
HDFS 分布式存储
- os1上传文件,os2下载
- 使用网页查看:{os1-IP}:50070/explorer.html#/test/input
# 创建hdfs目录
hdfs dfs -mkdir -p /test/input
cd /root
vi test.txt
# os1上传
hdfs dfs -put /root/test.txt /test/input
# os2下载
hdfs dfs -get /root/test.txt /test/input
MapReduce 分布式计算
- 在
HDFS
文件系统根目录下面创建一个wcinput
文件夹
hdfs dfs -mkdir /wcinput
- 本地
/root
下创建一个wc.txt
文件
cd /root
vi wc.tx
# 文件中输入
hadoop mapreduce yarn
hdfs hadoop mapreduce
mapreduce yarn os
os
os
- 上传
wc.txt
到HDFS
目录/wcinput
下
hdfs dfs -put wc.txt /wcinput
- 使用
hadoop jar
执行wordcount
方法
hadoop jar $HADOOP_PATH/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.9.2.jar wordcount /wcinput
/wcoutput
- 查看结果
hdfs dfs -cat /wcoutput/part-r-00000
- 使用网页查看
{os3-IP}:8088/cluster
配置历史服务器
- 配置
mapred-site.xml
# vi mapred-site.xml
<property>
<name>mapreduce.jobhistory.address</name>
<value>os1:10020</value>
</property>
<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>os1:19888</value>
</property>
- 分发到其他节点
rsync-script mapred-site.xml
- 启动历史服务器
mr-jobhistory-daemon.sh start historyserver
- 查看JobHistory
http://os1:19888/jobhistory
配置日志的聚集
日志聚集
应用(Job)运行完成以后,将应用运行日志信息从各个task汇总上传到HDFS系统上
- 优势:可以方便的查看到程序运行详情,方便开发调试。
- 注意:开启日志聚集功能,需要重新启动
NodeManager
、ResourceManager
和HistoryManager
。
日志聚集开启步骤
- 配置
yarn-site.xml
# vi yarn-site.xml
<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
- 分发
yarn-site.xml
到集群其它节点
rsync-script yarn-site.xml
- 关闭
NodeManager
、ResourceManager
和HistoryManager
# os3中
stop-yarn.sh
# os1
mr-jobhistory-daemon.sh stop historyserver
- 启动
NodeManager
、ResourceManager
和HistoryManager
# os3
start-yarn.sh
mr-jobhistory-daemon.sh start historyserver
- 删除
HDFS
上已经存在的输出文件
hdfs dfs -rm -R /wcoutput
- 执行
WordCount
程序
hadoop jar $HADOOP_PATH/share/hadoop/mapreduce/hadoopmapreduce-examples-2.9.2.jar wordcount /wcinput /wcoutput
HDFS分布式文件系统
HDFS简介
HDFS (全称:Hadoop Distribute File System,Hadoop 分布式文件系统)是 Hadoop 核心组成,是分布式存储服务。
-
HDFS
是分布式文件系统中的一种
HDFS的重要概念
-
HDFS
通过统一的命名空间目录树来定位文件 - 是分布式的,集群中的服务器有各自的角色
典型的 Master/Slave 架构
-
HDFS
集群往往是一个NameNode
(HA架构会有两个NameNode,联邦机制)+多个DataNode
组成; -
NameNode
是集群的主节点,DataNode
是集群的从节点。
分块存储(block机制)
-
HDFS
中的文件在物理上是分块存储(block
) - 块的大小可以通过配置参数来规定,默认
128M
,自动切分
命名空间(NameSpace)
-
HDFS
支持传统的层次型文件组织结构。 - 用户或者应用程序可以创建目录,然后将文件保存在这些目录里
- 文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件
-
Namenode
负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将会被记录下来 -
HDFS
提供给客户单一个抽象目录树,访问形式:hdfs://namenode
的hostname:port/test/input
,例如:hdfs://os1:9000/test/input
NameNode元数据管理
- 我们把目录结构及文件分块位置信息叫做元数据。
-
NameNode
的元数据记录每一个文件所对应的block
信息(block
的id
,以及所在的DataNode
节点的信息)
DataNode数据存储
文件的各个block
的具体存储管理由 DataNode 节点承担。
一个block会有多个DataNode
来存储,DataNode
会定时向NameNode来汇报自己持有的block信息
副本机制
为了容错,文件的所有block
都会有副本。
每个文件的block
大小和副本系数都是可配置的。
应用程序可以指定某个文件的副本数目。
副本系数可以在文件创建的时候指定,也可以在之后改变。
副本数量默认是3
个
一次写入,多次读出
-
HDFS
是设计成适应一次写入,多次读出的场景,且不支持文件的随机修改。 (支持追加写入,不只支持随机更新) -
HDFS
适合用来做大数据分析的底层存储服务,并不适合用来做网盘等应用(修改不方便,延迟大,网络开销大,成本太高)
HDFS 架构
image.pngNameNode(nn):HDFS集群的管理者,Master
- 维护管理
HDFS的
命名空间(NameSpace
) - 维护副本策略
- 记录文件块(
Block
)的映射信息 - 负责处理客户端读写请求
DateNode:Slave节点
- 保存实际的数据块
- 负责数据块的读写
Client:客户端
- 上传文件到
HDFS
的时候,Client
负责将文件切分成Block
,然后进行上传
请求·NameNode·交互,获取文件的位置信息 - 读取或写入文件,与
DataNode
交互 -
Client
可以使用一些命令来管理HDFS
或者访问HDFS
HDFS 客户端操作
Shell 命令行操作HDFS
基本语法
# hadoop fs [具体命令]
# 查看命令帮助
hadoop fs -help rm
# 显示目录信息
hadoop fs -ls /
# 在HDFS上创建目录
hadoop fs -mkdir -p /study/bigdata
# 从本地剪切粘贴到HDFS
cat > test1.txt << EOF
1
2
EOF
hadoop fs -moveFromLocal test1.txt /study/bigdata
# 追加一个文件到已经存在的文件末尾
cat > test2.txt << EOF
3
4
EOF
hadoop fs -appendToFile test2.txt /study/bigdata/test1.txt
# 显示文件内容
hadoop fs -cat /study/bigdata/test1.txt
# 统计文件夹的大小信息
hadoop fs -du -s -h /study/bigdata
# 设置HDFS中文件的副本数量,实际副本数量取决于datanode数量
hadoop fs -setrep 10 /study/bigdata/test1.txt
HDFS读写解析
image.png- 客户端通过
Distributed FileSystem
向NameNode
请求下载文件,NameNode
通过查询元数据,找到文件块所在的DataNode
地址。 - 挑选一台
DataNode
(就近原则,然后随机)服务器,请求读取数据。 -
DataNode
开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet
为单位来做校验)。 - 客户端以
Packet
为单位接收,先在本地缓存,然后写入目标文件
HDFS写数据流程
image.png- 客户端通过
Distributed FileSystem
模块向NameNode请求上传文件,NameNode
检查目标文件是否已存在,父目录是否存在。 -
NameNode
返回是否可以上传。 - 客户端请求第一个
Block
上传到哪几个DataNode
服务器上。 -
NameNode
返回3
个DataNode
节点,分别为dn1
、dn2
、dn3
。 - 客户端通过
FSDataOutputStream
模块请求dn1
上传数据,dn1
收到请求会继续调用dn2
,然后dn2
调用dn3
,将这个通信管道建立完成。 -
dn1
、dn2
、dn3
逐级应答客户端。 - 客户端开始往
dn1
上传第一个Block
(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1
收到一个Packet
就会传给dn2
,dn2
传给dn3
;dn1
每传一个packet会放入一个确认队列等待确认。 - 当一个
Block
传输完成之后,客户端再次请求NameNode
上传第二个Block
的服务器。(重复执行3-7步)
NN与2NN
HDFS元数据管理机制
NameNode如何管理和存储元数据?
- 计算机中存储数据2种:内存或者磁盘
- 如果元数据存储磁盘:存储磁盘无法面对客户端对元数据信息的任意的快速低延迟的响应,但是安全性高
- 如果元数据存储内存:可以高效的查询以及快速响应客户端的查询请求,数据保存在内存,如果断点,内存中的数据全部丢失。
- 解决方案:内存+磁盘,
NameNode
内存+FsImage
的文件(磁盘)
磁盘和内存中元数据如何划分?
- 如果一模一样:
client
如果对元数据进行增删改操作,需要保证两个数据的一致性。FsImage
文件操作起来效率也不高
解决方案:两个合并成完整数据,NameNode
引入了一个edits
文件(日志文件:只能追加写入)edits
文件记录的是client
的增删改操作
元数据管理流程图
image.png- 特点:编辑日志生成快,fsimage恢复快、生成慢
- 2NN辅助NN管理维护元数据,帮助生成fsimage
- 第一阶段:
NameNode
启动- 第一次启动
NameNode
格式化后,创建Fsimage
和Edits
文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。 - 客户端对元数据进行增删改的请求。
-
NameNode
记录操作日志,更新滚动日志。 -
NameNode
在内存中对数据进行增删改。
- 第一次启动
- 第二阶段:
Secondary NameNode
工作-
Secondary NameNode
询问NameNode
是否需要CheckPoint
(检查点)。直接带回NameNode
是否执行检查点操作结果。 -
Secondary NameNode
请求执行CheckPoint
。 -
NameNode
滚动正在写的Edits
日志。 - 将滚动前的编辑日志和镜像文件拷贝到
Secondary NameNode
。 -
Secondary NameNode
加载编辑日志和镜像文件到内存,并合并。 - 生成新的镜像文件
fsimage.chkpoint
。 - 拷贝
fsimage.chkpoin
t到NameNode
。 -
NameNode
将fsimage.chkpoint
重新命名成fsimage
-
fsimage与Edits文件解析
- 当使用
NameNode
格式化之后,在目录hadoop
工作目录/opt/servers/hadoop-2.9.2/data/tmp/dfs/name/current
下生成一系列文件
image.png - seen_txid:记录编辑日志最大滚动编号,用于
NN
管理edits
文件,防止edits
文件丢失后导致元数据错误 - VERSION:记录namenode的一些版本号信息,如
CusterId(当前集群唯一标识)
、namespaceID
(命名空间id)、blookpoolID(块池id)
等 - edits_xxx_*文件:编辑日志滚动后生成
- fsimage文件:
namenode
中关于元数据的镜像,一般称为检查点,这里包含了HDFS
文件系统所有目录以及文件相关信息(Block
数量,副本数量,权限等信息)
fsimage文件内容
查看oiv命令
hdfs oiv -p 文件类型 -i 镜像文件 -o 转换后文件输出路径
hdfs oiv -p XML -i fsimage_0000000000000000265 -o
/opt/lagou/servers/fsimage.xml
- 为什么fimage没有记录
datenode
的信息?- 在内存元数据中记录
datanode
信息 - 集群启动时,加载fsimage和edits文件,都没有块对应的
datanode
信息 - 集群启动时有安全模式,安全模式会让
datanode
汇报自己的持有的块信息,不全元数据
- 在内存元数据中记录
Edits文件内容‘
查看oev命令
hdfs oev -p 文件类型 -i编辑日志 -o 转换后文件输出路径
-
edits
中只记录了更新相关的操作,查询或者下载文件并不会记录在内 -
NN
启动时,加载fsimage
和未经合并的edits
文件,NN
通过判断fsimage后编号过滤掉之前已经合并的edits
文件(编号小于等于的文件)
checkpoint周期
默认设置:hdfs-default.xml
<!-- 定时一小时 -->
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<!-- 一分钟检查一次操作次数,3当操作次数达到1百万时,SecondaryNameNode执行一次 -->
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>操作动作次数</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description> 1分钟检查一次操作次数</description>
</property >
NN故障处理
- 如果元数据出现丢失损坏如何恢复呢?
- 将
2NN
的元数据拷贝到NN
的节点下,此种方式会存在元数据的丢失。 - 搭建
HDFS
的HA
(高可用)集群,解决NN的单点故障问题!!(借助Zookeeper
实现HA
,一个Active
的NameNode
,一个是Standby
的NameNode
)
Hadoop的限额与归档以及集群安全模式
HDFS文件限额配置
数量限额
#创建hdfs文件夹
hdfs dfs -mkdir -p /user/root/test_setting
# 给该文件夹下面设置最多上传两个文件,上传文件,发现只能上传一个文件
hdfs dfsadmin -setQuota 2 /user/root/test_setting
# 清除文件数量限制
hdfs dfsadmin -clrQuota /user/root/test_setting
空间大小限额
# 限制空间大小4KB
hdfs dfsadmin -setSpaceQuota 4k /user/root/test_setting
#上传超过4Kb的文件大小上去提示文件超过限额
hdfs dfs -put /export/softwares/xxx.tar.gz /user/root/test_setting
#清除空间限额
hdfs dfsadmin -clrSpaceQuota /user/root/test_setting
#查看hdfs文件限额数量
hdfs dfs -count -q -h /user/root/test_setting
MapReduce编程框架
MapReduce思想
- 解决大数据量的任务计算、任务处理
-
MapReduce
核心思想:大数量计算也是"分而治之",充分利用了并行处理的优势 -
MapReduce
任务过程是分为两个处理阶段:-
Map
阶段:Map阶段的主要作用是“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。Map
阶段的这些任务可以并行计算,彼此间没有依赖关系。-
Map
阶段,编程人员只需关注业务逻辑,无需关心原始数据切分,读取问题,以及任务调度、分配问题;
-
-
Reduce
阶段:主要作用是“合”,即对Map
阶段的结果进行全局汇总。
-
官方WordCount案例源码解析
* `Reduce`阶段,编程人员无需关心如何获取结果,只需要关心如何汇总数据
官方WordCount源码分析
- 反编译
hadoop-mapreduce-examples-2.9.2.jar
,WordCount.class
下WordCount
方法 -
TokenizerMapper
类,对应Map阶段
image.png -
IntSumReducer
类,对应Reduce阶段
image.png - 运行作业
Main
方法
image.png
MapReduce计算类基本组成
- Mapper类
- Reducer类
- 运行作业的代码(Driver)
Hadoop序列化
为什么进行序列化?
- 序列化主要是我们通过网络通信传输数据时或者把对象持久化到文件,需要把对象序列化成二进制的结构
为什么Hadoop
要选择建立自己的序列化格式而不使用java
自带serializable
?
- 序列化在分布式程序中非常重要,在Hadoop中,集群中多个节点的进程间的通信是通过
RPC
(远程过程调用:Remote Procedure Call
)实现;RPC
将消息序列化成二进制流发送到远程节点,远程节点再将接收到的二进制数据反序列化为原始的消息,因此RPC往往追求如下特点:- 紧凑:数据更紧凑,能充分利用网络带宽资源
- 快速:序列化和反序列化的性能开销更低
-
Hadoop
使用的是自己的序列化格式Writable
,它比java
的序列化serialization
更紧凑速度更快。一个对象使用Serializable
序列化后,会携带很多额外信息比如校验信息,Header
,继承体系等。
MapReduce编程规范及示例编写
Mapper类
- 用户自定义一个
Mapper
类继承Hadoop
的Mapper
类 -
Mapper
的输入数据是KV
对的形式(类型可以自定义) -
Map
阶段的业务逻辑定义在map()
方法中 -
Mapper
的输出数据是KV
对的形式(类型可以自定义)
注意:
map()
方法是对输入的一个KV
对调用一次
Reducer类
- 用户自定义
Reducer
类要继承Hadoop
的Reducer
类 -
Reducer
的输入数据类型对应Mapper
的输出数据类型(KV
对) -
Reducer
的业务逻辑写在reduce()
方法中
注意:
Reduce()
方法是对相同K
的一组KV
对调用执行一次
Driver阶段
- 创建提交
YARN
集群运行的Job
对象 - 其中封装了
MapReduce
程序运行所需要的相关参数入输入数据路径,输出数据路径等,也相当于是一个YARN
集群的客户端 - 主要作用就是提交我们MapReduce程序运行。
------ 跳过
MapReduce原理分析
MapTask运行机制详解
-
沙盒机制
image.png - 首先,读取数据组件
InputFormat
(默认TextInputFormat
)会通过getSplits
方法对输入文件进行逻辑切片规划得到splits
,有多少个split
就对应启动多少个MapTask
。spli
t与block
的对应关系默认是一对一;
- 首先,读取数据组件
- 将输入文件切分为
splits
之后,由RecordReader
对象(默认LineRecordReader
)进行读取,以\n
作为分隔符,读取一行数据,返回<key,value>
。Key
表示每行首字符偏移值,value
表示这一行文本内容;
- 将输入文件切分为
- 读取
split
返回<key,value>
,进入用户自己继承的Mapper
类中,执行用户重写的map
函数。RecordReader
读取一行这里调用一次
- 读取
-
map
逻辑完之后,将map
的每条结果通过context.write
进行collect
数据收集。在collect
中,会先对其进行分区处理,默认使用HashPartitioner
。
-
-
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集
map
结果,减少磁盘IO
的影响。我们的key/value
对以及Partition
的结果都会被写入缓冲区。当然写入之前,key
与value
值都会被序列化成字节数组。
- 环形缓冲区其实是一个数组,数组中存放着
key
、value
的序列化数据和key
、value
的元数据信息,包括partition
、key
的起始位置、value
的起始位置以及value
的长度。环形结构是一个抽象概念。 - 缓冲区是有大小限制,默认是
100MB
。当map task
的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill
,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map
结果的线程。溢写线程启动时不应该阻止map
的结果输出,所以整个缓冲区有个溢写的比例spill.percent
。这个比例默认是0.8
,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB
),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task
的输出结果还可以往剩下的20MB内存中写,互不影响
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集
-
- 当溢写线程启动后,需要对这80MB空间内的
key
做排序(Sort
)。排序是MapReduce
模型默认的行为,排序算法使用quicksort
- 如果
job
设置过Combiner
,这时会对所有相同key
的key/value
对的value
加起来,减少溢写到磁盘的数据量。Combiner
会优化MapReduce
的中间结果,所以它在整个模型中会多次使用 -
Combiner
的输出是Reducer
的输入,Combiner
绝不能改变最终的计算结果,Combiner
只应该用于那种Reduce
的输入key/value
与输出key/value
类型完全一致,且不影响最终结果的场景
- 当溢写线程启动后,需要对这80MB空间内的
- 合并溢写文件,每次溢写会在磁盘上生成一个临时文件,当整个数据处理结束之后开始对磁盘中的临时文件进行
merge
合并,因为最终的文件只有一个,合并后进行归并排序,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce
对应数据的偏移量
- 合并溢写文件,每次溢写会在磁盘上生成一个临时文件,当整个数据处理结束之后开始对磁盘中的临时文件进行
注意:
Map
阶段的所有排序都是对key
进行的
进入缓冲区,按key
的hashcode
进行分区
MapTask的并行度
-
MapTask
的并行度决定Map
阶段的任务处理并发度,从而影响到整个Job
的处理速度
思考:
MapTask
并行任务是否越多越好呢?
答:源码中,默认切片大小128M,但是切片如果在10%增幅下,也当成一个切片;因为MR并行度越高消耗资源也越高,同样的,block是否也会有同样算法?- 哪些因素影响了
MapTask
并行度?
MapTask并行度决定机制
- 数据块:Block是HDFS物理上把数据分成一块一块。
- 切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片
进行存储 - 切片大小
切片大小128 = block块大小
- 问题:
a文件300M,b文件100M,2个文件都存入HDFS,并作为某MR输入数据,需要对MR任务进行split,请问MapTask并行度是多少?
a文件:0-127M,128-256M,256-300M
b文件:0-100M
总共4个split,MapTask并行度为4
- 注意:
要移动计算,不要移动数据,移动数据代价高
ReduceTask工作机制
image.png-
Reduce
大致分为copy
、sort
、reduce
三个阶段。 -
copy
阶段包含一个eventFetcher
来获取已完成的map
列表,由Fetcher
线程去copy
数据- 在此过程中会启动两个
merge
线程,分别为inMemoryMerger
和onDiskMerger
,分别将内存中的数据merge
到磁盘和将磁盘中的数据进行merge
。
- 在此过程中会启动两个
- 待数据
copy
完成之后,开始进行sort阶段,sort阶段主要是执行
finalMerge
操作, - 完成之后就是
reduce
阶段,调用用户定义的reduce
函数进行处理。
ReduceTask详细步骤
-
Copy
阶段,简单地拉取数据。-
Reduce
进程启动一些数据copy
线程(Fetcher
) - 通过
HTTP
方式请求maptask
获取属于自己的文件。
-
-
Merge
阶段。- 这里的
merge
如map
端的merge
动作,只是数组中存放的是不同map
端copy来的数值。 -
Copy
过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map
端的更为灵活。 - merge有三种形式:
- 内存到内存;
- 默认不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的
merge
。与map
端类似,这也是溢写的过程,这个过程中如果你设置有Combiner
,也是会启用的,然后在磁盘中生成了众多的溢写文件。
- 默认不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的
- 内存到磁盘;
- 磁盘到磁盘。
- 第二种
merge
方式一直在运行,直到没有map
端的数据时才结束,然后启动第三种磁盘到磁盘的merge
方式生成最终的文件。
- 第二种
- 内存到内存;
- 合并排序
- 把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
- 对排序后的键值对调用
reduce
方法,键相等的键值对调用一次reduce
方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS
文件中
- 这里的
ReduceTask并行度
MapTask
并行度取决于split
数量,split
大小默认blocksize
-
ReduceTask
的并行度同样影响整个Job
的执行并发度和执行效率,ReduceTask
数量的决定是可以直接手动设置:
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);
- 注意:
-
ReduceTask=0
,表示没有Reduce
阶段,输出文件数和MapTask
数量保持一致; -
ReduceTask
数量不设置默认就是一个,输出文件数量为1个; - 如果数据分布不均匀,可能在
Reduce
阶段产生数据倾斜;
数据倾斜 —— 某个ReduceTask处理的数据量远远大于其他节点
-
Shuffle机制
map
阶段处理的数据如何传递给reduce
阶段,是MapReduce
框架中最关键的一个流程,这个流程就叫shuffle
,
*MapTask
的map
之后,到ReduceTask
的reduce
之前的数据处理过程称为Shuffle
流程
- shuffle:洗牌、发牌——(核心机制:数据分区,排序,分组,
combine
,合并等过程)
MapReduce的分区与reduceTask的数量
- 在
MapReduce
中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce
当中进行处理(默认是key
相同去往同个分区),例如我们为了数据的统计,我们可以把一批类似的数据发送到同一个reduce
当中去,在同一个reduce
当中统计相同类型的数据 - 如何才能保证相同
key
的数据去往同个reduce
呢?只需要保证相同key
的数据分发到同个分区即可。 - MR程序shuffle机制默认就是这种规则,
MR
程序默认使用的HashPartitioner
,保证了相同的key
去往同个分区
YARN资源调度
主要角色
-
ResourceManager(rm)
:处理客户端请求、启动/监控ApplicationMaster
、监控NodeManager
、资源分配与调度; -
NodeManager(nm)
:单个节点上的资源管理、处理来自ResourceManager
的命令、处理来自ApplicationMaster
的命令; -
ApplicationMaster(am)
:数据切分、为应用程序申请资源,并分配给内部任务、任务监控与容错。 -
Container
:对任务运行环境的抽象,封装了CPU
、内存等多维资源以及环境变量、启动命令等任务运行相关的信息。
Yarn任务提交(工作机制)
image.png作业提交过程之YARN
- 作业提交
- 第1步:
Client
调用job.waitForCompletion
方法,向整个集群提交MapReduce
作业。 - 第2步:
Client
向RM
申请一个作业id
。 - 第3步:
RM
给Client
返回该job
资源的提交路径和作业id
。 - 第4步:
Client
提交jar
包、切片信息和配置文件到指定的资源提交路径。 - 第5步:
Client
提交完资源后,向RM申请运行MrAppMaster
。
- 第1步:
- 作业初始化
- 第6步:当
RM
收到Client
的请求后,将该job
添加到容量调度器中。 - 第7步:某一个空闲的
NM
领取到该Job
。 - 第8步:该
NM
创建Container
,并产生MRAppmaster
。 - 第9步:下载
Client
提交的资源到本地。
- 第6步:当
- 任务分配
- 第10步:
MrAppMaster
向RM
申请运行多个MapTask
任务资源。 - 第11步:
RM
将运行MapTask
任务分配给另外两个NodeManager
,另两个NodeManager
分别领取任务并创建容器。
- 第10步:
- 任务运行
- 第12步:
MR
向两个接收到任务的NodeManager
发送程序启动脚本,这两个NodeManager
分别启动MapTask
,MapTask
对数据分区排序。 - 第13步:
MrAppMaster
等待所有MapTask
运行完毕后,向RM
申请容器,运行ReduceTask
。 - 第14步:
ReduceTask
向MapTask
获取相应分区的数据。 - 第15步:程序运行完毕后,
MR
会向RM
申请注销自己。
- 第12步:
- 进度和状态更新
-
YARN
中的任务将其进度和状态返回给应用管理器, 客户端每秒(通过
mapreduce.client.progressmonitor.pollinterval
设置)向应用管理器请求进度更新, 展示给用户。
-
- 作业完成
- 除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用
waitForCompletion()
来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval
来设置。 - 作业完成之后, 应用管理器和
Container
会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
- 除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用
Yarn调度策略
- Hadoop作业调度器主要有三种:
-
FIFO
:按照任务到达的时间顺序 -
Capacity Scheduler
(2.9.2
版本默认):-
Capacity
调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力; - 通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源
- 在一个队列内部,资源的调度是采用的是先进先出
-
-
Fair Scheduler
(CDH
版本默认使用的调度器)
-
-
集群配置 ↩