4.3 DAGScheduler - Data Locality
2018-11-14
GongMeng
1. Overview
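As in the Hadoop 1.0 era, Spark takes data locality into account when it executes a DAG, as described in the previous section. Chains of map operations run on the same physical machine whenever possible, which avoids transferring data from one machine to another.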
2. Implementation
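A key concept here is NarrowDependency. In Spark's definition, each partition of the child RDD depends on only a small number of partitions of the parent RDD (with map or filter, for example, child partition i depends only on parent partition i), so the child can be computed from its parent partitions without a shuffle. The distinction is about how partitions depend on each other, not about transformations versus actions: reduceByKey is also a transformation, yet it introduces a ShuffleDependency.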
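Combined with the previous section: a ShuffleDependency (wide dependency) forces a shuffle, and DAGScheduler starts a new stage at every shuffle boundary. Actions such as count or collect do not cause shuffles by themselves; they trigger the submission of a job.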
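To make the stage boundary concrete, here is a minimal sketch under a simplified, hypothetical lineage model (`Node`, `Narrow`, `Shuffle`, `stageOf` and `countStages` are illustration names, not Spark classes). Starting from the last RDD of a stage, it follows narrow dependencies upward to collect the RDDs pipelined into that stage, and it stops at every shuffle dependency, whose parent becomes the last RDD of a parent stage. DAGScheduler performs a walk of this kind over real RDD dependencies, with much more bookkeeping.

```scala
import scala.collection.mutable

object StageSplitSketch {
  // Hypothetical, simplified lineage model (not Spark's real classes).
  sealed trait Dep
  final case class Narrow(parent: Node) extends Dep  // e.g. map, filter
  final case class Shuffle(parent: Node) extends Dep // e.g. reduceByKey, groupByKey
  // Plain class: identity equality, like Spark RDDs.
  final class Node(val name: String, val deps: List[Dep] = Nil)

  /** RDDs pipelined into the stage that ends at `last`, plus the parents
    * reached through shuffle dependencies (the last RDDs of parent stages). */
  def stageOf(last: Node): (List[String], List[Node]) = {
    val seen = mutable.Set.empty[Node]
    val members = mutable.ListBuffer.empty[String]
    val shuffleParents = mutable.ListBuffer.empty[Node]
    def visit(n: Node): Unit =
      if (seen.add(n)) {
        members += n.name
        n.deps.foreach {
          case Narrow(p)  => visit(p)            // same stage: keep walking up
          case Shuffle(p) => shuffleParents += p // stage boundary: stop here
        }
      }
    visit(last)
    (members.toList, shuffleParents.distinct.toList)
  }

  /** Number of stages needed to compute `last`; shared parent stages count once. */
  def countStages(last: Node): Int = {
    val done = mutable.Set.empty[Node]
    def go(n: Node): Int =
      if (!done.add(n)) 0
      else 1 + stageOf(n)._2.map(go).sum
    go(last)
  }
}

// Example lineage: textFile -> map -> filter -> reduceByKey -> mapValues
object StageSplitExample {
  import StageSplitSketch._
  val source   = new Node("textFile")
  val mapped   = new Node("map", List(Narrow(source)))
  val filtered = new Node("filter", List(Narrow(mapped)))
  val reduced  = new Node("reduceByKey", List(Shuffle(filtered)))
  val result   = new Node("mapValues", List(Narrow(reduced)))
}
```

For this lineage, the result stage contains mapValues and reduceByKey, the parent stage contains filter, map and textFile, and `countStages(result)` returns 2.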
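So locality can be understood as trying to complete the chain of transformations A->B->C->D inside a stage on the same physical machine. Whichever Executor a partition is read and initialized on, all subsequent operations on that partition run on the same Executor until a shuffle is reached.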
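This works because the narrow transformations of a stage are chained and applied record by record to a single partition's iterator; in Spark, for example, a map-like RDD computes its partition by wrapping the iterator of its parent's partition. The sketch below mimics that idea with plain Scala iterators (`Step`, `fuse` and `runTask` are hypothetical names). The recorded trace shows that each record passes through A, B and C before the next record is read, so no intermediate collection is built between the steps.

```scala
import scala.collection.mutable.ListBuffer

object PipelineSketch {
  // Hypothetical: one narrow transformation = a function from iterator to iterator.
  type Step = Iterator[Int] => Iterator[Int]

  /** Fuse the narrow steps of one stage (A -> B -> C ...) into a single function. */
  def fuse(steps: Seq[Step]): Step =
    steps.foldLeft[Step](it => it)((acc, step) => acc.andThen(step))

  /** Runs A (+1), B (keep evens) and C (*10) over one partition, as a single task
    * would, and records the order in which each step sees each record. */
  def runTask(partition: Seq[Int]): (List[Int], List[String]) = {
    val trace = ListBuffer.empty[String]
    val a: Step = _.map { x => trace += s"A($x)"; x + 1 }
    val b: Step = _.filter { x => trace += s"B($x)"; x % 2 == 0 }
    val c: Step = _.map { x => trace += s"C($x)"; x * 10 }
    val out = fuse(Seq(a, b, c))(partition.iterator).toList
    (out, trace.toList)
  }
}
```

For the partition `Seq(1, 2, 3)`, `runTask` returns `List(20, 40)`, and the trace begins `A(1), B(2), C(2)` before `A(2)` is evaluated at all.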
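We will return to this property when discussing RDDs. The same idea also exists in Flink, except that even early versions of Apache Flink optimized more aggressively: they merge all of these intermediate operations directly into one large operation, and if that operation has no downstream dependency, it is executed as an independent task.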
The method in DAGScheduler that computes the preferred locations of a partition follows this idea:

```scala
/**
 * Recursive implementation for getPreferredLocs.
 *
 * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
 * methods (getCacheLocs()); please be careful when modifying this method, because any new
 * DAGScheduler state accessed by it may require additional synchronization.
 */
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration. SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }

    case _ =>
  }

  Nil
}
```
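The lookup order of getPreferredLocsInternal can be reproduced outside Spark with a small toy model. In the sketch below, `ToyRdd`, `NarrowDep`, `ShuffleDep`, `preferredLocs` and the `cacheLocs` function are hypothetical stand-ins for RDD, NarrowDependency, ShuffleDependency, the method itself and getCacheLocs, and locations are plain host strings rather than TaskLocation values. It checks things in the same order as the original: the visited set, cached locations, the RDD's own preferred locations, and finally the parents reached through narrow dependencies, returning the first non-empty answer.

```scala
import scala.collection.mutable

object PreferredLocsSketch {
  // Hypothetical stand-ins for Spark's Dependency types.
  sealed trait Dep
  // `getParents(p)` plays the role of NarrowDependency.getParents(p).
  final case class NarrowDep(rdd: ToyRdd, getParents: Int => Seq[Int]) extends Dep
  final case class ShuffleDep(rdd: ToyRdd) extends Dep

  // Hypothetical stand-in for an RDD; `prefs` plays the role of rdd.preferredLocations.
  final class ToyRdd(
      val name: String,
      val prefs: Map[Int, List[String]] = Map.empty,
      val deps: List[Dep] = Nil)

  def preferredLocs(
      rdd: ToyRdd,
      partition: Int,
      cacheLocs: (ToyRdd, Int) => List[String],
      visited: mutable.Set[(ToyRdd, Int)] = mutable.Set.empty[(ToyRdd, Int)]): List[String] =
    if (!visited.add((rdd, partition))) Nil // already visited: avoid re-exploring
    else {
      val cached = cacheLocs(rdd, partition)
      if (cached.nonEmpty) cached // 1. cached locations win
      else {
        val own = rdd.prefs.getOrElse(partition, Nil)
        if (own.nonEmpty) own // 2. the RDD's own preferences (e.g. input blocks)
        else
          // 3. first narrow parent partition that has any preference; lazy, so it
          //    stops at the first non-empty answer like the original's early return.
          rdd.deps.iterator.flatMap[List[String]] {
            case NarrowDep(parentRdd, getParents) =>
              getParents(partition).iterator.map(p => preferredLocs(parentRdd, p, cacheLocs, visited))
            case ShuffleDep(_) => Iterator.empty // shuffle dependencies are not followed
          }.find(_.nonEmpty).getOrElse(Nil)
      }
    }
}

object PreferredLocsExample {
  import PreferredLocsSketch._
  val noCache: (ToyRdd, Int) => List[String] = (_, _) => Nil
  // An input RDD that knows where its blocks live (like an HDFS file).
  val input = new ToyRdd("input", prefs = Map(0 -> List("host1", "host2"), 1 -> List("host3")))
  val mapped = new ToyRdd("map", deps = List(NarrowDep(input, p => Seq(p))))
  val coalesced = new ToyRdd("coalesce", deps = List(NarrowDep(input, _ => Seq(0, 1))))
  val shuffled = new ToyRdd("reduceByKey", deps = List(ShuffleDep(mapped)))
}
```

Replacing the original's `return` from inside `foreach` with a lazy iterator and `find` keeps the same visiting order and the same "first narrow parent with any preference wins" rule. Because only narrow dependencies are followed, locality is never inherited across a shuffle: `preferredLocs(mapped, 1, noCache)` inherits `List("host3")` from the input partition, while `preferredLocs(shuffled, 0, noCache)` is `Nil`.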