4.3 DAGScheduler - Locality

2018-11-14  GongMeng

1. Overview

Just as in the Hadoop 1.0 era, Spark takes data locality into account when executing the computation DAG, as already described in the previous sections. Chains of map-style operations should run on the same physical machine as far as possible, avoiding data transfers from one machine to another.

2. Implementation

A very important concept here is NarrowDependency. It means that each partition of an RDD depends on at most a small, fixed set of partitions of its parent RDD, which is what chains of transformations such as map and filter produce; a wide (shuffle) dependency, by contrast, makes one partition depend on many parent partitions.

As discussed earlier, it is the wide (shuffle) dependencies, produced by transformations such as reduceByKey, groupByKey or join, that force a shuffle and therefore cause a new stage to be created.

Locality can therefore be understood as: keep the chain of transformations A -> B -> C -> D inside a single stage running on the same physical machine as far as possible. Whichever Executor an RDD partition is read and materialized on, the subsequent operations keep running on that Executor until a shuffle is reached, as the sketch below illustrates.
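
As a concrete illustration, here is a minimal standalone sketch (the object name, app name and sample data are mine, not from the Spark source): flatMap and map only create NarrowDependencies, so they are pipelined inside one stage on the executor that holds the partition, while reduceByKey introduces a ShuffleDependency and marks the stage boundary.

import org.apache.spark.{SparkConf, SparkContext}

object StageLocalitySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-locality-sketch").setMaster("local[2]"))

    val lines = sc.parallelize(Seq("a b", "b c", "c a"), numSlices = 2)

    // flatMap and map only create NarrowDependencies: each output partition
    // reads exactly one parent partition, so these steps are pipelined into a
    // single stage and run on the executor that already holds the partition.
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

    // reduceByKey introduces a ShuffleDependency (a wide dependency): this is
    // where the DAGScheduler cuts a new stage and data moves between executors.
    val counts = pairs.reduceByKey(_ + _)

    counts.collect().foreach(println)
    sc.stop()
  }
}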

We will come back to this property when describing RDDs. The same idea also exists in Flink, except that early versions of Apache Flink pushed the optimization further: they fuse the whole chain of intermediate operations into one large operator, and if that operator has no downstream dependency it is executed as a standalone task.

  /**
   * Recursive implementation for getPreferredLocs.
   *
   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
   * methods (getCacheLocs()); please be careful when modifying this method, because any new
   * DAGScheduler state accessed by it may require additional synchronization.
   */
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }
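
The method above looks up locations in a fixed order: cached block locations first, then the RDD's own preferredLocations, then a recursive walk through NarrowDependencies. The sketch below (a self-contained toy program; the object name, temp file and app name are mine) exercises the second and third lookups from user code via the public RDD.preferredLocations API.

import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object PreferredLocsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("preferred-locs-sketch").setMaster("local[2]"))

    // Write a small local file so the example is self-contained; on a real
    // cluster this would be an HDFS path and the locations would be block hosts.
    val path = Files.createTempFile("locality-demo", ".txt")
    Files.write(path, "hello locality\n".getBytes("UTF-8"))

    // textFile builds a HadoopRDD whose preferredLocations come from the
    // input splits; this is the second lookup in the method above.
    val input = sc.textFile(path.toString)

    // map and filter only add NarrowDependencies, so getPreferredLocsInternal
    // walks back to `input` and reuses its locations for the derived RDD.
    val derived = input.map(_.toUpperCase).filter(_.nonEmpty)

    // Public mirror of what the scheduler consults internally. For a local
    // file this prints an empty (or localhost) list; on HDFS it prints hosts.
    println(input.preferredLocations(input.partitions(0)))
    println(derived.count())

    sc.stop()
  }
}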