
MapReduce

2020-04-29  ZzzZBbbB

Reference: https://github.com/wangzhiwubigdata/God-Of-BigData/blob/master/%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%A1%86%E6%9E%B6%E5%AD%A6%E4%B9%A0/Hadoop-MapReduce.md

MapReduce Overview

A distributed computing framework.
A MapReduce job consists of a number of:
– Map tasks (a map task performs a data transformation)
– Reduce tasks (a reduce task combines the results of map tasks)
– (Internally) shuffle tasks (a shuffle task sends the output of map tasks to the right reduce tasks)

A simplified view of the flow (detailed later):
input data -> blocks in HDFS -> map function in parallel -> reduce -> output

The MapReduce 1.x flow (the original processing model; from 2.x onward, YARN is used)

hadoop cluster.png

JobTracker ===> cluster resource management and job scheduling; also communicates with the client

TaskTracker ===> reports heartbeats to the JobTracker and executes its commands

JobTracker & TaskTracker.png

Starting from the map function and the reduce function

Using map and reduce in Python

https://www.runoob.com/python/python-func-map.html

The map function: map(function, iterable, ...)  (in Python 3, map returns an iterator, so list() is used below to show the result)
>>> list(map(lambda x: x ** 2, [1, 2, 3, 4, 5]))
[1, 4, 9, 16, 25]
# With two lists, elements at the same position are added together
>>> list(map(lambda x, y: x + y, [1, 3, 5, 7, 9], [2, 4, 6, 8, 10]))
[3, 7, 11, 15, 19]

The reduce function: reduce(function, iterable[, initializer])  (moved into functools in Python 3)
>>> from functools import reduce
>>> reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])
15

Using map and reduce in MapReduce

MapReduce diagram.png
MapReduce in parallel.png

In Hadoop

WordCount example:

word.txt
hello
hello world

Map function:
– Input: <line offset, line> for each line of the file
– Output: <word, 1> for every word in the line => ("hello",1), ("hello",1), ("world",1)

Reduce function:
– Input: <word, list of 1's> => {"hello":[1,1],"world":[1]} (so strictly speaking the reduce input is not the raw map output; shuffling has grouped it first)
– Output: <word, count> where count is the number of 1's in the input list =>
{"hello":2,"world":1}
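The WordCount flow can be simulated in plain Python. This is a sketch of the framework's behavior, not Hadoop code; the function names are made up for illustration:

```python
from collections import defaultdict

def map_fn(line):
    # Map: emit <word, 1> for every word in the line
    for word in line.split():
        yield (word, 1)

def reduce_fn(word, counts):
    # Reduce: sum the list of 1's for each word
    return (word, sum(counts))

lines = ["hello", "hello world"]

# Map phase
mapped = [kv for line in lines for kv in map_fn(line)]
# mapped == [("hello", 1), ("hello", 1), ("world", 1)]

# Shuffle phase: group values by key
groups = defaultdict(list)
for word, one in mapped:
    groups[word].append(one)
# groups == {"hello": [1, 1], "world": [1]}

# Reduce phase
result = dict(reduce_fn(w, vs) for w, vs in groups.items())
print(result)  # {'hello': 2, 'world': 1}
```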

The Java implementation of Map and Reduce

The MapReduce framework is rather formulaic: we only need to write the map and reduce functions, and the framework handles every other detail for us. In Java these are encapsulated in the Mapper and Reducer classes, so we simply write subclasses that extend them and override the corresponding methods.

Each map task runs an instance of Mapper
– Mapper has a map function
– Map task invokes the map function of the Mapper once for each input key‐value pair

Mapper.png

Each reduce task runs an instance of Reducer
– Reducer has a reduce function
– Reduce task invokes the reduce function of the Reducer once for every different intermediate key
– For the reduce function, values are NOT in any particular order

Reducer.png
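The "once per pair / once per key" contract above can be sketched in plain Python. This imitates the shape of the Java API only; the class and method names mirror Hadoop's but this is not real Hadoop code:

```python
from collections import defaultdict

class Mapper:
    # Mirrors the shape of Hadoop's Mapper: map() is invoked once per input key-value pair
    def map(self, key, value, context):
        raise NotImplementedError

class Reducer:
    # Mirrors the shape of Hadoop's Reducer: reduce() is invoked once per intermediate key
    def reduce(self, key, values, context):
        raise NotImplementedError

class WordCountMapper(Mapper):
    def map(self, key, value, context):
        # key is the byte offset of the line, value is the line text
        for word in value.split():
            context.append((word, 1))

class WordCountReducer(Reducer):
    def reduce(self, key, values, context):
        # values arrive in no particular order; summation is order-independent
        context.append((key, sum(values)))

# Drive one map task: invoke map() once per input record
map_out = []
mapper = WordCountMapper()
for offset, line in enumerate(["hello", "hello world"]):
    mapper.map(offset, line, map_out)

# Group by key (what the shuffle does), then invoke reduce() once per key
groups = defaultdict(list)
for word, one in map_out:
    groups[word].append(one)

reduce_out = []
reducer = WordCountReducer()
for word, ones in groups.items():
    reducer.reduce(word, ones, reduce_out)
print(dict(reduce_out))  # {'hello': 2, 'world': 1}
```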

The details between Map and Reduce --- Shuffling

Shuffle

Map tasks may run in parallel on different machines, so shuffling is needed to send all records with the same key to the same node for merging; only then can the final result be computed. This step also turns the <k, v> pairs produced by the maps into <k, [v1, v2]> for the subsequent reduce to operate on.


Shuffle.png

Image source:
https://xinze.fun/2020/01/10/Spark-Shuffle-%E5%92%8C-Spill-%E7%9A%84%E5%8C%BA%E5%88%AB/
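In Hadoop the default partitioner routes each key by hashing it modulo the number of reduce tasks, which guarantees all values for a key reach the same reducer. A Python sketch of that idea, with crc32 standing in for the hash function:

```python
import zlib
from collections import defaultdict

def partition(key, num_reducers):
    # A deterministic hash of the key decides which reduce task receives it
    return zlib.crc32(key.encode()) % num_reducers

num_reducers = 2
map_output = [("hello", 1), ("world", 1), ("hello", 1)]

# Every pair with the same key lands in the same partition,
# so a single reducer sees all values for that key
partitions = defaultdict(lambda: defaultdict(list))
for key, value in map_output:
    partitions[partition(key, num_reducers)][key].append(value)

for p, groups in sorted(partitions.items()):
    print(p, dict(groups))
```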

Internals of shuffling:
1. Map side: partition, sort, spill & merge

2. Reduce side: fetch & merge

details about Shuffle.png
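On the map side, each spill is written out sorted by key, and the sorted runs are then merged into one sorted output. The merge step can be illustrated with Python's heapq.merge; in-memory lists stand in for the on-disk spill files here:

```python
import heapq

# Two "spill files", each already sorted by key
spill_1 = [("apple", 1), ("hello", 1), ("world", 1)]
spill_2 = [("apple", 1), ("hello", 1)]

# Merge the sorted runs into a single sorted stream, as the map task
# does before its output is fetched by the reducers; heapq.merge
# streams the inputs rather than loading everything at once
merged = list(heapq.merge(spill_1, spill_2))
print(merged)
# [('apple', 1), ('apple', 1), ('hello', 1), ('hello', 1), ('world', 1)]
```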

Back to the beginning: input and output formats of the data

mapreduce with 2 nodes.png

Input

InputFormat

In Java, InputFormat is an abstract class with many subclasses that read specific kinds of input:
• FileInputFormat (input from files in given dirs) --- the most commonly used
• DBInputFormat (input data from a database)
• CombineFileInputFormat (input data by combining multiple files)

FileInputFormat
– Takes paths to files
– Reads all files in the paths
– Divides each file into one or more InputSplits
FileInputFormat is itself an abstract class, with subclasses that implement reading the corresponding content:
– TextInputFormat
– KeyValueTextInputFormat
– SequenceFileInputFormat

Subclasses of FileInputFormat.png
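For instance, TextInputFormat hands each line to the mapper as <byte offset of the line, line text>. A minimal Python imitation of that behavior (not the real API):

```python
def text_records(data):
    # Emulates TextInputFormat: key = byte offset of the line, value = the line
    offset = 0
    for line in data.splitlines(keepends=True):
        yield (offset, line.rstrip("\n"))
        offset += len(line)

records = list(text_records("hello\nhello world\n"))
print(records)  # [(0, 'hello'), (6, 'hello world')]
```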

The first thing InputFormat does: split the input file into InputSplits

A note on InputSplit: it is a purely logical division of the input.
• If a file is big, multiple splits may be created; the typical split size is 128 MB
• A map task is created for each split (a chunk of some input file)

The second thing InputFormat does: implement a RecordReader to read data from the InputSplits
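The logical split computation can be sketched as follows, assuming the typical 128 MB split size:

```python
SPLIT_SIZE = 128 * 1024 * 1024  # 128 MB, the typical split size

def input_splits(file_size, split_size=SPLIT_SIZE):
    # An InputSplit is only a logical (offset, length) range -- no data is copied
    splits = []
    offset = 0
    while offset < file_size:
        length = min(split_size, file_size - offset)
        splits.append((offset, length))
        offset += length
    return splits

# A 300 MB file yields 3 splits, hence 3 map tasks
splits = input_splits(300 * 1024 * 1024)
print(len(splits))  # 3
```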

Output

OutputFormat

Optional step: the Combiner

My intuition: a combiner is roughly a localized reducer.

Note that the combiner exists to save space and speed up the computation, but it must not affect the final result; computing an average, for example, cannot reuse the reducer as a combiner.
The formal statement:
If the reduce function is commutative and associative, it may directly be used as the combiner.
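A quick pure-Python check of why this restriction matters: summation commutes with local pre-aggregation, while averaging does not:

```python
# Values for one key, as produced on two different map nodes
node_a = [1, 2, 3]
node_b = [4, 5]

# Sum is commutative and associative: combining locally first changes nothing
no_combiner = sum(node_a + node_b)               # 15
with_combiner = sum([sum(node_a), sum(node_b)])  # 15
assert no_combiner == with_combiner

# Mean is not: averaging the per-node averages gives the wrong answer
def mean(xs):
    return sum(xs) / len(xs)

true_mean = mean(node_a + node_b)                   # 3.0
combined_mean = mean([mean(node_a), mean(node_b)])  # 3.25
print(true_mean, combined_mean)  # 3.0 3.25
```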

