MapReduce源码分析(一)作业提交流程
前言
Mapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop的数据分析 应用”的核心框架。Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 hadoop 集群上。
执行流程图一.waitForCompletion
在mapreduce程序的job类中,我们通过set Configuration对象,得到相应的job对象,在job对象中指定Mapper类、Reducer类,Job类等属性后,通过waitForCompletion(true)方法提交并等待job执行。传入的boolean类型参数决定是否监控并打印job的执行情况。
public class MyJob {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if (args.length!=2){
System.out.println("请传入参数");
System.exit(1);
}
String inputPath = args[0];
String outputPath = args[1];
if (inputPath == null || inputPath == "" || outputPath == null || outputPath == ""){
System.out.println("参数有误");
System.exit(1);
}
Configuration conf = new Configuration(true);
Path out = new Path(outputPath);
if (out.getFileSystem(conf).exists(out)){
System.out.println("HDFS输出目录已存在");
System.exit(1);
}
//构建job类
Job job = Job.getInstance(conf);
//设置运行主类
job.setJarByClass(MyJob.class);
//作业名称
job.setJobName("job");
//设置输入输出路径
FileInputFormat.addInputPath(job,new Path(inputPath));
FileOutputFormat.setOutputPath(job,out);
//先是format
//job.setInputFormatClass();
//mapTask
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(TQ.class);
job.setMapOutputValueClass(IntWritable.class);
//然后是排序比较器
job.setSortComparatorClass(TQSortComparator.class);
//partition分组规则
job.setPartitionerClass(TQPartitioner.class);
//分组比较器
job.setGroupingComparatorClass(TQGroupComparator.class);
//reduceTask
job.setReducerClass(MyReduce.class);
//设置ReduceTask数量
job.setNumReduceTasks(2);
//提交任务完成
job.waitForCompletion(true);
}
}
现在我们进入Job类中的waitForCompletion()方法查看,该方法传入一个布尔值参数。方法首先检查Job状态,若处于DEFINE状态则通过submit()方法提交job。而后根据传入的参数决定是否监控并打印job的运行状况。
该方法每隔 1 秒轮询作业的进度,如果进度有所变化,将该进度报告给控制台(console)。当作业成功完成,作业计数器被显示出来。否则,导致作业失败的错误被记录到控制台。
/**
* Submit the job to the cluster and wait for it to finish.
* @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
//首先检查Job状态,若处于DEFINE状态则通过submit()方法向集群提交job
if (state == JobState.DEFINE) {
submit();
}
//若传入参数为true,则监控并打印job运行情况
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
二.submit()方法
waitForCompletion内部主要处理任务是调用了submit方法,接下来我们关注核心点submit方法。
该方法负责向集群提交job,方法首先再次检查job的状态,如果不是DEFINE则不能提交作业,setUseNewAPI()方法作用是指定job使用的是新版mapreduce的API,即org.apache.hadoop.mapreduce包下的Mapper和Reducer,而不是老版的mapred包下的类。
submit()中执行了两个比较重要的方法:
其一,connect()方法会对Job类中的Cluster类型的成员进行初始化,该成员对象中封装了通过Configuration设置的集群的信息,其内部创建了真正的通信协议对象,它将用于最终的job提交。
其二,getJobSubmitter()方法通过cluster中封装的集群信息(这里是文件系统和客户端)获取JobSubmitter对象,该对象负责最终向集群提交job并返回job的运行进度。最后job提交器对象submitter.submitJobInternal(Job.this, cluster)将当前job对象提交到cluster中,并返回job运行状态给status成员,该方法是JobSubmitter中最核心的功能代码。提交成功后,JobState被设置为RUNNING,表示当前job进入运行阶段,最后控制台中打印跟踪job运行状况的URL。
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
//通过cluster中封装的集群信息(这里是文件系统和客户端)获取JobSubmitter对象,该对象负责最终向集群提交job并返回job的运行进度
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
三.submitJobInternal方法
可以看到在 Job 对象上面调用 submit() 方法之后,在内部创建一个 JobSubmitter 实例,然后调用该实例的 submitJobInternal() 方法。
任务提交器(JobSubmitter)是最终提交任务到集群的方法。
submitJobInternal的执行过程如下:
1.首先checkSpecs(job)方法检查作业输出路径是否配置并且是否存在。正确情况是已经配置且不存在,输出路径的配置参数为mapreduce.output.fileoutputformat.outputdir
2.而后获取job中封装的Configuration对象,添加MAPREDUCE_APPLICATION_FRAMEWORK_PATH(应用框架路径)到分布式缓存中。
3.通过JobSubmissionFiles中的静态方法getStagingDir()获取作业执行时相关资源的存放路径。默认路径是: /tmp/hadoop-yarn/staging/root/.staging
4.获取提交任务的当前主机的IP,并将ip、主机名等相关信息封装进Configuration对象中。
5.生成JobID并将其设置进job对象中,构造提交job的路径。然后是对该路径设置一系列权限的操作。
6.copyAndConfigureFiles,拷贝作业运行必备的资源,作业 JAR 文件,作业 JAR 文件以一个高副本因子(a high replication factor)进行拷贝(由 mapreduce.client.submit.file.replication 属性控制,默认值为 10),所以在作业任务运行时,在集群中有很多的作业 JAR 副本供节点管理器来访问。
7.调用writeSplits()方法,(非常重要)为作业计算输入分片(input splits)。写分片数据文件job.splits和分片元数据文件job.splitmetainfo,计算map任务数。
8.writeConf()方法,写 xml 配置文件
9.提交作业submitClient.submitJob,通过在资源管理器上调用 submitApplication 来提交作业。
源码如下
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
int keyLen = CryptoUtils.isShuffleEncrypted(conf)
? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
: SHUFFLE_KEY_LENGTH;
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(keyLen);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
四.writeSplits 方法
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
使用newAPI将会调用writeNewSplits()方法
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
//如果使用newAPI则调用writeNewSplits
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
writeNewSplits()方法将会根据我们设置的inputFormat.class通过反射获得inputFormat对象input,然后调用inputFormat对象的getSplits方法,当获得分片信息之后调用JobSplitWriter.createSplitFiles方法将分片的信息写入到submitJobDir/job.split文件中。
反射获取InputFormat 格式化的规则,如果用户不指定,默认TextInputFormat。
五.FileInputFormat类中的getSplits()方法
这里我们需要注意这一代码:
....
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
....
if (isSplitable(job, path)) {
//获取块的大小
long blockSize = file.getBlockSize();
//计算切片大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
//分块中剩余的字节大小
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
....
}
这里的之前代码首先获取long minSize
和long maxSize
,然后计算切片大小。
计算规则如下:
computeSplitSize >>> Math.max(minSize, Math.min(maxSize, blockSize));
如果想要修改切片的最大值就修改minSize,最小值就修改MaxSize,反着来就行。
这里的while循环中的判定语句作用是判断分块中剩余的字节大小与预设分片大小的比例是否超过某个限定值SPLIT_SLOP,该值是一个常量,为1.1,在FileInputFormat类中定义。也就是说当剩余字节大于预设分片大小的110%后,对剩余的文件继续分片,否则不足110%,直接将剩余文件生成一个分片。
private static final double SPLIT_SLOP = 1.1;
之后生成切片信息,放入到切片集合中。
这里注意切片的5个组成部分
- Path
- 起始偏移量
- 切片大小
- 切片的存储的host地址
- 切片的副本的host地址
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
回到JobSubmitter的writeNewSplits方法中,这里对切片进行排序,根据大小将拆分排序,以便最大的拆分优先
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
之后调用createSplitFiles,根据切片清单创建元数据信息文件,主要是将数据写入到job.split和job.splitmetainfo文件中。由createSplitFiles函数完成核心功能。
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
Configuration conf, FileSystem fs, T[] splits)
throws IOException, InterruptedException {
//创建job.split文件,并以流的方式打开该文件
FSDataOutputStream out = createFile(fs,
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
//将所有分片(InputSplit的实例)的信息都写入job.split文件中
//同时会返回各个分片的原数据信息,放入info数组中
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
out.close();
//将info中的split元数据信息写入到job.splitmetainfo文件中
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
info);
}
六.submitJob真正的提交作业
submitClient.submitJob,会调用YARN的submitJob方法。构造开始一个MapReduce的ApplicationMaster的必要信息。
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
// Submit to ResourceManager
try {
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
调用submitApplication(appContext),提交到ResourceManager。
下一篇文章会对作业的初始化源码进行分析。敬请关注