(二十)coalesce和repartition

2018-09-20  本文已影响0人  白面葫芦娃92
scala> val data = sc.textFile("file:///home/hadoop/data/input.txt")
data: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/input.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> data.partitions.size
res0: Int = 2
scala> val data1 = data.coalesce(1)
data1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[2] at coalesce at <console>:25
scala> data1.partitions.size
res2: Int = 1
scala> val data2 = data.coalesce(4)
data2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[3] at coalesce at <console>:25
scala> data2.partitions.size
res3: Int = 2

1.res0为什么是2?
textFile函数的源码:

/**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   * @param path path to the text file on a supported file system
   * @param minPartitions suggested minimum number of partitions for the resulting RDD
   * @return RDD of lines of the text file
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }
/**
   * Default min number of partitions for Hadoop RDDs when not given by user
   * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
   */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
  def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }
override def defaultParallelism(): Int = backend.defaultParallelism()
 override def defaultParallelism(): Int =
    scheduler.conf.getInt("spark.default.parallelism", totalCores)

spark.default.parallelism默认值没有设置的情况下,取totalCores,因启动时设置--master local[2],所以totalCores=2,defaultParallelism为2,defaultMinPartitions为2,minPartitions为2,因此res0=2
2.res3为什么是2?
查看coalesce源码:

/**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions. If a larger number
   * of partitions is requested, it will stay at the current number of partitions.
     (这就是res3=2的原因)
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * @note With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner. The optional partition coalescer
   * passed in must be serializable.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }
scala> val data3 = data.coalesce(4,true)
data3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at coalesce at <console>:25
scala> data3.partitions.size
res4: Int = 4

repartition源码:

/**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   *
   * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

repartition调用了coalesce,repartition是少变多,coalesce是多变少
举个例子:
rdd1 -- filter -->rdd2 --save -->....
假设rdd1有2000条数据,200个分区,经过filter后,rdd2只剩50条数据,但仍为200个分区,必定有很多的空文件,这样是没必要的,因此通常可以在filter操作后增加一个coalesce操作,减少分区,避免过多空文件,coalesce为多少个分区要根据业务场景预估一个数,但是要注意,coalesce(n)之后,源头文件rdd1也变成了n个分区,会对性能造成一定影响,所以这个n要综合考虑两个方面来确定

上一篇 下一篇

猜你喜欢

热点阅读