Spark groupByKey和reduceByKey

2019-05-13  本文已影响0人  喵星人ZC

一、从shuffle方面看两者性能
groupByKey和reduceByKey都是ByKey系列算子,都会产生shuffle。我们通过简单的WC看看两者的区别

scala> val rdd = sc.parallelize(List(1,1,2,2,3,3)).map((_,1))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:24

scala> rdd.groupByKey().map(x => (x._1,x._2.sum)).collect.foreach(println)
(2,2)                                                                           
(1,2)
(3,2)

查看WebUI


groupByKey.png

Shuffle Read/Shuffle Write 等于192B

scala> val rdd = sc.parallelize(List(1,1,2,2,3,3)).map((_,1))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:24

scala> rdd.reduceByKey(_+_).collect.foreach(println)
(2,2)
(1,2)
(3,2)

查看WebUI


reduceByKey.png

Shuffle Read/Shuffle Write 等于184B

以此来看reduceByKey的性能比groupByKey好,因为发生shuffle的数据小一些,减少了数据拉去次数和网络IO、磁盘IO。

二、通过源码追踪为何reduceByKey更加适合在生产中使用

groupByKey源码

  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

groupByKey调用的是groupByKey,我们继续点进去

  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

reduceByKey源码

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

reduceByKey调用的是reduceByKey,我们继续点进去,reduceByKey调用的是combineByKeyWithClassTag

 def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

继续点进去

@Experimental
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

通过传入的参数我们可以发现两者最大的不同是mapSideCombine参数的不同。mapSideCombine参数是否进行map端的本地聚合,groupByKey的mapSideCombine默认值为false,表示不进行map的本地聚合,reduceByKey的mapSideCombine默认值为true,表示进行map的本地聚合。

我们通过MapReduce的shuffle过程可以知道shuffle发生在reduce task 拉去 map task处理的结果数据的过程间,所以在map端进行一次数据的本地聚合能够优化shuffle。具体请看以下图解过程

groupByKey.png reduceByKey.png
上一篇 下一篇

猜你喜欢

热点阅读