关于Spark、Scala实现WordCount的8种写法(多种
2019-07-31 本文已影响0人
每天起床打酱油
1、groupByKey()
RDD[(String, Int)] = sc.makeRDD(List(("a",1), ("b",2), ("a",3), ("b",4)))
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1), ("b",2), ("a",3), ("b",4)))
// 转换算子 —— groupByKey
val rdd2: RDD[(String, Iterable[Int])] = rdd.groupByKey()
val rdd3: RDD[(String, Int)] = rdd2.map {
case (c, datas) => {
(c, datas.sum)
}
}
rdd3.collect().foreach(println)
sc.stop()
2、reduceByKey()
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
// 转换算子 —— reduceByKey
//val value: RDD[(String, Int)] = rdd.reduceByKey((x,y)=>{x+y})
val value: RDD[(String, Int)] = rdd.reduceByKey(_+_)
value.collect().foreach(println)
sc.stop()
3、aggregateByKey
转换算子 —— aggregateByKey()()使用了函数柯里化
存在两个参数列表 :
第一个参数列表表示分区内计算时的初始值(零值)——在初始值的基础上做比较运算
第二参数列表中需要传递两个参数
第一个参数表示分区内计算规则
第二个参数表示分区间计算规则
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",2), ("b",3),
("a",3), ("b",4), ("a", 5)), 2)
val value: RDD[(String, Int)] = rdd.aggregateByKey(0)
((x,y)=>{Math.max(x,y)},
(x,y)=>{x+y})
value.collect().foreach(println)
4、foldByKey()
foldByKey其实就是aggregateByKey简化版,
当aggregateByKey中分区内和分区间的计算规则一样时,使用foldByKey就可以了
rdd.aggregateByKey(0)(+,+) ——> rdd.foldByKey(0)(+)
// TODO 当aggregateByKey中分区内和分区间的计算规则一样时,使用foldByKey就可以了
//rdd.aggregateByKey(10)((x,y)=>{Math.max(x,y)},(x,y)=>{x+y})
// val value2: RDD[(String, Int)] = rdd.aggregateByKey(0)(_+_,_+_)
val value3: RDD[(String, Int)] = rdd.foldByKey(0)(_+_)
value3.collect().foreach(println)
5、combineByKey()
根据key计算每种key的平均值
combineByKey 需要传递三个参数
- 将第一个key出现的v转换结构计算规则
- 第二个参数表示分区内计算规则
- 第三个参数表示分区间计算规则
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(
Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),
2
)
//过程
// ("a", 88), ("b", 95), ("a", 91)
// ("b", 93), ("a", 95), ("b", 98)
// (88,1) + 99 => (187,2)
// 分区内第一次碰见key的时候,将数据V进行结构的转变
// v => (v,1)
// combineByKey 需要传递三个参数
val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey(
// 1. 将第一个key出现的v转换结构计算规则
//("a", 88)=>(88,1)
(num: Int) => (num, 1),
// 2. 第二个参数表示分区内计算规则
// (88,1) + 99 => (187,2}
(t: (Int, Int), num: Int) => {
(t._1 + num, t._2 + 1)
// 3. 第三个参数表示分区间计算规则
//tuple(a1,))
(t1: (Int, Int), t2: (Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
val resultRDD: RDD[(String, Int)] = rdd1.map {
case (key, t) => {
(key, t._1 / t._2)
}
}
resultRDD.collect().foreach(println)
6、基础版 groupBy+双层map()
val rdd1= rdd.groupBy{case t=>t._1}
val rdd2= rdd1.map {
case (words, t) => {
(words,t.map {
case (word,num) => num
})
}
}
val result: RDD[(String, Int)] = rdd2.map {
case (word, t) => (word, t.sum)
}
7 、 行动算子 ——countByKey
println(rdd.countByKey())
8、 行动算子 ——countByValue
以tuple为单位 ("a",1)
println(rdd.countByValue())