Spark2.4.0 DAG(DAGScheduler)源码分析

2019-03-20  本文已影响0人  井地儿

Spark的DAG(Directed Acyclic Graph)的生成实际上是Stage的划分,而Stage的划分依据是RDD的依赖关系。在程序提交后,Spark先将所有的RDD看作是一个Stage,然后从后向前回溯,窄依赖划分到同一个Stage,遇到宽依赖(ShuffleDependency)则划分一个新的Stage,如此便形成了DAG。
DAG的实现在org.apache.spark.scheduler.DAGScheduler中。

image.png

DAGScheduler源码解读

Stage的构建

spark DAG中stage的创建是通过getOrCreateParentStages方法实现的。
通过给定的RDD获取或创建父stages清单。首先通过getShuffleDependencies方法获取所有的宽依赖,然后遍历宽依赖,构建Stage,返回Stage集合。

getOrCreateParentStages

/**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

getShuffleDependencies

通过rdd获取所有父RDD的宽依赖。
算法实现上采用了数组栈(ArrayStack)来实现。

/**
   * Returns shuffle dependencies that are immediate parents of the given RDD.
   *
   * This function will not return more distant ancestors.  For example, if C has a shuffle
   * dependency on B which has a shuffle dependency on A:
   *
   * A <-- B <-- C
   *
   * calling this function with rdd C will only return the B <-- C dependency.
   *
   * This function is scheduler-visible for the purpose of unit testing.
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    // 存放宽依赖
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    // 存放已经访问过的RDD
    val visited = new HashSet[RDD[_]]
    // 存放待访问的RDD,通过数组栈来实现
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

getOrCreateShuffleMapStage

获取或创建ShuffleMapStage,其中ShuffleMapStage是说所有的Stage都保存在私有属性shuffleIdToMapStage 集合(HashMap)中,我们也可以将shuffleIdToMapStage理解为宽依赖注册中心。
该方法先从宽依赖注册中心(shuffleIdToMapStage)集合中获取Stage,如果存在则直接返回已存在的Stage,如果不存在则创建新的Stage。

/**
   * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
   * that dependency. Only includes stages that are part of currently running job (when the job(s)
   * that require the shuffle stage complete, the mapping will be removed, and the only record of
   * the shuffle data will be in the MapOutputTracker).
   */
  private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]

源码实现
从没有父宽依赖的Stage开始创建,然后对当前宽依赖创建Stage。

/**
   * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
   * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
   * addition to any missing ancestor shuffle map stages.
   */
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage

      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

getMissingAncestorShuffleDependencies

获得所有没有父宽依赖的宽依赖。
算法实现上依然采用数组栈(ArrayStack),将宽依赖注册到宽依赖注册中心(shuffleIdToMapStage)中。

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
  private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
            ancestors.push(shuffleDep)
            waitingForVisit.push(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
    }
    ancestors
  }

DAGScheduler的创建

DAGScheduler何时创建的呢?

DAGScheduler是在SparkContext中创建的。

// _dagScheduler 是SparkContext的私有属性
@volatile private var _dagScheduler: DAGScheduler = _
...
_dagScheduler = new DAGScheduler(this)
...

DAGScheduler何时调用的呢?

其实可以想到,谁创建谁调用。DAGScheduler的调用是在SparkContext的runJob方法中调用的。

/**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

DAGScheduler调用链

sc: SparkContext
ds: DAGScheduler
eventProcessLoop: DAGSchedulerEventProcessLoop
sc.runJob ----> ds.runJob ----> ds.submitJob ----> eventProcessLoop.post(JobSubmitted) ----> eventProcessLoop.onReceive ----> eventProcessLoop.doOnReceive ----> ds.handleJobSubmitted

handleJobSubmitted
这里才是重点,在handleJobSubmitted中,完成了ResultStage的创建,Job的创建,然后提交Stage(submitStage)。
下面是handleJobSubmitted的伪代码。

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
  
    // 创建输出结果Stage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    ...
    // 下面省略了n行代码
    ...
    //创建活动的Job
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    ...
    // 下面省略了n行代码
    ...
    val jobSubmissionTime = clock.getTimeMillis()
    // 注册活动Job到map中(jobId,job)
    jobIdToActiveJob(jobId) = job
    // 注册Job到Set中
    activeJobs += job
    // 注册job到输出finalStage中
    finalStage.setActiveJob(job)
    // 获得当前job对应的所有stageId
    val stageIds = jobIdToStageIds(jobId).toArray
    // 获得所有的Stage信息(stageInfo)
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    // 监听通知
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // 提交Stage
    submitStage(finalStage)
  }

submitStage(Stage提交)
先判断job有没有定义。如果job没有定义,则终止stage(调用abortStage方法)遗漏的Stage。
如果当前stage没有在等待中,执行中或失败的清单中,则继续提交。
判断有没有遗漏的父Stage。如果没有,则提交当前stage;如果有,则先提交父Stage,并将当前Stage添加到等待的stage集合中(waitingStages)。
伪代码如下:

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    // 如果jobId已定义
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      // 如果不在等待中,运行中或失败中,则继续提交
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // 查询没有父Stage的stage。这里sort排序之后,有序的执行
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        // 如果不存在,则提交当前stage;否则提交父stage,并将当前stage添加到等待stage集合中
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // 当父stage全准备就绪了,此时就可以提交当前stage的task
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            // 递归提交stage
              submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      // 如果jobId没有定义,终止当前stage执行
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

submitMissingTasks

Called when stage's parents are available and we can now do its task.
当stage的父stage是可用的(也就是父stage运行成功),我们现在可以运行它的task的时候调用

这个方法是重点,难得在spark源码里遇到一个如此长的方法,可想而知提交task的复杂性。

 /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // 首先计算出要计算的分区ID的索引。
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // 从当前stage关联的活动job中,获得使用的调度池,job分组,描述等
    val properties = jobIdToActiveJob(jobId).properties
    // 将当前stage添加到运行中的stage集合中
    runningStages += stage
    // 在测试任务是否可序列化之前,应发布SparkListenersTageSubmitted。如果任务不可序列化,则将发布SparkListenersTageCompleted事件,该事件应始终位于相应的SparkListenersTageSubmitted事件之后。
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    // task本地化
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    // 通过创建新的stageinfo和新的重试id来创建一个新的重试
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    // 如果有需要执行的task,则记录stage提交时间。否则,发布没有提交时间的事件,来表明会跳过当前stage
    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // 或许我们可以保持task二进制在stage中来避免被多次序列化
    // 任务广播的二进制文件,用于将任务task分发给executors。注意,我们广播了RDD的序列化副本,对于每个任务,我们将对其进行反序列化,这意味着每个任务都获得了RDD的不同副本。这在可能修改闭包中引用的对象状态的任务之间提供了更强的隔离。这在Hadoop中是必需的,因为jobconf/configuration对象不是线程安全的。
    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {
      // 不同的task,序列化和广播采用不同的方法
      // 对于ShuffleMapTask,序列化和广播使用(rdd, shuffleDep)
      // 对于ResultTask,序列化和广播使用 (rdd, func)
      var taskBinaryBytes: Array[Byte] = null
      // TaskBinaryBytes和分区都受检查点状态的影响。我们需要这种同步,以防另一个并发作业检查这个RDD,所以我们得到两个变量的一致视图。
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }
      // task二进制字节长度大于告警值,则打印告警日志
      if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
        logWarning(s"Broadcasting large task binary with size " +
          s"${Utils.bytesToString(taskBinaryBytes.length)}")
      }

      // task分发,广播task二进制字节码
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Abort execution
        return
      case e: Throwable =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage

        // Abort execution
        return
    }
    // 清除pending的分区,重新更新task
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    // 如果有待执行的task,则提交task运行
    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // 因为我们早先已经发布了SparkListenerStageSubmitted,我们需要标记该stage已经完成,因为没有可运行的task
      markStageAsFinished(stage, None)

      stage match {
        case stage: ShuffleMapStage =>
          logDebug(s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})")
          markMapStageJobsAsFinished(stage)
        case stage : ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      // 至此,当前stage的task已经全部运行完成,然后我们提交等待的子stage开始运行
      submitWaitingChildStages(stage)
    }
  }

总结

本文主要分析了Spark DAG原理,包括Stage的如何构建,什么时候调用。后面重点分析了DAGScheduler的调用链,这其中涉及到了提交一个job都经历了什么。涉及到job的构建,stage的提交,task的创建,task如何选择本地化的分区,task的序列化及广播分发到excutor等等。信息量有点儿大,建议这一块的代码多看几遍。

上一篇下一篇

猜你喜欢

热点阅读