spark

stage划分-源码分析

2021-09-20  本文已影响0人  专职掏大粪
/**
 * Action: runs a job through `sc.runJob` that turns each partition's iterator into an
 * array, then concatenates the per-partition arrays into a single array.
 */
def collect(): Array[T] = withScope {
  // `this` is the RDD the action was invoked on; it is handed on to the scheduler from here.
  val perPartition = sc.runJob(this, (partition: Iterator[T]) => partition.toArray)
  Array.concat(perPartition: _*)
}
//这里的rdd就是上面collect中传入的this(调用action算子的rdd),继续向下传递
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

eventProcessLoop POST JobSubmitted 事件

 eventProcessLoop.post(JobSubmitted(
//这里的rdd仍是调用action算子的那个rdd,随JobSubmitted事件一起传递
      jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
      clonedProperties))
    listener.awaitResult()

eventThread 消费事件进行处理

private[spark] val eventThread = new Thread(name) {
  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
     ... ...
    }
  }

}

doOnReceive

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

DAGScheduler.handleJobSubmitted 核心代码

 private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    var finalStage: ResultStage = null
    try {
      //创建ResultStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

DAGScheduler.createResultStage

/**
 * Creates the ResultStage for the job identified by `jobId`.
 *
 * As far as this excerpt shows: it collects the shuffle dependencies of `rdd`, obtains the
 * parent stages for them, takes a fresh stage id and constructs the ResultStage.
 * The `... ...` lines mark code the excerpt leaves out (`resourceProfile` is presumably
 * defined there; confirm against the full source).
 *
 * @param rdd        the RDD the stage is built on
 * @param func       function applied to each partition's iterator
 * @param partitions indices of the partitions to compute
 * @param jobId      id of the job this stage belongs to
 * @param callSite   call site passed through to the stage
 * @return the newly constructed ResultStage
 */
private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (shuffleDeps, resourceProfiles) = 
// Collect the shuffle dependencies (and resource profiles) of rdd.
getShuffleDependenciesAndResourceProfiles(rdd)
  ... ...
   // Get or create the parent stages for those shuffle dependencies.
    val parents = getOrCreateParentStages(shuffleDeps, jobId)
    val id = nextStageId.getAndIncrement()
  // Build the ResultStage; rdd here is the RDD the action was invoked on.
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
      callSite, resourceProfile.id)
 ... ...
    stage
  }
/**
 * Walks the lineage of `rdd` and returns the shuffle dependencies it reaches first,
 * together with the resource profiles of every RDD visited on the way.
 *
 * The walk follows non-shuffle dependencies only: when a ShuffleDependency is found it is
 * recorded and the RDD behind it is not enqueued, so nothing past a shuffle is visited.
 *
 * @param rdd the RDD to start from
 * @return (shuffle dependencies found, resource profiles of the visited RDDs)
 */
private[scheduler] def getShuffleDependenciesAndResourceProfiles(
    rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
  val shuffleParents = new HashSet[ShuffleDependency[_, _, _]]
  val profiles = new HashSet[ResourceProfile]
  val seen = new HashSet[RDD[_]]
  // Work list used as a stack: items are taken from the front and pushed onto the front.
  val pending = new ListBuffer[RDD[_]]
  pending += rdd
  while (pending.nonEmpty) {
    val current = pending.remove(0)
    // `add` is true only when the RDD was not in the set yet, i.e. on the first visit.
    if (seen.add(current)) {
      // A null resource profile is skipped.
      for (profile <- Option(current.getResourceProfile)) {
        profiles += profile
      }
      current.dependencies.foreach {
        // A shuffle dependency is recorded for the result; the walk stops on this branch.
        case shuffleDep: ShuffleDependency[_, _, _] =>
          shuffleParents += shuffleDep
        // Any other dependency: keep walking through its parent RDD.
        case other =>
          pending.prepend(other.rdd)
      }
    }
  }
  (shuffleParents, profiles)
}

DAGScheduler.getOrCreateParentStages

/**
 * Gets or creates the parent stages for the given shuffle dependencies.
 *
 * @param shuffleDeps shuffle dependencies to resolve into stages
 * @param firstJobId  job id passed on to `getOrCreateShuffleMapStage`
 * @return one stage per entry produced by mapping over `shuffleDeps`, as a List
 */
private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
    firstJobId: Int): List[Stage] = {
  // Go through every shuffle dependency and get (or create) its ShuffleMapStage.
  // The receiver `shuffleDeps` was missing here, leaving a dangling `.map` that does not parse.
  shuffleDeps.map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

DAGScheduler.createShuffleMapStage

  /**
   * Creates the ShuffleMapStage for the given shuffle dependency.
   *
   * As far as this excerpt shows: it takes the RDD of the dependency, collects that RDD's own
   * shuffle dependencies, gets or creates the parent stages for them, then constructs the
   * ShuffleMapStage and stores it in `stageIdToStage` under its id.
   * The `... ...` lines mark code the excerpt leaves out; the lone `}` before `stage`
   * presumably closes an elided block, and `resourceProfile` is presumably defined in an
   * elided part (confirm against the full source).
   *
   * @param shuffleDep the shuffle dependency the stage is created for
   * @param jobId      id of the job this stage belongs to
   * @return the newly constructed ShuffleMapStage
   */
  def createShuffleMapStage[K, V, C](
      shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
   ... ...
    // The task count equals the number of partitions of the dependency's RDD.
    val numTasks = rdd.partitions.length
   // Get or create the parent stages this stage depends on.
    val parents = getOrCreateParentStages(shuffleDeps, jobId)
    val id = nextStageId.getAndIncrement()
    // Create the ShuffleMapStage.
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker,
      resourceProfile.id)
    // Record the new stage under its id.
    stageIdToStage(id) = stage
... ...
    }
    stage
  }
上一篇 下一篇

猜你喜欢

热点阅读