Spark 中 RDD 算子 ReduceByKey 和 Gro

2019-12-11  本文已影响0人  alexlee666

在对RDDPair(一种特殊的 RDD,即RDD[(key, Row)])进行操作时经常会用到 reduceByKey() 和 groupByKey() 两个算子。下面看看两者的区别和使用方法:

一、reduceByKey(func) 和 groupByKey() 的区别

ReduceByKey 示例 GroupByKey 示例

使用reduceByKey()的时候,本地的数据先进行merge然后再传输到不同节点再进行merge,最终得到最终结果。
而使用groupByKey()的时候,并不进行本地的merge,全部数据传出,得到全部数据后才会进行聚合成一个sequence,groupByKey()传输速度明显慢于reduceByKey()。
虽然groupByKey().map(func)也能实现reduceByKey(func)功能,但是优先使用reduceByKey(func)

区别:

区别项 reduceByKey groupByKey 备注
功能 针对 RDDPair 中具有相同 key 的所有 row 做 reduce 操作 针对 RDDPair 中具有相同 key 的所有 row 分组
能自定义函数 可以自定义reduce函数
输出 一个 key 对应一个row 一个key 对应多个row的sequence
性能 更高 更低 groupByKey.map(func) 可以实现 reduceByKey,但是尽量用 reduceByKey,因为更高效

二、Scala 代码--使用方法

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object TestSparkShuffle {

  class MyPartitioner(partitionNum: Int) extends Partitioner() {
    override def numPartitions: Int = partitionNum

    override def getPartition(key: Any): Int = {
      if (key.asInstanceOf[Int] % 2 == 0) {
        0
      } else {
        1
      }
    }
  }

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local").appName("test").getOrCreate()
    val sc = spark.sparkContext
    val arr = new ArrayBuffer[String]
    genStrArr(36, arr)
    val rdd = sc.parallelize(arr)
    val rddMap: RDD[(Int, String)] = rdd.mapPartitions(
      partition => {
        partition.map(str => (getKey(str), str))
      }
    )
    rddMap.foreach(x => println(x))
    // 按照 key 进行分组,且key为奇、偶数的row各分在0、1分区内
    val rddMap2 = rddMap.groupByKey(new MyPartitioner(2))
    rddMap2.foreach(x => println(x))
    // 对 rddMap 中的row按照row的key,同样的key的value相继使用"-"拼接起来
    val rddMap3 = rddMap.reduceByKey(reduceFunc) 
    //  val rddMap3 = rddMap.reduceByKey(_ + "-" +  _)    // _ + "-" +  _ 中的"_"表示 key 相同的两个value
    rddMap3.foreach(x => println(x))
    println(rddMap.count())

  }

  // reduce 函数,将两个字符串使用"-"拼接
  def reduceFunc(x: String, y : String): String = {
    x + "-" +  y
  }


  def getKey(str: String): Int = {
    Math.abs(str.hashCode % 6)
  }

   // 生成size为num的字符串数组,每个字符串长度为6,由A~Z随机构成
  def genStrArr(num : Int, arr: ArrayBuffer[String]): Unit = {
    val baseChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    val charLen = 6
    val rand = new Random()
    for (x <- Range(0, num)) {
      var subStr = ""
      for (i <- Range(0, charLen)) {
        val order = rand.nextInt(baseChars.length)
        subStr += baseChars.charAt(order)
      }
      arr.append(subStr)
    }
  }


}


测试结果:

# groupByKey 结果
(4,CompactBuffer(HCAESV, OZNIQU, WIIWNX, MEFMUZ, TVFPRH, EMSZJC))
(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))
(2,CompactBuffer(XTAKJH, HOUFFR, KIJCNU, BDILZU, SJFGRN, IZPCHR, RIPRRA, UUGZER))
(1,CompactBuffer(TMWTYV, PYSAJV))
(3,CompactBuffer(UHQTWN, YSLXXE, PNIMWJ, NAYYWU, EYPRPM, SXGUQO, DDSNIY, EXPSPM))
(5,CompactBuffer(ZOGCRZ, VORGBM, CUZZFS, SLFBWC, PFRFRA))


# reduceByKey 结果
(4,HCAESV-OZNIQU-WIIWNX-MEFMUZ-TVFPRH-EMSZJC)
(0,ZCEXLX-BKSGQD-ICRWVA-PXFBAC-SUBCYR-OMEQVV-TMBPHW)
(1,TMWTYV-PYSAJV)
(3,UHQTWN-YSLXXE-PNIMWJ-NAYYWU-EYPRPM-SXGUQO-DDSNIY-EXPSPM)
(5,ZOGCRZ-VORGBM-CUZZFS-SLFBWC-PFRFRA)
(2,XTAKJH-HOUFFR-KIJCNU-BDILZU-SJFGRN-IZPCHR-RIPRRA-UUGZER)

上一篇下一篇

猜你喜欢

热点阅读