Spark中Job的提交源码解读
版权声明:本文为原创文章,未经允许不得转载。
Spark程序程序job的运行是通过actions算子触发的,每一个action算子其实是一个runJob方法的运行,详见文章
SparkContex源码解读(一)http://www.jianshu.com/p/9e75c11a5081
1.Spark中Job的提交
以一个简单的runjob为例,源码如下:
<code>
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
//通过dagScheduler运行job,即将JobSubmitted事件添加到DAGScheduler中的事件执行队列中,并用JobWaiter等待结果的返回
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)详见(1)
waiter.awaitResult() match {
case JobSucceeded =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}</code>
1.submitJob(rdd, func, partitions, callSite, resultHandler, properties)方法如下:
<code>
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
//如果job正在运行0个task,那么马上返回
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//将JobSubmitted事件添加到eventProcessLoop中执行,详见(2)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
</code>
2.将JobSubmitted事件添加到eventProcessLoop中执行 eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
其中,
(1)JobSubmitted一种DAGScheduler可以处理的事件类型,它的trait DAGSchedulerEvent的一个实现。DAGSchedulerEvent的case子类如下图所示:
(2)eventProcessLoop的类型是DAGSchedulerEventProcessLoop,它是抽象类EventLoop的子类,该类的源码如下:
<code>
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoopDAGSchedulerEvent with Logging {
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
//对于JobSubmitted,通过 dagScheduler.handleJobSubmitted方法处理
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion @ CompletionEvent(task, reason, , , taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
</code>
3.对于JobSubmitted事件类型,通过 dagScheduler的handleJobSubmitted方法处理,这个方法中关系涉及到Job的Stage、TaskSet(Tasks)的生成,
<code>
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[],
func: (TaskContext, Iterator[]) => ,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
(1)//根据jobId生成finalStage,我们在后面具体介绍
(2)Job的提交
//初始化ActiveJob
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
//清除RDD的位置信息
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
...
(3)提交stages,但首先循环提交丢失的父Stage(s),即将丢失的stage加入到waitingStages中
...
...
(4)提交Taskset(tasks)
...
}
</code>
由代码(2)处我们可以看到SparkListenerJobStart事件加入到了监听器总线LiveListenerBus中,它的父类SparkListenerBus中定义了具体事件及监听器的映射关系,如下所示:
<code>
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
//Job的启动
case jobStart: SparkListenerJobStart =>
listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
listener.onJobEnd(jobEnd)
case taskStart: SparkListenerTaskStart =>
listener.onTaskStart(taskStart)
case taskGettingResult: SparkListenerTaskGettingResult =>
listener.onTaskGettingResult(taskGettingResult)
case taskEnd: SparkListenerTaskEnd =>
listener.onTaskEnd(taskEnd)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
listener.onEnvironmentUpdate(environmentUpdate)
case blockManagerAdded: SparkListenerBlockManagerAdded =>
listener.onBlockManagerAdded(blockManagerAdded)
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
listener.onBlockManagerRemoved(blockManagerRemoved)
case unpersistRDD: SparkListenerUnpersistRDD =>
listener.onUnpersistRDD(unpersistRDD)
case applicationStart: SparkListenerApplicationStart =>
listener.onApplicationStart(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
listener.onApplicationEnd(applicationEnd)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
listener.onExecutorMetricsUpdate(metricsUpdate)
case executorAdded: SparkListenerExecutorAdded =>
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
}
}
}
</code>
4.SparkListenerJobStart 事件最后是由JobProgressListener监听器的onJobStart方法执行的,如下所示:
<code>
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
val jobGroup = for (
props <- Option(jobStart.properties);
group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))//得到属性的值"spark.jobGroup.id"
) yield group
val jobData: JobUIData =
new JobUIData(
jobId = jobStart.jobId,
submissionTime = Option(jobStart.time).filter(_ >= 0),
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
// A null jobGroupId is used for jobs that are run without a job group
jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
//计算将要运行这个job的的tasks数量,这可能是一个低估因为job start event 引用所有的result stages's的依赖
jobData.numTasks = {
val allStages = jobStart.stageInfos
//过滤掉已经完成的或取消的Stage
val missingStages = allStages.filter(.completionTime.isEmpty)
missingStages.map(.numTasks).sum
}
//存放jobid以及相关的jobData
jobIdToData(jobStart.jobId) = jobData
//激活的、将要执行的Jobs
activeJobs(jobStart.jobId) = jobData
// 遍历stageIds,更新stageId为key,ActiveJobIds为value的stageIdToActiveJobIds集合
for (stageId <- jobStart.stageIds) {
stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId)
}
//遍历stageInfos
for (stageInfo <- jobStart.stageInfos) {
stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
}
}
</code>
这样我们就启动了Job,WebUI就可以看到该Job的信息了。