源码系列1.DAG之createResultStage
原创,转载请标明出处: https://www.jianshu.com/p/c39596da86bb
本文主要关于stage的划分,createResultStage方法里包含了整个的划分流程,代码包含大量的递归,第一次看看起来是比较恶心的,这里做一个整理记录和分享。
这篇文章的一切都是从这句代码开始:
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
一、总览
图画得有点丑。。。。。见谅。。。。
解释一下:
1. 传入rdd8 获得首个宽依赖3(getShuffleDependencies),传入宽依赖3调用getMissingAncestorShuffleDependencies获得3以前的所有的宽依赖关系(包括宽依赖1和2...),由于存放的结构是先进后出的stack,所以从宽依赖1开始遍历宽依赖3以前的每一个宽依赖关系,调用createShuffleMapStage。
-
createShuffleMapStage
1)第一次调用createShuffleMapStage时传进来的是宽依赖1,没有父stage,调用getOrCreateParentStages返回空list。那么从1开始根据“宽依赖1”,“父stage(ShuffleMapStage(1)没有父stage)” 来 new ShuffleMapStage 获得ShuffleMapStage(1)添加进 shuffleIdToMapStage ;
2)第二次调用createShuffleMapStage同样调用getOrCreateParentStages()传入rdd4,获得宽依赖1;getOrCreateShuffleMapStage传入宽依赖1,shuffleIdToMapStage.get(shuffleDep.shuffleId)直接获得ShuffleMapStage(1)。此时拿到了ShuffleMapStage(1),根据“宽依赖2”,“父stage:ShuffleMapStage(1)” 来 new ShuffleMapStage(2) 同样添加进 shuffleIdToMapStage。
............
3. 遍历完宽依赖3以前的所有宽依赖后,再次调用createShuffleMapStage把宽依赖3传进去,同样很容易根据rdd6获得宽依赖2再获得ShuffleMapStage(2),返回ShuffleMapStage(3)。(此时ResultStage之前的所有stage都已经划分好并添加进shuffleIdToMapStage)
4. 回到createResultStage的getOrCreateParentStages方法,获得了ShuffleMapStage(3),创建ResultStage:new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)。
总结:根据finalRdd容易得到宽依赖3,再根据宽依赖1得到所有宽依赖关系,放进stack遍历。拿出第一个宽依赖1(没有父stage)创建ShuffleMapStage1,添加进shuffleIdToMapStage维护宽依赖和stage的关系。拿出第二个宽依赖2推出宽依赖1后,直接从shuffleIdToMapStage拿到ShuffleMapStage1,创建ShuffleMapStage2添加进shuffleIdToMapStage。
注意:
1.关键的map
stageIdToStage :自增id对应stage
shuffleIdToMapStage :shuffleId来自shuffleDep.shuffleId 维护宽依赖和stage的关系
2.dependency和rdd的关系
rdd.dependencies:通过rdd获得依赖关系
dependency.rdd:通过依赖关系获得rdd
3.创建ResultStage和ShuffleMapStage
-
ResultStage:每个job只且只有一个,为action算子前的一个stage。(主要构成条件:父stage)
new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) -
ShuffleMapStage:每个job有0+个。 (主要构成条件:stage(可能没有)和shuffleDep宽依赖(一定有))
new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
4.各个类的依赖关系
1)createResultStage(传入finalRDD获得ResultStage) ->2
2)getOrCreateParentStages(传入rdd获得父stage) ->3->4
3)getShuffleDependencies(传入rdd获得宽依赖)
4)getOrCreateShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->5->6
5)getMissingAncestorShuffleDependencies(传入一个rdd获得所有宽依赖) ->3
6)createShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->2
下面来看源码
1.createResultStage(line 354)
获得ResultStage
private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],jobId: Int,callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)//获得父stage,若没有shuffle则返回空List
val id = nextStageId.getAndIncrement() // 到这里时已经分好stage,这个id是ResultStage的id
// 主要根据父stage获得ResultStage
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage// 把ResultStage和它的ID加入stageIdToStage
updateJobIdStageIdMaps(jobId, stage) // 更新jobIds和jobIdToStageIds
stage// 返回这个ResultStage
}
2.getOrCreateParentStages(line 372)
获得父stage
private def getOrCreateParentStages(rdd: RDD[_],
firstJobId: Int): List[Stage] = {
// 遍历RDD的依赖关系,找到第一个ShuffleDependency(可能多个,也可能没有)然后放入HashSet并返回
// 如果获取不到ShuffleDependency,逻辑在此终止返回空list
getShuffleDependencies(rdd).map { shuffleDep =>
// 里面会创建当前ShuffleDependency的所有父ShuffleMapStage
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
3.getShuffleDependencies(line 414)
获得单个rdd的所有宽依赖关系(一般只有一个宽依赖;像CoGroupedRDD有多个)
private[scheduler] def getShuffleDependencies(rdd: RDD[_])
: HashSet[ShuffleDependency[_, _, _]] = {
// 用来存放ShuffleDependency的HashSet
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]] // 遍历过的RDD
val waitingForVisit = new Stack[RDD[_]]//后进先出的数据结构
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()// 取出顶部的第一个元素 RDD
if (!visited(toVisit)) {// 若这个rdd没有被访问过
visited += toVisit// 标记已访问
//rdd.dependencies获得当前这个rdd的所有依赖,这个依赖可能为一个或多个
//(一般只有一个宽依赖;像CoGroupedRDD有多个),返回的是Seq[Dependency[_]]序列类型
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>// 若不是宽依赖就把这个RDD的父RDD push进waitingForVisit
waitingForVisit.push(dependency.rdd)
}
}
}
parents//获得传进来的finalRDD的第一个ShuffleDependency并不是所有
}
4.getOrCreateShuffleMapStage(line 289)
根据宽依赖关系获得stage
//第一次来到这里传进来的是左到右首个shuffleDep,没有父stage,
//shuffleIdToMapStage也没有记录,会调getMissingAncestorShuffleDependencies并创建stage
//之后进来可直接根据shuffleIdToMapStage提取到ShuffleMapStage
private def getOrCreateShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
// 通过shuffleDep获得之前添加的ShuffleMapStage
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
//获得所有父ShuffleDependencies遍历,先进后出,从首个宽依赖开始遍历直到构建所有父ShuffleMapStage
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId) //里面会递归调用,创建所有的ShuffleMapStage
}
}
// 这个shuffleDep为finallRDD找到的首个shuffleDep,创建这个shuffleDep的ShuffleMapStage
createShuffleMapStage(shuffleDep, firstJobId)
}
}
5.getMissingAncestorShuffleDependencies(line 379)
获得所有rdd的所有宽依赖
private def getMissingAncestorShuffleDependencies(rdd: RDD[_])
: Stack[ShuffleDependency[_, _, _]] = {
val ancestors = new Stack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
//getShuffleDependencies获得当前rdd的shuffleDependencies
getShuffleDependencies(toVisit).foreach { shuffleDep =>
if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
// 若shuffleIdToMapStage不包含ShuffleDep的shuffleId,把这个ShuffleDep存进ancestors
ancestors.push(shuffleDep)
//把ShuffleDep的父RDD再传进waitingForVisit,准备再遍历
waitingForVisit.push(shuffleDep.rdd)
}
}
}
}
ancestors// 包含(除finalRDD的第一个ShuffleDep以外)所有的ShuffleDep(可能为空)
}
6.createShuffleMapStage(line 319)
获得ShuffleMapStage 添加进stageIdToStage
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _],
jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd//父RDD
val numTasks = rdd.partitions.length// 分区数
//获得父stage,第一次到这里时返回空list,没有父stage
val parents = getOrCreateParentStages(rdd, jobId)
//当没有parents时当前的stage为第一个stage,第一次调用到这里,StageId编号1
val id = nextStageId.getAndIncrement()
val stage =
new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
stageIdToStage(id) = stage// 自增id和stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage // 维护宽依赖和stage的关系
updateJobIdStageIdMaps(jobId, stage)// 更新jobIds和jobIdToStageIds
// 把shuffle信息注册到Driver上的MapOutputTrackerMaster的shuffleStatuses
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i =>
if (locs(i) ne null) {
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
// 把Shuffle信息注册到自己Driver的MapOutputTrackerMaster上,
// 生成的是shuffleId和ShuffleStatus的映射关系
// 在后面提交Job的时候还会根据它来的验证map stage是否已经准备好
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage // 返回生成的ShuffleMapStage
}
公众号:大叔据 。
评论不能及时回复可直接加公众号提问或交流,知无不答,谢谢 。