hadoop

MapReduce架构师2—MapReduce详解

2020-08-24  本文已影响0人  fat32jin

1 MapReduce 流程回顾 00 ~ 0:23:00

MapReduce 流程

2 MapReduce Shuffle 过程详解 0:23:00 ~ 1:30 :22

MapReduce Shuffle详解文章:https://blog.csdn.net/zhongqi2513/article/details/78321664

shuffle流程图示

image.png
class MyMapper() extends Mapper{
    void map(key, value, context){
        (key, value) ==> (outkey, outvalue)
        context.write(outkey, outvalue)
    }
}

// mapper输出之后,到 reducer的reduce方法执行之前,我们不知道到底是怎么回事
// 称之为shuffle
// 第一件事:先调用Paritioner打上分区标记     分区个数 和 reduceTask 的个数是一样的
// 第二件事:把key-value输出到一个首尾相连的字节数组中(环形缓冲区 内存)   100M  

class MyReducer extends Reducer{
    void reduce(key, values, context){
        (key, values) ==> (outkey, outvalues)
        context.write(outkey, outvalue)
    }
}

发现一种现象:

mapper:0%  reducer:0%
mapper:100%  reducer:0%
mapper:100%  reducer:100%
mapper:0%  reducer:0%
mapper:80%  reducer:10%
mapper:90%  reducer:15%
mapper:100%  reducer:20%
mapper:100%  reducer:100%
mapper阶段溢出的磁盘文件的组织形式:
1,a,1
1,a,2
1,a,1
1,b,1
1,b,2
...
1,z,22
2,c,1
2,f,1
2,f,22
...
5,q,1
5,y,1
reducer的输入大文件的格式:
a,1
a,2
a,1
a,2
a,1
a,2
a,1
a,1
.....
b,1
b,1
b,2
为什么mapreduce有shuffle阶段就一定会把数据按照key来进行排序呢?
1、比如reduce输入的file不排序,每一组数据,都需要去扫描全文件
2、如果reudce输入的file有序,那么只需要顺序扫描该文件,按照相邻的两条key-value的key做比较,就能判断,下一个key-value是不是属于上一个key的同一组。如果一样,就是同一组,继续扫描,如果不一样,当前组到此为止
整个文件,只需要顺序扫描一次即可!

2.1 MapReduce的切片类

public class FileSplit   extends org.apache.hadoop.mapreduce.InputSplit

3 MapReduce源码剖析 1:55 :22 ~ 3:26:00

入口:运行wordcount例子。 job的提交

3. 1. MapReduce任务执行流程详细分析 1:55 :22 ~ 2:25 :00

核心代码:

org/apache/hadoop/mapreduce/Job.java

job.waitForCompletion
——》job.submit();
——》方法明细:


        ensureState(JobState.DEFINE);
        
        /**
         * TODO 启用新API
         *  hadoop-2.x 的API 都是 newapi, org.apache.hadoop.mapreduce.xxxx   以前是接口的一些类,现在都是普通类了
         *  hadoop-1.x 的API 都是 oldapi, org.apache.hadoop.mapred.xxx  很多父类都是接口       
         */
        setUseNewAPI();
        
        /**
         * TODO 链接集群! 链接YARN集群! 这里最重要的是生成一个提交任务到YARN集群的客户端    
         * 当前这个 connect 方法最重要的工作就是构建一个  cluster 成员变量
         * 然后这个 cluster 在初始化的时候,又会构建一个成员变量 client = YARNRunner         *
         * 提交任务,就是通过 YARNRuner 去提交的。
         * 如果是本地运行,则是 LocalRunner
         */
        connect();
  // 提交代码
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {

          /**
           * TODO  真正提交任务
           */
          return submitter.submitJobInternal(Job.this, cluster);
        }
    });

submitter#submitJobInternal 方法详解

         /**
         *1:    检查工作目录和输出目录
         * 如果你指定的输出目录不是不存在的,则会报错!
         */        
        checkSpecs(job);

       /**
         * 2:   TODO 生成一个jobID
         */
        JobID jobId = submitClient.getNewJobID();
        // 3  Create the splits for the job
            LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
            int maps = writeSplits(job, submitJobDir);


         //4   写入job 的 xml文件到 提交的 临时目录
            writeConf(conf, submitJobFile);

            // 5  提交任务
             status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

由 submitClient 实现子类 YARNRunner#submitJob()方法

ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

——》ResourceMgrDelegate#submitApplication
↓ 实现类
YarnClientImpl#submitApplication
——》 rmClient.submitApplication(request);
↓ 实现类
ApplicationClientProtocolPBClientImpl#submitApplication

            /**
             * TODO 提交任务到这儿基本OK
             *  所谓提交任务,其实就是一个 RM 的一个客户端代理对象,给 RM 发送了一个事件,告诉 RM 我提交了一个应用程序
             *  这个事件当中,会包含很多信息:jobid, submitDir
             *
             *  整个任务的提交告一段落
             */
            return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));
下面 进入 yarn 的resourceManager的 提交过程,流程图如下 2:25:00~ 2:30:00
job的提交到yarn集群.png
关键步骤:yarn的 RM 选择一个 NM 节点 启动 MRAppmaster

MRAppmaster #main()

          /**
         * TODO 初始化和启动!
         */
        initAndStartAppMaster(appMaster, conf, jobUserName);
      appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
            
            /**
             * TODO
             */
            appMaster.init(conf);
            appMaster.start();
            if (appMaster.errorHappenedShutDown) {
                throw new IOException("Was asked to shut down.");
            }
            return null;
        }
    });

MRAppmaster 发命令让RM 启动 一个NM上的 YarnChild

YarnChild main()方法

childUGI.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
                   // use job-specified working directory
                   setEncryptedSpillKeyIfRequired(taskFinal);
                   FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
                   
                   /**
                    * TODO 正经的调用给一个 Task 执行(MapTask, ReduceTask)
                    */
                   taskFinal.run(job, umbilical); // run the task
                   return null;
               }
           });

taskFinal.run 方法
进入具体的 MapTask 和 ReduceTask 类方法

3. 3. MapReduce的并行度决定机制 (无内容)

3. 4. MapReduce的MapTask执行逻辑源码解读 2:30:00 ~ 3:16:00

接上面 YarnChild#main 方法

—》taskFinal.run(job, umbilical);
—》MapTask#run方法
runNewMapper 重要逻辑
—》runNewMapper

 // TODO make a task context so we can get the classes
        // TODO 每个 MapTask 都会有一个 TaskContext 的上下文对象
        // job.setXXX() 设置的各种组件,都放置在 taskContext 里面了。
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),
                reporter);

 // TODO 就是自己定义的mapper组件的实例对象
        org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY,
                OUTVALUE>) ReflectionUtils
                .newInstance(taskContext.getMapperClass(), job);

 // TODO make the input format, 默认是: TextInputFormat.class
        org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat =
                (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>) ReflectionUtils
                .newInstance(taskContext.getInputFormatClass(), job);

 // TODO 每个 逻辑切片启动一个MapTask
        org.apache.hadoop.mapreduce.InputSplit split = null;
        split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
       
// TODO RecordReader 是 由 InputFormat 来创建和关闭的!
        org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = new NewTrackingRecordReader<INKEY, INVALUE>(split, inputFormat, reporter,
                taskContext);

 
        // TODO RecordWriter 负责数据写出         
        org.apache.hadoop.mapreduce.RecordWriter output = null;
        // get an output object
        if (job.getNumReduceTasks() == 0) {
            output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
        } 
else{    
              //TODO 初始化环形缓冲区             
            output = new NewOutputCollector(taskContext, job, umbilical, reporter);
                          ▼
              collector = createSortingCollector(job, reporter);
                ——》createSortingCollector
                            ↓ 
                   collector.init(context);  环形缓冲区的初始化
                           ↓ 
                     MapOutputBuffer#init() 
                             ▼
            /**
             * TODO 当前 MpaOutputBuffer 这个环形缓冲区的管理类,事实上管理了两个重要的东西:
             *  1、100M 大小的字节数组
             *  2、SpillThread 负责 kvBuffer 中装满了的 80% 空间的数据的溢写
             */
            spillLock.lock();
            try {
                spillThread.start();//重要方法  
                while (!spillThreadRunning) {
                    spillDone.await();
                }
            } catch (InterruptedException e) {
                throw new IOException("Spill thread failed to initialize", e);
            } finally {
                spillLock.unlock();
            }


/**
         * TODO 重要  传播一些信息到各种组件
         */
        org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
                new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);
        
/**
         * TODO mapperContext 包装了 mapContext
         *  mapper.run(context) 中 context 具备四个方法,其实就是调用了  mapperContext 的四个方法。
         *  其实就是调用了 mapContext 的四个同名方法
         *
         * mapper.run(context)
         */
        org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY,
                OUTVALUE>()
                .getMapContext(mapContext);


try {
            
            /**
             * TODO 访问数据源,如果是文件,提前创建建好读取该文件的输入流
             *   如果是数据库,提前创建好数据库链接!
             *
             *   input = NewTrackingRecordReader
             *
             *   最重要的事情,就是根据 InputFormat 创建了一个 RecordReader
             *   real = LineRecordReader
             */
            input.initialize(split, mapperContext);
            
            /**
             * TODO  这个mapper实例, 就是你在写程序的时候,定义的那个 Mapper 类的实例对象!
             *  只有到了这儿,才会调用你写的 mapper中的 map 方法中的逻辑
             */
            mapper.run(mapperContext);
             —》map(context.getCurrentKey(), context.getCurrentValue(), context);
                 —》 context.write((KEYOUT) key, (VALUEOUT) value);
                               ↓ 
                      WrappedMapper类#write
                              ↓ 
                      TaskInputOutputContextImpl#write
                         ▼
                    // Output = OutputCollector
                        output.write(key, value);
                             ↓ 
                     NewOutputCollector#write
                          ▼//获取要写入的分区编号                    
          int partition_no = partitioner.getPartition(key, value, partitions);  

         /**
           * collector.collect 写进入到了 MapOoutputBuffer,真正完成数据从mapper写出到 环形缓冲区
           */
          collector.collect(key, value, partition_no);
                 ↓ 
         MapOoutputBuffer#collect()
         
 startSpill()
  

3. 5. MapReduce的shuffle机制源码详解 (无)

3. 6. MapReduce的Partitioner组件源码详解 (无)

3. 7. MapReduce的MapOutputBuffer内存环形缓冲区源码详解 (未讲)

3. 8. MapReduce的分组机制源码解读 (未讲)

3. 9. MapReduce的ReduceTask的执行逻辑源码解读 (未讲)

3. 10. MapReduce的输出OutputFormat组件源码解读(未讲)

上一篇下一篇

猜你喜欢

热点阅读