Analyzing the MapReduce Execution Flow via MapReduce's Local Run Mode

2019-03-05  苏坡闷

I. Conclusions First (Map Phase)

    1. Job.waitForCompletion():
        Checks the job state: if it is already RUNNING, the call avoids a duplicate submission.
        If the state is DEFINE, it submits the job by calling submit().

    2. submit():
        Creates a Cluster object, an abstraction of the cluster the Job runs on, containing the job runner and the file system.
        Obtains a JobSubmitter from the Cluster and submits the Job.

    3. Before submission:
        ① Determine the current Job's staging (submit) directory.
        ② Compute the input splits and write job.split / job.splitmetainfo into that directory.
        ③ Set the number of MapTasks to launch according to the number of splits.
        ④ Generate the job.xml file (containing all configuration parameters).

    4. Submission:
        Rebuilds the Job object on the LocalJobRunner.
        Calls start(), which launches a thread.

    5. Job.run():

        ① Reads the split metadata to build an array of split/attribute objects and, from it, List<RunnableWithThrowable> mapRunnables.

        ② Builds List<RunnableWithThrowable> reduceRunnables according to the configured numReduceTasks (default 1).

        ③ Creates thread pools and starts multiple threads to run the MapTasks and ReduceTasks.

    6. Each MapTaskRunnable creates a MapTask.
        The MapTask executes runNewMapper(), the core logic of the Map phase.

    7. Each ReduceTaskRunnable creates a ReduceTask.
        The ReduceTask executes runNewReducer(), the core logic of the Reduce phase.
        (A minimal driver sketch that triggers this whole flow is shown right after this list.)
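
To make the trace below easier to follow, here is a minimal, hypothetical driver that kicks off the flow summarized above. The class name, the WordCountMapper / WordCountReducer names, and the forced local settings are assumptions for illustration (WordCountMapper is sketched at the end of this article); they are not part of the traced Hadoop source.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LocalWordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local"); // force the LocalJobRunner path analyzed here
        conf.set("fs.defaultFS", "file:///");          // read and write the local file system

        Job job = Job.getInstance(conf, "local wordcount");
        job.setJarByClass(LocalWordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);     // hypothetical Mapper subclass
        job.setReducerClass(WordCountReducer.class);   // hypothetical Reducer subclass
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Entry point of the whole analysis: DEFINE -> submit() -> LocalJobRunner
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}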

II. Source Code for Each Stage

  1. Job.waitForCompletion
public boolean waitForCompletion(boolean verbose) throws IOException, 
                        InterruptedException, ClassNotFoundException {
   // only a job in the DEFINE state may be submitted; this guards against re-submitting a job that is already running
   if (state == JobState.DEFINE) {
        //the core of this method is submit()
       submit();
   }
   if (verbose) {        // decide from the argument whether to print the job's progress while it runs
       monitorAndPrintJob();
   } else {
      // get the completion poll interval from the client.
       int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
       while (!isComplete()) {
           try {
               Thread.sleep(completionPollIntervalMillis);
           } catch (InterruptedException ie) {
           }
       }
    }
    return isSuccessful();
}

  2. submit()
    2.1 The submit() method
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    //verify the job is still in the DEFINE state
    ensureState(JobState.DEFINE);
    //determine whether the new API is in use
    setUseNewAPI();
    // create the Cluster object; a Cluster is an abstraction of the cluster the Job runs on, containing the job runner and the file system
    connect();

    // create the JobSubmitter from the Cluster
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),cluster.getClient());
    // use the JobSubmitter to submit the Job and obtain its running status
    //ugi is an instance of UserGroupInformation, which represents user and group information in Hadoop; it wraps
    //a JAAS Subject, provides methods to determine the user's name and groups, and supports the Windows, Unix and Kerberos login modules
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });

    // change the state to RUNNING
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}

    2.2 setUseNewAPI()

private void setUseNewAPI() throws IOException {
    // number of ReduceTasks, set via mapreduce.job.reduces, default 1
    int numReduces = conf.getNumReduceTasks();
    ......// followed by a series of validation checks, not detailed here
}
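
For reference, the reduce count read above can be set in either of two equivalent ways on the driver side (a small assumed fragment, not part of the traced source):

// set it on the Configuration before the Job is created ...
conf.setInt("mapreduce.job.reduces", 2);
// ... or on the Job object itself
job.setNumReduceTasks(2);
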
  3. JobSubmitter submits the Job: submitJobInternal()
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    // get the output format and check that the output path is set and does not already exist
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    //add the MapReduce framework (if configured) to the distributed cache
    addMRFrameworkToDistributedCache(conf);
    // get the temporary staging directory used while the Job runs
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
        //get the submitting host's IP address
        submitHostAddress = ip.getHostAddress();
        //get the submitting host's name
        submitHostName = ip.getHostName();
        //store submitHostName and submitHostAddress in the configuration
        conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // generate the Job id
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // create a sub-directory named after the job id under the staging directory
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try{
        ......// permission / security checks omitted here

        // set up the current Job's submit directory (uploads the job's resources)
        copyAndConfigureFiles(job, submitJobDir);
        // get the path of the Job's combined configuration file job.xml (holding all configuration parameters)
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          
        // Create the splits for the job
        LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

        // generate the current Job's split information: job.split (the splits) and job.splitmetainfo (split metadata)
        // job.split records how many splits there are and, for each one, which part of which file it covers
        // job.splitmetainfo records, for each split, which hosts (the DataNodes holding its blocks) it should preferably be read from
        int maps = writeSplits(job, submitJobDir);

        // set how many MapTasks should be launched, based on the number of splits
        conf.setInt(MRJobConfig.NUM_MAPS, maps);
        LOG.info("number of splits:" + maps);

        ......// permission / security checks omitted here
        // Write job file to submit dir
        // write the current Job's configuration into job.xml in the submit directory
        writeConf(conf, submitJobFile);
        
        // Now, actually submit the job (using the submit name)
        printTokens(jobId, job.getCredentials());

        // the actual submission
        status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

        if (status != null) {
            return status;
        } else {
            throw new IOException("Could not launch job");
        }
    }finally {
        if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
                jtFs.delete(submitJobDir, true);
        }
    }
}
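
The split count returned by writeSplits() above is what ultimately sets the number of MapTasks. For the common FileInputFormat family, the per-file split size follows, to my understanding and with default settings, roughly this rule (a simplified sketch, not the verbatim Hadoop code):

// Simplified view of FileInputFormat's default split sizing.
long minSize   = 1L;                 // mapreduce.input.fileinputformat.split.minsize
long maxSize   = Long.MAX_VALUE;     // mapreduce.input.fileinputformat.split.maxsize
long blockSize = 128L * 1024 * 1024; // block size of the underlying file system (example value)

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
// Each input file then contributes roughly ceil(fileLength / splitSize) splits,
// and submitJobInternal() writes the total into mapreduce.job.maps (NUM_MAPS).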

4.1 Submitting the Job
submitJob() is an abstract method of the ClientProtocol interface (an RPC protocol). By the nature of RPC, when the client-side proxy object submitClient invokes submitJob(), the method executes on the server side. It has two implementations: LocalJobRunner (local mode) and YARNRunner (YARN mode).

//how a Job is submitted in local mode (LocalJobRunner.submitJob)
public org.apache.hadoop.mapreduce.JobStatus submitJob(
      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
      Credentials credentials) throws IOException {
    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);   //jumps to step 4.2
    job.job.setCredentials(credentials);
    return job.status;
}
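
Which of the two implementations the client-side proxy ends up talking to is decided earlier, when connect() builds the Cluster: the ClientProtocolProvider selected via mapreduce.framework.name determines whether submitClient is backed by a LocalJobRunner or a YARNRunner. A hedged configuration sketch (provider class names are from my reading of the source and may differ across Hadoop versions):

// The framework name decides which ClientProtocol the Cluster constructs.
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");  // LocalClientProtocolProvider -> LocalJobRunner
// conf.set("mapreduce.framework.name", "yarn"); // YarnClientProtocolProvider -> YARNRunner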

4.2 New Job

//rebuild the Job object to run on the LocalJobRunner
public Job(JobID jobid, String jobSubmitDir) throws IOException {
     …… // various settings that rebuild the Job to run on the LocalJobRunner
     // start a thread; its run() method is analyzed next
     this.start();
}
  5. Job.run() (in LocalJobRunner)
//run the Job
public void run() {
    JobID jobId = profile.getJobID();
    JobContext jContext = new JobContextImpl(job, jobId);
    
    org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
    try {
        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
    } catch (Exception e) {
        LOG.info("Failed to createOutputCommitter", e);
        return;
    }
    
    try {
        // read the split metadata and build the array of split/attribute objects
        TaskSplitMetaInfo[] taskSplitMetaInfos = 
        SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

        int numReduceTasks = job.getNumReduceTasks();
        outputCommitter.setupJob(jContext);
        status.setSetupProgress(1.0f);
        // a Map that records, for each MapTask, the files holding its output
        Map<TaskAttemptID, MapOutputFile> mapOutputFiles = Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
        //create the list of runnables that will run the MapTasks: one per split
        List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles);
              
        initCounters(mapRunnables.size(), numReduceTasks);

        // create the thread pool
        ExecutorService mapService = createMapExecutor();
        // start running the Map-phase runnables
        //note: the local runner uses a thread pool, so tasks are queued and executed at some point in the future
        runTasks(mapRunnables, mapService, "map");

        try {
            if (numReduceTasks > 0) {
              //build the runnables for the reduce side
              List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
                  jobId, mapOutputFiles);
               //create the reduce thread pool
              ExecutorService reduceService = createReduceExecutor();
              //start running the Reduce-phase runnables (not covered in this article; a follow-up article will cover it)
              runTasks(reduceRunnables, reduceService, "reduce");
            }
        } finally {
            for (MapOutputFile output : mapOutputFiles.values()) {
              output.removeAll();
            }
        }
        // delete the temporary directory in output directory
        outputCommitter.commitJob(jContext);
        status.setCleanupProgress(1.0f);

        if (killed) {
            this.status.setRunState(JobStatus.KILLED);
        } else {
            this.status.setRunState(JobStatus.SUCCEEDED);
        }
        JobEndNotifier.localRunnerNotification(job, status);
    } catch (Throwable t) {
        try {
            outputCommitter.abortJob(jContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
        } catch (IOException ioe) {
            LOG.info("Error cleaning up job:" + id);
        }
        status.setCleanupProgress(1.0f);
        if (killed) {
            this.status.setRunState(JobStatus.KILLED);
        } else {
            this.status.setRunState(JobStatus.FAILED);
        }
        LOG.warn(id, t);

        JobEndNotifier.localRunnerNotification(job, status);

    } finally {
        try {
            fs.delete(systemJobFile.getParent(), true);  // delete submit dir
            localFs.delete(localJobFile, true);              // delete local copy
            // Cleanup distributed cache
            localDistributedCacheManager.close();
        } catch (IOException e) {
            LOG.warn("Error cleaning up "+id+": "+e);
        }
    }
}

6. The runTasks() method of the LocalJobRunner Job class

private void runTasks(List<RunnableWithThrowable> runnables,
    ExecutorService service, String taskType) throws Exception {
    // Start populating the executor with work units.
    // They may begin running immediately (in other threads).
    for (Runnable r : runnables) {
        //submit to the thread pool, which executes the map or reduce runnable
        service.submit(r);
    }
    ...
}
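
Because the local runner executes tasks through these pools, its parallelism is capped by LocalJobRunner-specific settings. To my knowledge (worth checking against your Hadoop version), the relevant keys are:

// Assumed keys read by LocalJobRunner when sizing its map/reduce executors.
conf.setInt("mapreduce.local.map.tasks.maximum", 4);    // max concurrent local MapTasks
conf.setInt("mapreduce.local.reduce.tasks.maximum", 2); // max concurrent local ReduceTasks
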
  7. MapTaskRunnable.run()
public void run() {
    try {
        // generate the attempt id for this map task
        TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, taskId), 0);
        LOG.info("Starting task: " + mapId);
        mapIds.add(mapId);

        // create the MapTask object; it executes the map-phase logic for this thread ("node")
        MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,info.getSplitIndex(), 1);
        map.setUser(UserGroupInformation.getCurrentUser().getShortUserName());

        //set up the local working directories
        //map is a value of type MapTask; the attempt id observed during this debugging session was attempt_localXXX_0001_m_000000_0
        //localConf holds the loaded configuration
        setupChildMapredLocalDirs(map, localConf);
        
        //create a map output file holder and set its configuration
        // the object that records where the data produced by this MapTask is stored
        MapOutputFile mapOutput = new MROutputFiles();
        mapOutput.setConf(localConf);
        mapOutputFiles.put(mapId, mapOutput);

        //refers to a job_localXXX_0001.xml file
        map.setJobFile(localJobFile.toString());
        localConf.setUser(map.getUser());
        map.localizeConfiguration(localConf);
        map.setConf(localConf);
        try {
            map_tasks.getAndIncrement();
            //myMetrics.launchMap() records the launch of this map task; the real work happens in map.run() below
            myMetrics.launchMap(mapId);
            map.run(localConf, Job.this);     //此处将调用Maptask的run方法
            myMetrics.completeMap(mapId);
        } finally {
            map_tasks.getAndDecrement();
        }

        LOG.info("Finishing task: " + mapId);
    } catch (Throwable e) {
      this.storedException = e;
    }
}
  8. MapTask.run()
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map 
        // phase will govern the entire attempt's progress.
        if (conf.getNumReduceTasks() == 0) {
            mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
            // key-value sorting is only needed when there is a reduce phase
            // If there are reducers then the entire attempt's progress will be 
            // split between the map phase (67%) and the sort phase (33%).
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase  = getProgress().addPhase("sort", 0.333f);
        }
    }
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
    }

    if (useNewApi) {
        //entry point of the core Map-phase processing
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
}

9. runNewMapper(): the core logic of MapTask

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
    // make a task context so we can get the classes
    // create the task attempt context
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper: instantiate the user's Mapper via reflection
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format: instantiate the InputFormat via reflection
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split: it records which file, and which part of it, this task processes
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
        // writes the MapTask's output directly to the final output directory (no reducers)
        output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        // collects the MapTask's output so it can be handed to the ReduceTasks
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
        input.initialize(split, mapperContext);
        //runs the user-defined Mapper instance
        mapper.run(mapperContext);
        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
    } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
    }
}
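
A practical consequence of the branch above: with zero reducers the map output bypasses sort/shuffle and goes straight to the final output directory through NewDirectOutputCollector. A map-only job is requested from the driver simply by:

job.setNumReduceTasks(0); // map-only: output is written via NewDirectOutputCollector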

10. Mapper.run()

//run the user-defined Mapper
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            //this is where the user's own map() code runs
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}
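
Mapper.run() is a template method: setup() once, then map() for every key-value pair delivered by the RecordReader, then cleanup(). The hypothetical WordCountMapper referenced in the driver sketch at the top of this article could look like this (an illustration, not part of the traced source):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // called once per input record by the while loop in Mapper.run()
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE); // collected by NewOutputCollector when reducers exist
            }
        }
    }
}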

References:
"MapReduce Job本地提交过程源码跟踪及分析" (a source-level trace and analysis of the local MapReduce job submission process): https://blog.csdn.net/lemonZhaoTao/article/details/72943618
