关于Spark、Scala实现WordCount的8种写法(多种

2019-07-31  本文已影响0人  每天起床打酱油

1、groupByKey()

RDD[(String, Int)] = sc.makeRDD(List(("a",1), ("b",2), ("a",3), ("b",4)))

val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1), ("b",2), ("a",3), ("b",4)))

// 转换算子 —— groupByKey
val rdd2: RDD[(String, Iterable[Int])] = rdd.groupByKey()

val rdd3: RDD[(String, Int)] = rdd2.map {
  case (c, datas) => {
    (c, datas.sum)
  }
}
rdd3.collect().foreach(println)
sc.stop()

2、reduceByKey()

val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// 转换算子 —— reduceByKey
//val value: RDD[(String, Int)] = rdd.reduceByKey((x,y)=>{x+y})
val value: RDD[(String, Int)] = rdd.reduceByKey(_+_)

value.collect().foreach(println)
sc.stop()

3、aggregateByKey

转换算子 —— aggregateByKey()()使用了函数柯里化
存在两个参数列表 :
第一个参数列表表示分区内计算时的初始值(零值)——在初始值的基础上做比较运算
第二参数列表中需要传递两个参数
第一个参数表示分区内计算规则
第二个参数表示分区间计算规则

  val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",2), ("b",3),
      ("a",3), ("b",4), ("a", 5)), 2)
    val value: RDD[(String, Int)] = rdd.aggregateByKey(0)
    ((x,y)=>{Math.max(x,y)},
    (x,y)=>{x+y})
    value.collect().foreach(println)

4、foldByKey()

foldByKey其实就是aggregateByKey简化版,
当aggregateByKey中分区内和分区间的计算规则一样时,使用foldByKey就可以了
rdd.aggregateByKey(0)(+,+) ——> rdd.foldByKey(0)(+)

// TODO 当aggregateByKey中分区内和分区间的计算规则一样时,使用foldByKey就可以了

//rdd.aggregateByKey(10)((x,y)=>{Math.max(x,y)},(x,y)=>{x+y})
//  val value2: RDD[(String, Int)] = rdd.aggregateByKey(0)(_+_,_+_)
val value3: RDD[(String, Int)] = rdd.foldByKey(0)(_+_)
value3.collect().foreach(println)

5、combineByKey()

根据key计算每种key的平均值
combineByKey 需要传递三个参数

  1. 将第一个key出现的v转换结构计算规则
  2. 第二个参数表示分区内计算规则
  3. 第三个参数表示分区间计算规则
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(
  Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),
  2
)
//过程
// ("a", 88), ("b", 95), ("a", 91)
// ("b", 93), ("a", 95), ("b", 98)
// (88,1) + 99 => (187,2)

// 分区内第一次碰见key的时候,将数据V进行结构的转变
// v => (v,1)
// combineByKey 需要传递三个参数
val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey(
   //  1. 将第一个key出现的v转换结构计算规则
  //("a", 88)=>(88,1)
  (num: Int) => (num, 1),
  //  2. 第二个参数表示分区内计算规则
 // (88,1) + 99 => (187,2}
  (t: (Int, Int), num: Int) => {
    (t._1 + num, t._2 + 1) 
  //  3. 第三个参数表示分区间计算规则  
  //tuple(a1,))
  (t1: (Int, Int), t2: (Int, Int)) => {
    (t1._1 + t2._1, t1._2 + t2._2)
  }
)
val resultRDD: RDD[(String, Int)] = rdd1.map {
  case (key, t) => {
    (key, t._1 / t._2)
  }
}
resultRDD.collect().foreach(println)

6、基础版 groupBy+双层map()

val rdd1= rdd.groupBy{case t=>t._1}
val rdd2= rdd1.map {
  case (words, t) => {
    (words,t.map {
      case (word,num) => num
    })
  }
}
val result: RDD[(String, Int)] = rdd2.map {
  case (word, t) => (word, t.sum)
}

7 、 行动算子 ——countByKey

println(rdd.countByKey())

8、 行动算子 ——countByValue

以tuple为单位 ("a",1)

println(rdd.countByValue())
上一篇下一篇

猜你喜欢

热点阅读