HadoopHadoop在简书Hadoop系

源码系列1.DAG之createResultStage

2018-09-01  本文已影响0人  老蒙

原创,转载请标明出处: https://www.jianshu.com/p/c39596da86bb

本文主要关于stage的划分,createResultStage方法里包含了整个的划分流程,代码包含大量的递归,第一次看看起来是比较恶心的,这里做一个整理记录和分享。

这篇文章的一切都是从这句代码开始:

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

一、总览

图画得有点丑。。。。。见谅。。。。

DAG_STAGE.png
解释一下:
1. 传入rdd8 获得首个宽依赖3(getShuffleDependencies),传入宽依赖3调用getMissingAncestorShuffleDependencies获得3以前的所有的宽依赖关系(包括宽依赖1和2...),由于存放的结构是先进后出的stack,所以从宽依赖1开始遍历宽依赖3以前的每一个宽依赖关系,调用createShuffleMapStage
  1. createShuffleMapStage
      1)第一次调用createShuffleMapStage时传进来的是宽依赖1,没有父stage,调用getOrCreateParentStages返回空list。那么从1开始根据“宽依赖1”,“父stage(ShuffleMapStage(1)没有父stage)” 来 new ShuffleMapStage 获得ShuffleMapStage(1)添加进 shuffleIdToMapStage ;
      2)第二次调用createShuffleMapStage同样调用getOrCreateParentStages()传入rdd4,获得宽依赖1;getOrCreateShuffleMapStage传入宽依赖1,shuffleIdToMapStage.get(shuffleDep.shuffleId)直接获得ShuffleMapStage(1)。此时拿到了ShuffleMapStage(1),根据“宽依赖2”,“父stage:ShuffleMapStage(1)” 来 new ShuffleMapStage(2) 同样添加进 shuffleIdToMapStage。
    ............
    3. 遍历完宽依赖3以前的所有宽依赖后,再次调用createShuffleMapStage把宽依赖3传进去,同样很容易根据rdd6获得宽依赖2再获得ShuffleMapStage(2),返回ShuffleMapStage(3)。(此时ResultStage之前的所有stage都已经划分好并添加进shuffleIdToMapStage)
    4. 回到createResultStagegetOrCreateParentStages方法,获得了ShuffleMapStage(3),创建ResultStage:new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)。

总结:根据finalRdd容易得到宽依赖3,再根据宽依赖1得到所有宽依赖关系,放进stack遍历。拿出第一个宽依赖1(没有父stage)创建ShuffleMapStage1,添加进shuffleIdToMapStage维护宽依赖和stage的关系。拿出第二个宽依赖2推出宽依赖1后,直接从shuffleIdToMapStage拿到ShuffleMapStage1,创建ShuffleMapStage2添加进shuffleIdToMapStage。

注意
1.关键的map
stageIdToStage :自增id对应stage
shuffleIdToMapStage :shuffleId来自shuffleDep.shuffleId 维护宽依赖和stage的关系
2.dependency和rdd的关系
rdd.dependencies:通过rdd获得依赖关系
dependency.rdd:通过依赖关系获得rdd
3.创建ResultStage和ShuffleMapStage

  1. ResultStage:每个job只且只有一个,为action算子前的一个stage。(主要构成条件:父stage)
    new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  2. ShuffleMapStage:每个job有0+个。 (主要构成条件:stage(可能没有)和shuffleDep宽依赖(一定有))
    new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
    4.各个类的依赖关系
    1)createResultStage(传入finalRDD获得ResultStage) ->2
    2)getOrCreateParentStages(传入rdd获得父stage) ->3->4
     3)getShuffleDependencies(传入rdd获得宽依赖)
     4)getOrCreateShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->5->6
      5)getMissingAncestorShuffleDependencies(传入一个rdd获得所有宽依赖) ->3
      6)createShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->2

下面来看源码

1.createResultStage(line 354)

获得ResultStage

private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],jobId: Int,callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)//获得父stage,若没有shuffle则返回空List
  val id = nextStageId.getAndIncrement() // 到这里时已经分好stage,这个id是ResultStage的id
   // 主要根据父stage获得ResultStage
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage// 把ResultStage和它的ID加入stageIdToStage
  updateJobIdStageIdMaps(jobId, stage) // 更新jobIds和jobIdToStageIds
  stage// 返回这个ResultStage
}

2.getOrCreateParentStages(line 372)

获得父stage

private def getOrCreateParentStages(rdd: RDD[_],
    firstJobId: Int): List[Stage] = {
  // 遍历RDD的依赖关系,找到第一个ShuffleDependency(可能多个,也可能没有)然后放入HashSet并返回
  // 如果获取不到ShuffleDependency,逻辑在此终止返回空list
  getShuffleDependencies(rdd).map { shuffleDep =>
    // 里面会创建当前ShuffleDependency的所有父ShuffleMapStage
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

3.getShuffleDependencies(line 414)

获得单个rdd的所有宽依赖关系(一般只有一个宽依赖;像CoGroupedRDD有多个)

private[scheduler] def getShuffleDependencies(rdd: RDD[_])
                : HashSet[ShuffleDependency[_, _, _]] = {
  // 用来存放ShuffleDependency的HashSet
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]] // 遍历过的RDD
  val waitingForVisit = new Stack[RDD[_]]//后进先出的数据结构
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()// 取出顶部的第一个元素 RDD
    if (!visited(toVisit)) {// 若这个rdd没有被访问过
      visited += toVisit// 标记已访问
//rdd.dependencies获得当前这个rdd的所有依赖,这个依赖可能为一个或多个
//(一般只有一个宽依赖;像CoGroupedRDD有多个),返回的是Seq[Dependency[_]]序列类型
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>// 若不是宽依赖就把这个RDD的父RDD push进waitingForVisit
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents//获得传进来的finalRDD的第一个ShuffleDependency并不是所有
}

4.getOrCreateShuffleMapStage(line 289)

根据宽依赖关系获得stage

//第一次来到这里传进来的是左到右首个shuffleDep,没有父stage,
//shuffleIdToMapStage也没有记录,会调getMissingAncestorShuffleDependencies并创建stage
//之后进来可直接根据shuffleIdToMapStage提取到ShuffleMapStage
private def getOrCreateShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _],
            firstJobId: Int): ShuffleMapStage = {
  //  通过shuffleDep获得之前添加的ShuffleMapStage
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
//获得所有父ShuffleDependencies遍历,先进后出,从首个宽依赖开始遍历直到构建所有父ShuffleMapStage
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)  //里面会递归调用,创建所有的ShuffleMapStage
        }
      }
      // 这个shuffleDep为finallRDD找到的首个shuffleDep,创建这个shuffleDep的ShuffleMapStage
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

5.getMissingAncestorShuffleDependencies(line 379)

获得所有rdd的所有宽依赖

private def getMissingAncestorShuffleDependencies(rdd: RDD[_])
                    : Stack[ShuffleDependency[_, _, _]] = {

  val ancestors = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      //getShuffleDependencies获得当前rdd的shuffleDependencies
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
        // 若shuffleIdToMapStage不包含ShuffleDep的shuffleId,把这个ShuffleDep存进ancestors
          ancestors.push(shuffleDep)
          //把ShuffleDep的父RDD再传进waitingForVisit,准备再遍历
          waitingForVisit.push(shuffleDep.rdd)
        }
      }
    }
  }
  ancestors// 包含(除finalRDD的第一个ShuffleDep以外)所有的ShuffleDep(可能为空)
}

6.createShuffleMapStage(line 319)

获得ShuffleMapStage 添加进stageIdToStage

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _],
            jobId: Int): ShuffleMapStage = {

  val rdd = shuffleDep.rdd//父RDD
  val numTasks = rdd.partitions.length// 分区数
   //获得父stage,第一次到这里时返回空list,没有父stage
  val parents = getOrCreateParentStages(rdd, jobId)
  //当没有parents时当前的stage为第一个stage,第一次调用到这里,StageId编号1
  val id = nextStageId.getAndIncrement()
  val stage =
        new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

  stageIdToStage(id) = stage// 自增id和stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage //  维护宽依赖和stage的关系
  updateJobIdStageIdMaps(jobId, stage)// 更新jobIds和jobIdToStageIds
  // 把shuffle信息注册到Driver上的MapOutputTrackerMaster的shuffleStatuses
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
     } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    // 把Shuffle信息注册到自己Driver的MapOutputTrackerMaster上,
    // 生成的是shuffleId和ShuffleStatus的映射关系
    // 在后面提交Job的时候还会根据它来的验证map stage是否已经准备好
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage // 返回生成的ShuffleMapStage
}

公众号:大叔据 。

评论不能及时回复可直接加公众号提问或交流,知无不答,谢谢 。

上一篇下一篇

猜你喜欢

热点阅读