Spark笔记004-reduceByKey和groupByke

2018-06-27  本文已影响0人  叫我小明谢谢

突然听同事提起,就随便写一下的。

reduceByKey==groupByKey().map()

做一个word count小例子,

val counts = pairs.groupByKey(count=>(count._1,count._2.sum))

groupByKey的过程 MapPartitionsRDD=>ShuffledRDD=>MapPartitionsRDD=>MapPartitionsRDD
也就是说,它是原封不动的,把ShuffleMapTask的输出,来去到ResultTask的内存中,所以导致所有数据都进行了网络传输
而如果是reduceByKey,看下shuffleMapTask的write的实现,判断了是否有mapSideCombine,如果有,就先本地聚合,再写磁盘,再传输。

 override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
   ......
  }

这到底是在干什么

谈之前要有个共识,分布式系统,网络传输是占时间比重高也非常影响效率的部分。
说些比较飘浮的内容,这其实是mapreduce比较经典的map端combine,也就是说因为是分布式系统啊,首先把数据分散到各个节点并行计算,算完了再把数据传到其他节点去做最终结果计算。那么在第一次计算之前,如果能先做一些对最终结果计算有帮助的计算,再去传输,就能节省一点网络传输时间。
说些更飘浮的内容啊,mr这种计算是为了算结果,也就是把数据的抽象程度变高了,那么,能越早的接近最终结果,越能节约时间。

适用场景

如果有hadoop基础就知道,map端combine和reduce端combine逻辑一致才能得到最终结果。
如果不是,那就是如果需要对单key的所有value放在一起才能计算的逻辑不合适做这种优化。

上一篇 下一篇

猜你喜欢

热点阅读