MapReduce任务在Yarn上的运行流程分析

2018-02-02  本文已影响112人  搬砖程序猿
  1. 以WordCount为例
/**
 * Word-count MapReduce job: the mapper emits (token, 1) for every token of
 * each input value, and the reducer (also used as the combiner) sums the
 * counts per token.
 */
public class WordCount {

  /**
   * Configures the job from the command line and runs it.
   *
   * <p>The arguments remaining after {@code GenericOptionsParser} has
   * processed {@code args} are treated as one or more input paths followed by
   * a single output path. With fewer than two such arguments a usage message
   * is printed and the JVM exits with status 2. Otherwise the JVM exits with
   * status 0 when {@code waitForCompletion} returns true and 1 when it
   * returns false.
   *
   * @param args command-line arguments
   * @throws Exception propagated from job setup or execution
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (paths.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }

    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Every argument except the last is an input path; the last is the output path.
    int outputIndex = paths.length - 1;
    for (int i = 0; i < outputIndex; i++) {
      FileInputFormat.addInputPath(job, new Path(paths[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(paths[outputIndex]));

    // Submit the job and wait for it to finish.
    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
  }

  /** Splits each input value into tokens and writes (token, 1) per token. */
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /**
     * Tokenizes {@code value} with a default {@link StringTokenizer} and
     * writes one (token, 1) pair per token; {@code key} is not used.
     */
    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
      for (StringTokenizer tokens = new StringTokenizer(value.toString()); tokens.hasMoreTokens(); ) {
        this.word.set(tokens.nextToken());
        context.write(this.word, one);
      }
    }
  }

  /** Sums all counts received for a key and writes (key, total). */
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    /** Adds up {@code values} and writes the total under {@code key}. */
    public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
      int total = 0;
      for (IntWritable count : values) {
        total += count.get();
      }
      this.result.set(total);
      context.write(key, this.result);
    }
  }
}

2.Job类分析

 /**
  * Submits the job if it is still in the {@code DEFINE} state, then waits
  * until it has completed.
  *
  * @param verbose when true, progress is reported through
  *        {@code monitorAndPrintJob()}; when false, {@code isComplete()} is
  *        polled at the client's completion poll interval
  * @return the value of {@code isSuccessful()} once waiting is over
  */
  public boolean waitForCompletion(boolean verbose)
      throws IOException, InterruptedException, ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();    // start the submission
    }
    if (verbose) {
      // monitor the job and print its progress
      monitorAndPrintJob();
      return isSuccessful();
    }
    // get the completion poll interval from the client.
    final int pollMillis = Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException ignored) {
        // interruption is ignored here; the loop simply polls again
      }
    }
    return isSuccessful();
  }

4.Job submit方法

/**
 * Submits this job: checks that it is in the {@code DEFINE} state, connects
 * to the cluster, hands the job to a {@code JobSubmitter} inside
 * {@code ugi.doAs(...)}, then marks the job {@code RUNNING} and logs its
 * tracking URL.
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  // presumably establishes the connection to the ResourceManager based on the
  // configuration — TODO confirm against connect()
  connect();

  // obtain the submitter
  final JobSubmitter jobSubmitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

  // The actual submission, run as the job's user via ugi.doAs(...).
  PrivilegedExceptionAction<JobStatus> submitAction =
      new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run()
            throws IOException, InterruptedException, ClassNotFoundException {
          return jobSubmitter.submitJobInternal(Job.this, cluster);
        }
      };
  status = ugi.doAs(submitAction);

  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
  1. submitter.submitJobInternal(Job.this, cluster);
 // Body of submitter.submitJobInternal(job, cluster) as excerpted here (the
 // method header is not part of this excerpt). It validates the output specs,
 // prepares the job's submit directory and credentials, writes the splits and
 // the job configuration, and finally submits the job through submitClient.
 // If no JobStatus was obtained, the submit directory is deleted again.

    // validate the jobs output specs
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // Ask the submit client for a new job id (presumably an RPC to the
    // ResourceManager — TODO confirm).
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // Per-job directory under the staging area; the job's jar, resources and
    // configuration are placed here.
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        // Fixed: the two literals used to be joined without a space, which
        // logged "...encrypted intermediatedata spill is enabled".
        LOG.warn("Max job attempts set to 1 since encrypted intermediate " +
                "data spill is enabled");
      }
      // copy the job's jar and other files into the job's submit directory
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // write the split information; the returned count is the number of maps
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // the queue defaults to JobConf.DEFAULT_QUEUE_NAME when none is configured
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      // (the complete job configuration goes into the job's submit directory)
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // this is where the job is really submitted
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      // status is still null when submission failed or threw: clean up.
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null) {
          jtFs.delete(submitJobDir, true);
        }

      }
    }

5 写分片信息 writeSplits(job, submitJobDir);

     /**
      * Writes the job's input splits into {@code jobSubmitDir}, using the
      * new-API path when the job configuration reports a new-API mapper and
      * the old-API path otherwise.
      *
      * @param job the job whose splits are written
      * @param jobSubmitDir directory receiving the split files
      * @return the number of splits, as reported by the chosen writer
      */
     private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
         Path jobSubmitDir)
         throws IOException, InterruptedException, ClassNotFoundException {
       JobConf jConf = (JobConf) job.getConfiguration();
       return jConf.getUseNewMapper()
           ? writeNewSplits(job, jobSubmitDir)
           : writeOldSplits(jConf, jobSubmitDir);
     }
 /**
  * Computes the splits with the job's input format, sorts them with
  * {@code SplitComparator} and writes the split files into
  * {@code jobSubmitDir}.
  *
  * @param job the job whose input is split
  * @param jobSubmitDir directory receiving the split files
  * @return the number of splits
  */
 private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
     throws IOException, InterruptedException, ClassNotFoundException {
   Configuration configuration = job.getConfiguration();
   InputFormat<?, ?> inputFormat =
       ReflectionUtils.newInstance(job.getInputFormatClass(), configuration);

   // ask the input format to split the input
   List<InputSplit> splitList = inputFormat.getSplits(job);
   T[] sortedSplits = (T[]) splitList.toArray(new InputSplit[splitList.size()]);

   // sort the splits into order based on size, so that the biggest
   // go first
   Arrays.sort(sortedSplits, new SplitComparator());

   // write the split files through the file system that owns jobSubmitDir
   JobSplitWriter.createSplitFiles(jobSubmitDir, configuration,
       jobSubmitDir.getFileSystem(configuration), sortedSplits);
   return sortedSplits.length;
 }
  

6.org.apache.hadoop.mapred.YARNRunner由这个类来进行提交

 @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    addHistoryToken(ts);
    
    // Construct necessary information to start the MR AM
    设置ApplicatonMaster的初始化信息,包括jar包,资源,启动命令等,每个应用都有一个ApplicationMaster
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
     //再交由底层的,使用rpc协议提交到resourcemanager
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);
      //整个后面,判断appMaster的提交状态
      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

7.ApplicationMaster的初始化信息

/**
 * Builds the ApplicationSubmissionContext for the MR ApplicationMaster.
 *
 * The context carries the AM's resource capability, its local resources
 * (job conf file, optional job jar, split files), security tokens, launch
 * command, environment, ACLs, queue, optional reservation id, application
 * name, max attempts, node-label expressions, application type and tags.
 *
 * NOTE(review): the body reads both the jobConf parameter and a conf name
 * that is not declared in this method (presumably a field of the enclosing
 * class) — confirm the two are meant to differ.
 *
 * @param jobConf job configuration read for jar, profiling, ACLs, queue,
 *        reservation id, job name, node labels and tags
 * @param jobSubmitDir the job's submit directory, holding the job conf and
 *        split files
 * @param ts credentials serialized into the AM container's security tokens
 * @return the populated ApplicationSubmissionContext
 * @throws IOException if the configured reservation id cannot be parsed
 */
public ApplicationSubmissionContext createApplicationSubmissionContext(
  Configuration jobConf,
  String jobSubmitDir, Credentials ts) throws IOException {
ApplicationId applicationId = resMgrDelegate.getApplicationId();
// Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory(
    conf.getInt(
        MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
        )
    );
capability.setVirtualCores(
    conf.getInt(
        MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
        )
    );
LOG.debug("AppMaster capability = " + capability);

// Setup LocalResources
Map<String, LocalResource> localResources =
    new HashMap<String, LocalResource>();

Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);

// NOTE(review): yarnUrlForJobSubmitDir is only used in the debug message below.
URL yarnUrlForJobSubmitDir = ConverterUtils
    .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
        .resolvePath(
            defaultFileContext.makeQualified(new Path(jobSubmitDir))));
LOG.debug("Creating setup context, jobSubmitDir url is "
    + yarnUrlForJobSubmitDir);

// The job conf file is always registered as a local resource.
localResources.put(MRJobConfig.JOB_CONF_FILE,
    createApplicationResource(defaultFileContext,
        jobConfPath, LocalResourceType.FILE));
// The job jar is registered only when one is configured.
if (jobConf.get(MRJobConfig.JAR) != null) {
  Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
  LocalResource rc = createApplicationResource(
      FileContext.getFileContext(jobJarPath.toUri(), jobConf),
      jobJarPath,
      LocalResourceType.PATTERN);
  String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
      JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
  rc.setPattern(pattern);
  localResources.put(MRJobConfig.JOB_JAR, rc);
} else {
  // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
  // mapreduce jar itself which is already on the classpath.
  LOG.info("Job jar is not present. "
      + "Not adding any jar to the list of resources.");
}

// TODO gross hack
// Registers the split file and the split meta-info file from the submit dir.
for (String s : new String[] {
    MRJobConfig.JOB_SPLIT,
    MRJobConfig.JOB_SPLIT_METAINFO }) {
  localResources.put(
      MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
      createApplicationResource(defaultFileContext,
          new Path(jobSubmitDir, s), LocalResourceType.FILE));
}

// Setup security tokens
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens  = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

// Setup the command to run the AM
List<String> vargs = new ArrayList<String>(8);
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
    + "/bin/java");

Path amTmpDir =
    new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + amTmpDir);
MRApps.addLog4jSystemProperties(null, vargs, conf);

// Check for Java Lib Path usage in MAP and REDUCE configs
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", 
    MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map", 
    MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce", 
    MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce", 
    MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);

// Add AM admin command opts before user command opts
// so that it can be overridden by user
String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
    MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
warnForJavaLibPath(mrAppMasterAdminOptions, "app master", 
    MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
vargs.add(mrAppMasterAdminOptions);

// Add AM user command opts
String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
    MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
warnForJavaLibPath(mrAppMasterUserOptions, "app master", 
    MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
vargs.add(mrAppMasterUserOptions);

// When AM profiling is enabled, add the profiler arguments; the profile
// params string is a format string that receives the profile log path.
if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
    MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
  final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
      MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
  if (profileParams != null) {
    vargs.add(String.format(profileParams,
        ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
            + TaskLog.LogName.PROFILE));
  }
}

// Main class of the AM, followed by stdout/stderr redirection into the log dir.
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
    Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
    Path.SEPARATOR + ApplicationConstants.STDERR);


// All arguments are joined with spaces into one command string, which is
// the only element of vargsFinal.
Vector<String> vargsFinal = new Vector<String>(8);
// Final command
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
  mergedCommand.append(str).append(" ");
}
vargsFinal.add(mergedCommand.toString());

LOG.debug("Command to launch container for ApplicationMaster is : "
    + mergedCommand);

// Setup the CLASSPATH in environment
// i.e. add { Hadoop jars, job jar, CWD } to classpath.
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf);

// Shell
environment.put(Environment.SHELL.name(),
    conf.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL,
        MRJobConfig.DEFAULT_SHELL));

// Add the container working directory in front of LD_LIBRARY_PATH
MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
    MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);

// Setup the environment variables for Admin first
MRApps.setEnvFromInputString(environment, 
    conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_USER_ENV), conf);
// Setup the environment variables (LD_LIBRARY_PATH, etc)
MRApps.setEnvFromInputString(environment, 
    conf.get(MRJobConfig.MR_AM_ENV), conf);

// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources);

// View/modify ACLs for the application, taken from the job configuration.
Map<ApplicationAccessType, String> acls
    = new HashMap<ApplicationAccessType, String>(2);
acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
    MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
    MRJobConfig.JOB_ACL_MODIFY_JOB,
    MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
    ContainerLaunchContext.newInstance(localResources, environment,
      vargsFinal, null, securityTokens, acls);

Collection<String> tagsFromConf =
    jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

// Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext =
    recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
appContext.setApplicationId(applicationId);                // ApplicationId
appContext.setQueue(                                       // Queue name
    jobConf.get(JobContext.QUEUE_NAME,
    YarnConfiguration.DEFAULT_QUEUE_NAME));
// add reservationID if present
ReservationId reservationID = null;
try {
  reservationID =
      ReservationId.parseReservationId(jobConf
          .get(JobContext.RESERVATION_ID));
} catch (NumberFormatException e) {
  // throw exception as reservationid as is invalid
  String errMsg =
      "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
          + " specified for the app: " + applicationId;
  LOG.warn(errMsg);
  throw new IOException(errMsg);
}
if (reservationID != null) {
  appContext.setReservationID(reservationID);
  LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
      + " to queue:" + appContext.getQueue() + " with reservationId:"
      + appContext.getReservationID());
}
appContext.setApplicationName(                             // Job name
    jobConf.get(JobContext.JOB_NAME,
    YarnConfiguration.DEFAULT_APPLICATION_NAME));
appContext.setCancelTokensWhenComplete(
    conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
appContext.setAMContainerSpec(amContainer);         // AM Container
appContext.setMaxAppAttempts(
    conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
        MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
appContext.setResource(capability);

// set labels for the AM container request if present
String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
if (null != amNodelabelExpression
    && amNodelabelExpression.trim().length() != 0) {
  ResourceRequest amResourceRequest =
      recordFactory.newRecordInstance(ResourceRequest.class);
  amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
  amResourceRequest.setResourceName(ResourceRequest.ANY);
  amResourceRequest.setCapability(capability);
  amResourceRequest.setNumContainers(1);
  amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
  appContext.setAMContainerResourceRequest(amResourceRequest);
}
// set labels for the Job containers
appContext.setNodeLabelExpression(jobConf
    .get(JobContext.JOB_NODE_LABEL_EXP));

appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
// Tags are attached only when at least one is configured.
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
  appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}

return appContext;
// NOTE(review): the method's closing brace is not present in this excerpt.

8.ResourceManager收到请求后,调度器会在某个nodemanager上分配一个Container,并通知该nodemanager在其中启动AppMaster。
AppMaster启动之后,先向resourcemanager注册自己,这样resourcemanager才能知道任务的运行信息;然后AppMaster向resourcemanager请求分配Container。Container由调度器分配,分配结果中包含Container的内存、cpu以及所在的节点。

resourcemanager.scheduler.SchedulerNode: Assigned container container_1515076284174_0005_01_000004 of capacity <memory:1024, vCores:1> on host centos-3:8041, which has 1 containers, <memory:1024, vCores:1> used and <memory:36, vCores:0> available after allocation

AppMaster得到Container信息后,指定nodemanager启动Container去运行任务。AppMaster会监控Container中任务的运行情况,同时向ResourceManager报告任务信息。
每个nodemanager会向resourcemanager发送心跳信息,resourcemanager根据所有节点的心跳了解整个集群的资源情况,这样调度器才能合理地分配Container。
每个AppMaster在任务完成后会向resourcemanager取消注册,这样一个任务就完成了。
9.分析resourcemanager任务日志,分配Container信息全部在resourcemanager节点的日志信息中,这里只截了第一个Container信息。


resourcemanager.ClientRMService: Allocated new applicationId: 5
resourcemanager.ClientRMService: Application with id 5 submitted by user root
resourcemanager.rmapp.RMAppImpl: Storing application with id application_1515076284174_0005
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from NEW to NEW_SAVING
resourcemanager.RMAuditLogger: USER=root    IP=172.31.109.168   OPERATION=Submit Application Request    TARGET=ClientRMService  RESULT=SUCCESS  APPID=application_1515076284174_0005
resourcemanager.recovery.RMStateStore: Storing info for app: application_1515076284174_0005
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from NEW_SAVING to SUBMITTED
resourcemanager.scheduler.fair.FairScheduler: Accepted application application_1515076284174_0005 from user: root, in queue: default, currently num of applications: 1
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from SUBMITTED to ACCEPTED
resourcemanager.ApplicationMasterService: Registering app attempt : appattempt_1515076284174_0005_000001
resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1515076284174_0005_000001 State change from NEW to SUBMITTED
resourcemanager.scheduler.fair.FairScheduler: Added Application Attempt appattempt_1515076284174_0005_000001 to scheduler from user: root
resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1515076284174_0005_000001 State change from SUBMITTED to SCHEDULED
resourcemanager.rmcontainer.RMContainerImpl: container_1515076284174_0005_01_000001 Container Transitioned from NEW to ALLOCATED
resourcemanager.RMAuditLogger: USER=root    OPERATION=AM Allocated Container    TARGET=SchedulerApp RESULT=SUCCESS  APPID=application_1515076284174_0005    CONTAINERID=container_1515076284174_0005_01_000001
//可以看到container编号的尾号是000001,说明这是第一个Container容器,在机器centos-2上运行。
**resourcemanager.scheduler.SchedulerNode: Assigned container container_1515076284174_0005_01_000001 of capacity <memory:1024, vCores:1> on host centos-2:8041, which has 1 containers, <memory:1024, vCores:1> used and <memory:36, vCores:0> available after allocation**

10.我运行程序的时候有6个map,默认一个reduce,再加上第一个运行AppMaster的Container,一共分配了8个Container。第一个Container分配在centos-2上,其它的7个Container都分配在了centos-3上,这可以从resourcemanager日志中看出来,也可以到centos-3上验证。


image
上一篇下一篇

猜你喜欢

热点阅读