Spark 的作业和任务调度系统是其核心,它能够有效进行调度的原因是对任务划分 DAG 和容错。

想真正搞懂 Spark,查看 Spark 作业执行流程的源码必不可少。本篇主要介绍 Spark 作业执行原理--提交作业。


Spark 算子有 transformation 和 action 两种,transformation 算子是懒操作,不会触发提交作业,action 算子才会触发作业的提交。

以 count 算子为例,在 RDD 的 count 方法中触发 SparkContext 的 runJob 来提交作业,这个提交是在 runJob 方法内部进行的,用户不用显式提交。

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

SparkContext 的 runJob 经过多次调用,会触发 DAGScheduler 的 runJob:

   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   * @throws Exception when the job fails
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
    // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
    // safe to pass in null here. For more detail, see SPARK-13747.
    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception

这一段代码主要关注 submitJob,顾名思义,该方法用于提交 job:

   * Submit an action job to the scheduler.
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   * @throws IllegalArgumentException when partitions ids are illegal
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
      jobId, rdd, func2, partitions.toArray, callSite, waiter,

在 DAGScheduler #submitJob 中,先生成一个 JobWaiter,该 JobWaiter 等待 DAGScheduler job 完成,当任务执行完毕,该 JobWaiter 会将结果传递给给定的 handle function。

生成 JobWaiter 后,将它传递给 DAGScheduler 的内部类 DAGSchedulerEventProcessLoop 进行处理。这是典型的生产消费者模式,生产者将 JobSubmitted 作业提交的消息放入队列,然后另外的线程从队列中取出消息进行消费。

  private val eventThread = new Thread(name) {

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
          } catch {
            case NonFatal(e) =>
              try {
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)


如上 EventLoop 中的消费线程,它循环从队列中取出消息,交给 onReceive 方法消费, onReceive 在 DAGSchedulerEventProcessLoop 中的实现如下:

   * The main event loop of the DAG scheduler.
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
    } finally {

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId) =>

    case JobCancelled(jobId) =>

    case JobGroupCancelled(groupId) =>

    case AllJobsCancelled =>

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      dagScheduler.handleExecutorLost(execId, filesLost)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>

    case completion: CompletionEvent =>

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>

对于作业提交的消息,匹配到 dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) 方法:

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

handleJobSubmitted 方法中将进行 stage 的划分,并且提交调度阶段,将在后面的章节介绍。


