Spark - DAGScheduler

2017-11-13

Spark has a few important concepts: Application, Job, Stage, and Task.

Let's start from the following one-line example and see what each of these terms means.

scala> sc.textFile("README.md").filter(_.contains("Spark")).count

Here, sc is the SparkContext, built from a default SparkConf. This one line counts as an Application. sc builds RDDs through the textFile and filter transformations, and the final count is an Action that triggers a job.
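For reference, count itself is just a thin wrapper around SparkContext.runJob, which is what actually kicks off the job; this is the one-liner in Spark 2.x's RDD.scala:

    // RDD.count runs a job that sizes every partition and sums the results.
    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum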


An overall sketch of how Spark runs: the Application runs on the driver, and the driver creates and submits a job (runJob/submitJob) for each action in the code. Starting from the job's last RDD and walking backwards, a new stage is created at every wide (shuffle) dependency. Finally, tasks are created in per-stage sets, and each task is executed on an executor.
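As a quick illustration (a hypothetical snippet of mine, not taken from the article), a job with one shuffle is cut into exactly two stages:

    // Hypothetical word-count job: reduceByKey introduces a wide (shuffle)
    // dependency, so the DAGScheduler splits the job into two stages.
    val counts = sc.textFile("README.md")
      .flatMap(_.split(" "))   // narrow dependency --+
      .map(word => (word, 1))  // narrow dependency   +-> ShuffleMapStage 0
      .reduceByKey(_ + _)      // wide dependency -> stage boundary
    counts.count               // Action: ResultStage 1, triggers the job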

The flow of how Spark splits up a job is shown below:

[Figure: runJob flow]

The classes involved:
SparkContext, DAGScheduler, DAGSchedulerEventProcessLoop, TaskScheduler

How these classes relate to each other (see the sketch after this list):

  1. SparkContext initializes the DAGScheduler and the TaskScheduler
  2. DAGScheduler initializes a DAGSchedulerEventProcessLoop (eventProcessLoop)
  3. DAGScheduler takes the TaskScheduler as a constructor parameter
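A minimal, self-contained sketch of that wiring (toy types of my own; only the names mirror Spark, this is not the actual source):

    // Toy sketch of the wiring only.
    trait TaskScheduler { def start(): Unit }

    class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)  // serializes scheduling events

    class DAGScheduler(val taskScheduler: TaskScheduler) {
      // (2) DAGScheduler creates its own event loop
      private val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
    }

    class SparkContextSketch {
      // (1) SparkContext builds the TaskScheduler first...
      private val taskScheduler = new TaskScheduler { def start(): Unit = () }
      // ...(3) and hands it to the DAGScheduler's constructor
      private val dagScheduler = new DAGScheduler(taskScheduler)
      taskScheduler.start()
    }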

Flow walkthrough

The whole process boils down to cutting the RDD DAG into a Stage DAG along its wide/narrow dependency boundaries; getMissingAncestorShuffleDependencies (Spark 2.x DAGScheduler) walks backwards and collects the shuffle dependencies that have not been registered yet:

 private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val ancestors = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
            ancestors.push(shuffleDep)    // collect this dependency, then keep walking its ancestors (stack-based traversal)
            waitingForVisit.push(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
    }
    ancestors
  }
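The helper it calls, getShuffleDependencies, returns only an RDD's immediate shuffle dependencies: narrow dependencies are walked through, while wide ones are collected and the walk stops there. Abridged from the same Spark 2.x file (comments mine):

    private[scheduler] def getShuffleDependencies(
        rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
      val parents = new HashSet[ShuffleDependency[_, _, _]]
      val visited = new HashSet[RDD[_]]
      val waitingForVisit = new Stack[RDD[_]]
      waitingForVisit.push(rdd)
      while (waitingForVisit.nonEmpty) {
        val toVisit = waitingForVisit.pop()
        if (!visited(toVisit)) {
          visited += toVisit
          toVisit.dependencies.foreach {
            case shuffleDep: ShuffleDependency[_, _, _] =>
              parents += shuffleDep                 // wide dependency: record it, stop here
            case dependency =>
              waitingForVisit.push(dependency.rdd)  // narrow dependency: keep walking
          }
        }
      }
      parents
    }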

Reference: http://www.cnblogs.com/xing901022/p/6674966.html

The full call chain for stage splitting (roughly, in Spark 2.x: handleJobSubmitted -> createResultStage -> getOrCreateParentStages -> getOrCreateShuffleMapStage -> createShuffleMapStage) is shown below:

[Figure: stage creation flow]

An example. The figure below shows a Spark job's dependency graph:

[Figure: job dependency graph]

The figure above can be abstracted as follows:

[E] <--------------
                    \
[C] <------[D]------[F]--(s_F)----
                                   \
[A] <-----(s_A)----- [B] <-------- [G]

Note: [] means an RDD, () means a shuffle dependency.

Result analysis

Walking backwards from G, the only shuffle dependencies are s_A and s_F, so three stages are produced: a ShuffleMapStage for s_A containing A; a ShuffleMapStage for s_F containing C, D, E and F; and the final ResultStage containing B and G.

Once all the stages have been built this way, it is time to execute them:

submitStage -> getMissingParentStages -> submitMissingTasks -> submitStage

At a high level:

  • submitStage works out the dependencies between stages; a dependency is either wide or narrow
  • if the current stage has no dependencies, or all of its dependencies are ready, its tasks are submitted
  • submitting the tasks is done by the function submitMissingTasks
    - once the current stage finishes, submitStage is called again to execute the child stages (see the abridged submitStage below)
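The recursion is easy to see in submitStage itself; here it is from Spark 2.x, abridged (logging removed, comments mine):

    private def submitStage(stage: Stage) {
      val jobId = activeJobForStage(stage)
      if (jobId.isDefined) {
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
          val missing = getMissingParentStages(stage).sortBy(_.id)
          if (missing.isEmpty) {
            // All parents are ready: run this stage's tasks now.
            submitMissingTasks(stage, jobId.get)
          } else {
            // Submit the parents first; this stage waits until they finish.
            for (parent <- missing) {
              submitStage(parent)
            }
            waitingStages += stage
          }
        }
      } else {
        abortStage(stage, "No active job for stage " + stage.id, None)
      }
    }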

The TaskScheduler is created and started while the SparkContext is being initialized, and its backend is chosen according to the deploy mode.

submitMissingTasks -> taskScheduler(TaskSchedulerImpl).submitTasks -> backend.reviveOffers -> executor.launchTask -> threadPool.execute

private def submitMissingTasks(stage: Stage, jobId: Int) {
        ......
        ......
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
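      // Build one task per partition still to compute: a ShuffleMapStage
      // yields ShuffleMapTasks, a ResultStage yields ResultTasks.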
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)

      submitWaitingChildStages(stage)
    }
  }

Notes:

  1. Stage IDs start from 0, not 1 (note the AtomicInteger below), and increase by 1 in "grandparent -> parent -> child -> grandchild" order, since ancestor stages are created first:
private val nextStageId = new AtomicInteger(0)
val id = nextStageId.getAndIncrement()
  2. Each job has exactly one ResultStage; every other stage is a ShuffleMapStage
  3. Every Stage instance carries a parents property, so starting from the "grandchild" stage you can walk back to all of its "ancestor" stages (a small sketch follows)
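A tiny sketch of that ancestor walk (a helper of my own; it only assumes Stage.parents: List[Stage], which is how Spark 2.x declares it):

    // Hypothetical helper: collect every ancestor stage reachable via parents.
    // Terminates because the stage graph is a DAG.
    def allAncestors(stage: Stage): Set[Stage] =
      stage.parents.toSet ++ stage.parents.flatMap(allAncestors)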