《Hadoop-MapReduce源码解析》之二:org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal
- Hadoop版本:2.10.2
- 书接上文:《Hadoop-MapReduce源码解析》之一:org.apache.hadoop.mapreduce.Job#waitForCompletion
1. org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal
* Internal method for submitting jobs to the system.
* The job submission process involves:
* 1. Checking the input and output specifications of the job.
* 1. 检查作业输入输出规范
* 2. Computing the InputSplits for the job.
* 2. 计算作业的输入分片
* 3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
* 3. 如有必要,为作业的分布式缓存(DistributedCache)设置所需的记账信息
* 4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
* 4. 将作业的jar和配置复制到分布式文件系统上的map reduce系统目录中
* 5. Submitting the job to the JobTracker and optionally monitoring its status.
* 5. 将作业提交给JobTracker,并可选择监视其状态(注:Hadoop 2.x中JobTracker的职责已由YARN承担,实际提交由submitClient完成,即YARNRunner或LocalJobRunner)
* Params:
* job – the configuration to submit
* cluster – the handle to the Cluster
* Throws:
* ClassNotFoundException
* InterruptedException
* IOException
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// 验证作业的输出规范
// 通常检查输出路径是否已经存在,当它已经存在时抛出异常,这样输出就不会被覆盖
// 除非明确关闭,否则Hadoop默认指定两个资源,按类路径的顺序加载:
// core-default.xml:hadoop的只读默认值。
// core-site.xml:给定hadoop安装的特定于站点的配置。
Configuration conf = job.getConfiguration();
// 从MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH("mapreduce.application.framework.path")中
// 解析路径中的任何符号链接
// 解析后的uri添加到分布式缓存中:
// MRJobConfig.CACHE_ARCHIVES = "mapreduce.job.cache.archives";
// conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
// : archives + "," + uri.toString());
// 获取放置作业特定文件的暂存目录
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
// 设置提交任务的主机名称和地址
// 这里的submitClient在后续解释,即:LocalJobRunner或者YARNRunner
// 创建Applicant,生成JobId,返回JobId
JobID jobId = submitClient.getNewJobID();
// 提交作业的路径(Path parent, String child),将两个参数拼接为一个新路径
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// 作业状态
JobStatus status = null;
try {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
// 从与传递的路径(作业文件目录)相对应的名称节点获取委派令牌
new Path[] { submitJobDir }, conf);
// 从所有NAMENODE节点处获取委派令牌
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
// 获取Shuffle密钥来授权Shuffle转换
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
SecretKey shuffleKey = keyGen.generateKey();
// 在Hadoop MapReduce中,当进行数据溢出(spill)时,会将部分数据从内存中写入磁盘以释内存间
// 为保证数据安全,当启用加密中间数据溢出时,最大ApplicationMaster(AM)尝试次数设置为1
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
// 使用命令行选项-libjars、-files、-archives配置用户的jobconf
// 并上载和配置与传递的作业相关的文件、libjar、jobjar和归档文件
copyAndConfigureFiles(job, submitJobDir);
// 获取作业conf文件,即:new Path(jobSubmitDir, "job.xml");
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
// 计算任务输入的分片,并返回分片数量,即map任务的数量
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// 如果计算出来的map数大于设置的或者默认的最大map数,抛出异常
int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
if (maxMaps >= 0 && maxMaps < maps) {
throw new IllegalArgumentException("The number of map tasks " + maps +
" exceeded limit " + maxMaps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
// 将“作业提交到什么队列”写入job文件
String queue = conf.get(MRJobConfig.QUEUE_NAME,
AccessControlList acl = submitClient.getQueueAdmins(queue);
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
// 在将jobconf复制到HDFS之前删除jobtoken引用,因为任务不需要此设置;
// 实际上它们可能会因此而中断,因为引用将指向不同的作业。
if (conf.getBoolean(
// Add HDFS tracking ids
// 添加DHFS tracking ids:跟踪标识符,该标识符可用于在多个客户端会话中关联令牌的使用情况
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.toArray(new String[trackingIds.size()]));
// Set reservation info if it exists
// reservationId是全局唯一作业的保留标识符,如果作业没有任何关联的保留,则为null
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
// Write job file to submit dir
// 将submitJobFile写到JobTracker的文件系统中去
writeConf(conf, submitJobFile);
// Now, actually submit the job (using the submit name)
printTokens(jobId, job.getCredentials());
// 正式提交Job到Yarn或者本地
status = submitClient.submitJob( // 具体见Hadoop-MapReduce源码解析》之三
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
// 返回Job提交后的状态
return status;
} else {
// 任务启动失败
throw new IOException("Could not launch job");
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
// 清空暂存目录
jtFs.delete(submitJobDir, true);