程序员Hadoop

大数据学习day_5

2017-07-23  本文已影响0人  Sakura_P

思考问题

MapReduce总结

MapReduce

  1. MapReduce的定义
    MapReduce是一种编程模型, 用于大规模数据集(大于1TB)的并行运算。它将分布式磁盘读写的问题进行抽象,并转换为对一个数据集(由键/值对组成)的计算,该计算由 mapreduce 两部分组成。

  2. MapReduce编程模型流行的三个技术方面的原因:

  1. MapReduce和关系型数据库的比较



    MapReduce和关系型数据库之间的另一个区别在于他们所操作的数据集的结构化的程度,这在第一篇的文章已经讨论过。
    需要强调的是,MapReduce输入的键和值并不是数据固有的属性,而是由分析数据的人员来选择的。

  2. MapReduce的特点:

  1. Map 和 Reduce
    Hadoop框架使用Mapper将数据处理成一个个的<key,value>键值对,在网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。

MapReduce的工作机制

MapReduce运行图解1 MapReduce运行图解2

下面,我们来剖析MapReduce作业的运行机制

  1. 作业的提交:客户端通过JobClient.runJob()来提交一个作业到jobtracker,JobClient程序逻辑如下:

简单来说呢,就是

  1. 作业的初始化

简单来说呢,就是

  1. 任务的分配
  1. 任务的执行
  1. 进度和状态的更新
  1. 作业的完成
  1. 作业的失败

MapReduce工作涉及到的4个对象

  1. 客户端(client):编写mapreduce程序,配置作业,提交作业,这就是程序员完成的工作。
  2. JobTracker:初始化作业,分配作业,与TaskTracker通信,协调整个作业的执行。
  3. TaskTracker:保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务,TaskTracker和JobTracker的不同有个很重要的方面,就是在执行任务时候TaskTracker可以有n多个,JobTracker则只会有一个(JobTracker只能有一个就和hdfs里namenode一样存在单点故障,我会在后面的mapreduce的相关问题里讲到这个问题的)。
  4. HDFS:保存作业的数据、配置信息等等,最后的结果也是保存在hdfs上面
    <small>jobtracker的单点故障:
    jobtracker和hdfs的namenode一样也存在单点故障,
    单点故障一直是hadoop被人诟病的大问题,
    为什么hadoop的做的文件系统和mapreduce计算框架都是高容错的,但是最重要的管理节点的故障机制却如此不好,我认为主要是namenode和jobtracker在实际运行中都是在内存操作,而做到内存的容错就比较复杂了,只有当内存数据被持久化后容错才好做,namenode和jobtracker都可以备份自己持久化的文件,但是这个持久化都会有延迟,因此真的出故障,任然不能整体恢复,另外hadoop框架里包含zookeeper框架,zookeeper可以结合jobtracker,用几台机器同时部署jobtracker,保证一台出故障,有一台马上能补充上,不过这种方式也没法恢复正在跑的mapreduce任务。
    </small>

MapRedece作业的处理流程

MapRedece作业的处理流程图解

按照时间顺序包括:

  1. 输入分片(input split):
    在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务
    输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切
    <small>假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split)
    即我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。

    </small>
  2. Map阶段:
    程序员编写好的map函数了,因此map函数效率相对好控制,而且一般map操作都是本地化操作也就是在数据存储节点上进行。
  3. Combiner阶段:
    combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作。
    Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作。
    <small>例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,
    例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。
    </small>
  4. shuffle阶段:
    将map的输出作为reduce的输入的过程就是shuffle了。
  5. reduce阶段:
    和map函数一样也是程序员编写的,最终结果是存储在hdfs上的。

Combiner深入理解

MapReduce流程

在上述过程中,我们看到至少两个性能瓶颈:

(1)如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
  总结:网络带宽严重被占降低程序效率

(2)假设使用美国专利数据集中的国家一项来阐述数据倾斜这个定义,这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等等,大多数的键值对最终会聚集于一个单一的Reducer之上,计算任务分配不够均衡,从而大大降低程序的性能。
  总结:单一节点承载过重降低程序性能

在MapReduce编程模型中,在Mapper和Reducer之间有一个非常重要的组件,它解决了上述的性能瓶颈问题,它就是Combiner

① 与mapper和reducer不同的是,combiner没有默认的实现,需要显式的设置在conf中才有作用。
② 并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。
combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求平均值的话,则不适用。

因为每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能。是MapReduce的一种优化手段。

Combiner总结:
在实际的Hadoop集群操作中,我们是由多台主机一起进行MapReduce的,
如果加入规约(Combiner)操作,每一台主机会在reduce之前进行一次对本机数据的规约,
然后在通过集群进行reduce操作,这样就会大大节省reduce的时间,
从而加快MapReduce的处理速度

Partitioner理解

Map阶段 <small>step1.3就是一个分区操作。通过前面的学习我们知道Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。在一些集群应用中,例如分布式缓存集群中,缓存的数据大多都是靠哈希函数来进行数据的均匀分布的,在Hadoop中也不例外。</small>

MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。
用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区。
默认的分区函数HashPartitioner

Partitioner小结:分区Partitioner主要作用在于以下两点

Shuffle理解

Reduce阶段

Shuffle是什么
针对多个map任务的输出按照不同的分区(Partition)通过网络复制到不同的reduce任务节点上,这个过程就称作为Shuffle。

Hadoop的shuffle过程就是从map端输出到reduce端输入之间的过程,这一段应该是Hadoop中最核心的部分,因为涉及到Hadoop中最珍贵的网络资源,所以shuffle过程中会有很多可以调节的参数,也有很多策略可以研究

map过程的输出是写入本地磁盘而不是HDFS,但是一开始数据并不是直接写入磁盘而是缓冲在内存中,缓存的好处就是减少磁盘I/O的开销,提高合并和排序的速度。又因为默认的内存缓冲大小是100M(当然这个是可以配置的),所以在编写map函数的时候要尽量减少内存的使用,为shuffle过程预留更多的内存,因为该过程是最耗时的过程

再来详细梳理下整个流程

Map端
  1. 在map端首先是InputSplit,在InputSplit中含有DataNode中的数据,每一个InputSplit都会分配一个Mapper任务,Mapper任务结束后产生<K2,V2>的输出,这些输出先存放在缓存中,每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spil l.percent),一个后台线程就把内容写到(spill)Linux本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
  2. 写磁盘前,要进行partition、sort和combine等操作。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有Combiner,还要对排序后的数据进行combine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。
  3. 最后将磁盘中的数据送到Reduce中,图中Map输出有三个分区,有一个分区数据被送到图示的Reduce任务中,剩下的两个分区被送到其他Reducer任务中。而图示的Reducer任务的其他的三个输入则来自其他节点的Map输出。
Reduce端
  1. Copy阶段:Reducer通过Http方式得到输出文件的分区。
      reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据
  2. Merge阶段:如果形成多个磁盘文件会进行合并。
      从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中
    3.Reducer的参数:最后将合并后的结果作为输入传入Reduce任务中

总结:当Reducer的输入文件确定后,整个Shuffle操作才最终结束。之后就是Reducer的执行了,最后Reducer会把结果存到HDFS上。

Hadoop的压缩

Shuffle过程中看到,map端在写磁盘的时候采用压缩的方式将map的输出结果进行压缩是一个减少网络开销很有效的方法

在Java中设置输出压缩 reduce端输出压缩使用了Codec中的Gzip算法,也可以使用bzip2算法

MapReduce排序分组

排序: 在Hadoop默认的排序算法中,只会针对key值进行排序

自定义排序:

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

自定义类型MyNewKey实现了WritableComparable的接口,
该接口中有一个compareTo()方法,当对key进行比较时会调用该方法,而我们将其改为了我们自己定义的比较规则,从而实现我们想要的效果

分组:在Hadoop中的默认分组规则中,也是基于Key进行的,会将相同key的value放到一个集合中去

自定义分组:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

自定义了一个分组比较器MyGroupingComparator,该类实现了RawComparator接口,而RawComparator接口又实现了Comparator接口,这两个接口的定义:

Hadoop数据类型

Hadoop MapReduce操作的是键值对,但这些键值对并不是Integer、String等标准的Java类型。为了让键值对可以在集群上移动,Hadoop提供了一些实现了WritableComparable接口的基本数据类型,以便用这些类型定义的数据可以被序列化进行网络传输、文件存储与大小比较。

描述
BooleanWritable 标准布尔变量的封装
ByteWritable 单字节数的封装
DoubleWritable 双字节数的封装
FloatWritable 浮点数的封装
IntWritable 整数的封装
LongWritable Long的封装
NullWritable 无键值时的占位符
Text 使用UTF-8格式的文本封装
上一篇 下一篇

猜你喜欢

热点阅读