MapReduce架构师2—MapReduce详解
1 MapReduce 流程回顾 00 ~ 0:23:00
MapReduce 流程2 MapReduce Shuffle 过程详解 0:23:00 ~ 1:30 :22
MapReduce Shuffle详解文章:https://blog.csdn.net/zhongqi2513/article/details/78321664
shuffle流程图示
image.pngclass MyMapper() extends Mapper{
void map(key, value, context){
(key, value) ==> (outkey, outvalue)
context.write(outkey, outvalue)
}
}
// mapper输出之后,到 reducer的reduce方法执行之前,我们不知道到底是怎么回事
// 称之为shuffle
// 第一件事:先调用Paritioner打上分区标记 分区个数 和 reduceTask 的个数是一样的
// 第二件事:把key-value输出到一个首尾相连的字节数组中(环形缓冲区 内存) 100M
class MyReducer extends Reducer{
void reduce(key, values, context){
(key, values) ==> (outkey, outvalues)
context.write(outkey, outvalue)
}
}
发现一种现象:
mapper:0% reducer:0%
mapper:100% reducer:0%
mapper:100% reducer:100%
mapper:0% reducer:0%
mapper:80% reducer:10%
mapper:90% reducer:15%
mapper:100% reducer:20%
mapper:100% reducer:100%
mapper阶段溢出的磁盘文件的组织形式:
1,a,1
1,a,2
1,a,1
1,b,1
1,b,2
...
1,z,22
2,c,1
2,f,1
2,f,22
...
5,q,1
5,y,1
reducer的输入大文件的格式:
a,1
a,2
a,1
a,2
a,1
a,2
a,1
a,1
.....
b,1
b,1
b,2
为什么mapreduce有shuffle阶段就一定会把数据按照key来进行排序呢?
1、比如reduce输入的file不排序,每一组数据,都需要去扫描全文件
2、如果reudce输入的file有序,那么只需要顺序扫描该文件,按照相邻的两条key-value的key做比较,就能判断,下一个key-value是不是属于上一个key的同一组。如果一样,就是同一组,继续扫描,如果不一样,当前组到此为止
整个文件,只需要顺序扫描一次即可!
2.1 MapReduce的切片类
public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
3 MapReduce源码剖析 1:55 :22 ~ 3:26:00
入口:运行wordcount例子。 job的提交
3. 1. MapReduce任务执行流程详细分析 1:55 :22 ~ 2:25 :00
核心代码:
org/apache/hadoop/mapreduce/Job.java
job.waitForCompletion
——》job.submit();
——》方法明细:
ensureState(JobState.DEFINE);
/**
* TODO 启用新API
* hadoop-2.x 的API 都是 newapi, org.apache.hadoop.mapreduce.xxxx 以前是接口的一些类,现在都是普通类了
* hadoop-1.x 的API 都是 oldapi, org.apache.hadoop.mapred.xxx 很多父类都是接口
*/
setUseNewAPI();
/**
* TODO 链接集群! 链接YARN集群! 这里最重要的是生成一个提交任务到YARN集群的客户端
* 当前这个 connect 方法最重要的工作就是构建一个 cluster 成员变量
* 然后这个 cluster 在初始化的时候,又会构建一个成员变量 client = YARNRunner *
* 提交任务,就是通过 YARNRuner 去提交的。
* 如果是本地运行,则是 LocalRunner
*/
connect();
// 提交代码
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
/**
* TODO 真正提交任务
*/
return submitter.submitJobInternal(Job.this, cluster);
}
});
submitter#submitJobInternal 方法详解
/**
*1: 检查工作目录和输出目录
* 如果你指定的输出目录不是不存在的,则会报错!
*/
checkSpecs(job);
/**
* 2: TODO 生成一个jobID
*/
JobID jobId = submitClient.getNewJobID();
// 3 Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
//4 写入job 的 xml文件到 提交的 临时目录
writeConf(conf, submitJobFile);
// 5 提交任务
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
由 submitClient 实现子类 YARNRunner#submitJob()方法
ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
——》ResourceMgrDelegate#submitApplication
↓ 实现类
YarnClientImpl#submitApplication
——》 rmClient.submitApplication(request);
↓ 实现类
ApplicationClientProtocolPBClientImpl#submitApplication
/**
* TODO 提交任务到这儿基本OK
* 所谓提交任务,其实就是一个 RM 的一个客户端代理对象,给 RM 发送了一个事件,告诉 RM 我提交了一个应用程序
* 这个事件当中,会包含很多信息:jobid, submitDir
*
* 整个任务的提交告一段落
*/
return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));
下面 进入 yarn 的resourceManager的 提交过程,流程图如下 2:25:00~ 2:30:00
job的提交到yarn集群.png关键步骤:yarn的 RM 选择一个 NM 节点 启动 MRAppmaster
MRAppmaster #main()
/**
* TODO 初始化和启动!
*/
initAndStartAppMaster(appMaster, conf, jobUserName);
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
/**
* TODO
*/
appMaster.init(conf);
appMaster.start();
if (appMaster.errorHappenedShutDown) {
throw new IOException("Was asked to shut down.");
}
return null;
}
});
MRAppmaster 发命令让RM 启动 一个NM上的 YarnChild
YarnChild main()方法
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// use job-specified working directory
setEncryptedSpillKeyIfRequired(taskFinal);
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
/**
* TODO 正经的调用给一个 Task 执行(MapTask, ReduceTask)
*/
taskFinal.run(job, umbilical); // run the task
return null;
}
});
taskFinal.run 方法
进入具体的 MapTask 和 ReduceTask 类方法
3. 3. MapReduce的并行度决定机制 (无内容)
3. 4. MapReduce的MapTask执行逻辑源码解读 2:30:00 ~ 3:16:00
接上面 YarnChild#main 方法
—》taskFinal.run(job, umbilical);
—》MapTask#run方法
runNewMapper 重要逻辑
—》runNewMapper
// TODO make a task context so we can get the classes
// TODO 每个 MapTask 都会有一个 TaskContext 的上下文对象
// job.setXXX() 设置的各种组件,都放置在 taskContext 里面了。
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),
reporter);
// TODO 就是自己定义的mapper组件的实例对象
org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY,
OUTVALUE>) ReflectionUtils
.newInstance(taskContext.getMapperClass(), job);
// TODO make the input format, 默认是: TextInputFormat.class
org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>) ReflectionUtils
.newInstance(taskContext.getInputFormatClass(), job);
// TODO 每个 逻辑切片启动一个MapTask
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
// TODO RecordReader 是 由 InputFormat 来创建和关闭的!
org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = new NewTrackingRecordReader<INKEY, INVALUE>(split, inputFormat, reporter,
taskContext);
// TODO RecordWriter 负责数据写出
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
}
else{
//TODO 初始化环形缓冲区
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
▼
collector = createSortingCollector(job, reporter);
——》createSortingCollector
↓
collector.init(context); 环形缓冲区的初始化
↓
MapOutputBuffer#init()
▼
/**
* TODO 当前 MpaOutputBuffer 这个环形缓冲区的管理类,事实上管理了两个重要的东西:
* 1、100M 大小的字节数组
* 2、SpillThread 负责 kvBuffer 中装满了的 80% 空间的数据的溢写
*/
spillLock.lock();
try {
spillThread.start();//重要方法
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
/**
* TODO 重要 传播一些信息到各种组件
*/
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);
/**
* TODO mapperContext 包装了 mapContext
* mapper.run(context) 中 context 具备四个方法,其实就是调用了 mapperContext 的四个方法。
* 其实就是调用了 mapContext 的四个同名方法
*
* mapper.run(context)
*/
org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY,
OUTVALUE>()
.getMapContext(mapContext);
try {
/**
* TODO 访问数据源,如果是文件,提前创建建好读取该文件的输入流
* 如果是数据库,提前创建好数据库链接!
*
* input = NewTrackingRecordReader
*
* 最重要的事情,就是根据 InputFormat 创建了一个 RecordReader
* real = LineRecordReader
*/
input.initialize(split, mapperContext);
/**
* TODO 这个mapper实例, 就是你在写程序的时候,定义的那个 Mapper 类的实例对象!
* 只有到了这儿,才会调用你写的 mapper中的 map 方法中的逻辑
*/
mapper.run(mapperContext);
—》map(context.getCurrentKey(), context.getCurrentValue(), context);
—》 context.write((KEYOUT) key, (VALUEOUT) value);
↓
WrappedMapper类#write
↓
TaskInputOutputContextImpl#write
▼
// Output = OutputCollector
output.write(key, value);
↓
NewOutputCollector#write
▼//获取要写入的分区编号
int partition_no = partitioner.getPartition(key, value, partitions);
/**
* collector.collect 写进入到了 MapOoutputBuffer,真正完成数据从mapper写出到 环形缓冲区
*/
collector.collect(key, value, partition_no);
↓
MapOoutputBuffer#collect()
startSpill()