Spark groupByKey和reduceByKey
2019-05-13 本文已影响0人
喵星人ZC
一、从shuffle方面看两者性能
groupByKey和reduceByKey都是ByKey系列算子,都会产生shuffle。我们通过简单的WC看看两者的区别
- groupByKey实现WC
scala> val rdd = sc.parallelize(List(1,1,2,2,3,3)).map((_,1))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:24
scala> rdd.groupByKey().map(x => (x._1,x._2.sum)).collect.foreach(println)
(2,2)
(1,2)
(3,2)
查看WebUI
groupByKey.png
Shuffle Read/Shuffle Write 等于192B
- reduceByKey实现WC
scala> val rdd = sc.parallelize(List(1,1,2,2,3,3)).map((_,1))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:24
scala> rdd.reduceByKey(_+_).collect.foreach(println)
(2,2)
(1,2)
(3,2)
查看WebUI
reduceByKey.png
Shuffle Read/Shuffle Write 等于184B
以此来看reduceByKey的性能比groupByKey好,因为发生shuffle的数据小一些,减少了数据拉去次数和网络IO、磁盘IO。
二、通过源码追踪为何reduceByKey更加适合在生产中使用
groupByKey源码
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
groupByKey调用的是groupByKey,我们继续点进去
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
reduceByKey源码
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
reduceByKey调用的是reduceByKey,我们继续点进去,reduceByKey调用的是combineByKeyWithClassTag
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
继续点进去
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
通过传入的参数我们可以发现两者最大的不同是mapSideCombine参数的不同。mapSideCombine参数是否进行map端的本地聚合,groupByKey的mapSideCombine默认值为false,表示不进行map的本地聚合,reduceByKey的mapSideCombine默认值为true,表示进行map的本地聚合。
我们通过MapReduce的shuffle过程可以知道shuffle发生在reduce task 拉去 map task处理的结果数据的过程间,所以在map端进行一次数据的本地聚合能够优化shuffle。具体请看以下图解过程
groupByKey.png reduceByKey.png