通过MapReduce的本地运行方式来分析MapReduce的执
2019-03-05 本文已影响0人
苏坡闷
一、首先抛出结论(Map阶段)
1. Job.afterCompletion():
检查是否是running状态,如果是running避免重复提交!
如果状态是define,提交!执行commit()
2. commit() :
创建Cluster对象,是Job运行的集群的抽象表达,包含JobRunner,及文件系统!
根据Cluster获取Jobcommitter,提交Job
3. 提交前:
①确定当前Job的作业目录
②切片,将split.info / split.infometa在作业目录生成
③根据切片数,设置MapTask启动个数
④生成Job.xml文件(包含了所有的配置信息参数)
4. 提交:
在LocalJobRunner上重构Job对象!
执行start(),启动一个线程!
5. Job的run()
①根据切片信息,获取包含切面及属性信息的数组,根据这个数组,确定List<RunableandThrowable> mapTaskRunables
②根据设置的numReduceTasks(默认为1),确定List<RunableandThrowable> reduceeTaskRunables
③创建线程池,开启多个线程,运行MapTask和ReduceTask
6. 每个MapTaskRunable对象,都会创建一个MapTask
MapTask执行runNewMapper()执行Map阶段的核心逻辑!
7. 每个ReduceTaskRunable对象,都会创建一个ReduceTask
ReduceTask执行runNewReducer()执行Rudece阶段的核心逻辑!
二、各阶段源码
- Job.waitForCompletion
public boolean waitForCompletion(boolean verbose) throws IOException,
InterruptedException, ClassNotFoundException {
// 判断job是定义状态,避免job提交后正在运行造成的重复提交
if (state == JobState.DEFINE) {
//此方法的核心在于submit()
submit();
}
if (verbose) { // 根据传入参数决定是否将job运行的过程打印显示
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
- submit()
2.1 submit方法
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
//确认运行状态
ensureState(JobState.DEFINE);
//确定是否是使用的新API
setUseNewAPI();
// 创建Cluster对象,Cluster是集群的抽象表达,包含JobRunner,和文件系统
connect();
// 根据Cluster创建Job提交器
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),cluster.getClient());
// 使用Job提交器,提交Job,获取Job运行状态
//ugi是UserGroupInformation类的实例,表示Hadoop中的用户和组信息,这个类包装了一个JAAS Subject以及提供了
//方法来确定用户的名字和组,它同时支持Windows、Unix和Kerberos登录模块
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
// 将状态改为正在运行
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
2.2. setUseNewApi()
private void setUseNewAPI() throws IOException {
// reduceTask的个数,通过mapreduce.job.reduces设置,默认是1
int numReduces = conf.getNumReduceTasks();
......//后面是一系列的检查语句,此处不作具体解释
}
- Jobsubmitter提交Job
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
// 获取输出格式,调用方法,检查输入目录是否设置且不存在!
checkSpecs(job);
Configuration conf = job.getConfiguration();
//将conf加入分布式缓存中
addMRFrameworkToDistributedCache(conf);
// 生成Job运行期间临时作业目录
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
//得到本机的提交地址
submitHostAddress = ip.getHostAddress();
//得到本机的主机名字
submitHostName = ip.getHostName();
//获取之后,在配置文件中进行submitHostName和submitHostAddress的设置
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
// 生成Job的id
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// 根据Jobid,在Job作业目录,创建一个子目录
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try{
......//此段内容为权限检查
// 生成当前Job的作业目录
copyAndConfigureFiles(job, submitJobDir);
// 获取当前Job总的配置文件Job.xml(包含8个配置文件中所有的信息)的路径
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// 生成当前Job的切片信息: Job.split(切片信息),job.splitinfo(切片的属性)
// Job.split保存了有多少个切片对象,以及每个切片是从哪个文件切的哪部分
// job.splitinfo记录的是每一个切片,应该去哪个主机来读取,块信息的DN主机
int maps = writeSplits(job, submitJobDir);
// 根据切片数,设置应该启动几个MapTask
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
......//此段内容为权限检查
// Write job file to submit dir
// 将当前Job的配置信息生成到作业目录的job.xml中
writeConf(conf, submitJobFile);
// Now, actually submit the job (using the submit name)
printTokens(jobId, job.getCredentials());
// 正式提交
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
}finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
4.1 提交Job
submitJob()方法是接口 ClientProtocol(RPC 协议)中的一个抽象方法。根据 RPC 原理,在【客户端代理对象submitClient】调用RPC协议中的submitJob()方法,此方法一定在服务端执行。该方法也有两种实现: LocalJobRunner(本地模式)和 YARNRunner(YARN模式)
//本地模式的Job提交方式,
public org.apache.hadoop.mapreduce.JobStatus submitJob(
org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
Credentials credentials) throws IOException {
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir); //跳至第4.2阶段
job.job.setCredentials(credentials);
return job.status;
}
4.2 New Job
//重构Job对象!
public Job(JobID jobid, String jobSubmitDir) throws IOException {
…… //各种设置,重构在LocalJobRunner上运行的Job
// 开启一个线程
this.start();
}
- Job.run()
//运行Job
public void run() {
JobID jobId = profile.getJobID();
JobContext jContext = new JobContextImpl(job, jobId);
org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
try {
outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
} catch (Exception e) {
LOG.info("Failed to createOutputCommitter", e);
return;
}
try {
// 根据切片信息,创建所有的切片及切片属性对象
TaskSplitMetaInfo[] taskSplitMetaInfos =
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
int numReduceTasks = job.getNumReduceTasks();
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
// 使用一个Map记录每个MapTask最终保存文件的信息
Map<TaskAttemptID, MapOutputFile> mapOutputFiles = Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
//创建MapTask运行的线程集合,有几片就启动几个线程
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles);
initCounters(mapRunnables.size(), numReduceTasks);
// 创建线程池
ExecutorService mapService = createMapExecutor();
// 开启Map阶段线程的运行
//注意:mapreduce的运行过程中,使用了线程池的技术(放到队列当中,在将来的某个时刻进行执行)
runTasks(mapRunnables, mapService, "map");
try {
if (numReduceTasks > 0) {
//计算reduce对应的runnable个数
List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
jobId, mapOutputFiles);
//开启线程池
ExecutorService reduceService = createReduceExecutor();
//开启Reduce阶段线程的运行,此篇文章不讲解此阶段,后面新开文章讲解此阶段内容
runTasks(reduceRunnables, reduceService, "reduce");
}
} finally {
for (MapOutputFile output : mapOutputFiles.values()) {
output.removeAll();
}
}
// delete the temporary directory in output directory
outputCommitter.commitJob(jContext);
status.setCleanupProgress(1.0f);
if (killed) {
this.status.setRunState(JobStatus.KILLED);
} else {
this.status.setRunState(JobStatus.SUCCEEDED);
}
JobEndNotifier.localRunnerNotification(job, status);
} catch (Throwable t) {
try {
outputCommitter.abortJob(jContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
} catch (IOException ioe) {
LOG.info("Error cleaning up job:" + id);
}
status.setCleanupProgress(1.0f);
if (killed) {
this.status.setRunState(JobStatus.KILLED);
} else {
this.status.setRunState(JobStatus.FAILED);
}
LOG.warn(id, t);
JobEndNotifier.localRunnerNotification(job, status);
} finally {
try {
fs.delete(systemJobFile.getParent(), true); // delete submit dir
localFs.delete(localJobFile, true); // delete local copy
// Cleanup distributed cache
localDistributedCacheManager.close();
} catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e);
}
}
}
6.Job类中的runTasks()方法
private void runTasks(List<RunnableWithThrowable> runnables,
ExecutorService service, String taskType) throws Exception {
// Start populating the executor with work units.
// They may begin running immediately (in other threads).
for (Runnable r : runnables) {
//进行提交 是一个线程池,执行map或者reduce
service.submit(r);
}
...
}
- MapTaskRunable的run()
public void run() {
try {
// 生成当前线程的id
TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, taskId), 0);
LOG.info("Starting task: " + mapId);
mapIds.add(mapId);
// 创建MapTask对象,这个对象负责当前线程(节点)map阶段逻辑的执行!
MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,info.getSplitIndex(), 1);
map.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
//设置目录
//map为MapTask类型的一个值,例如本次调试中所获取的值为:attempt_localXXX_0001_m_000000_0
//localConf()加载配置文件的信息
setupChildMapredLocalDirs(map, localConf);
//创建一个map输出文件并设置配置信息
// 当前MapTask生成的数据保存的对象
MapOutputFile mapOutput = new MROutputFiles();
mapOutput.setConf(localConf);
mapOutputFiles.put(mapId, mapOutput);
//指的是一个job_localXXX_0001.xml文件
map.setJobFile(localJobFile.toString());
localConf.setUser(map.getUser());
map.localizeConfiguration(localConf);
map.setConf(localConf);
try {
map_tasks.getAndIncrement();
//launchMap()方法,进行启动map
myMetrics.launchMap(mapId);
map.run(localConf, Job.this); //此处将调用Maptask的run方法
myMetrics.completeMap(mapId);
} finally {
map_tasks.getAndDecrement();
}
LOG.info("Finishing task: " + mapId);
} catch (Throwable e) {
this.storedException = e;
}
}
- MapTask.run()
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// 如果需要对key-value进行排序,那么必须有reduce阶段!
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
if (useNewApi) {
//此处进入Map的核心处理阶段的入口
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
9*. MapTask核心逻辑 RunNewMapper()
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
// 创建配置的上下文
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper 实例化Mapper对象
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format 实例化InputFormat
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split //重建切片 切片包含了当前片所属的文件及哪个部分
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
// 负责直接将MapTask输出的结果输出到最终的输出目录
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
// 负责将MapTask产生的结果进行收集,交给ReduceTask
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext);
//进入之后运行用户自定义的mapper示例
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
10.Mapper.run()
//运行自定义的Mapper!
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
//在此处开始运行自己写的mapper程序
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
参考文献:
MapReduce Job本地提交过程源码跟踪及分析https://blog.csdn.net/lemonZhaoTao/article/details/72943618