17-SparkCore04
collect
collect
countByKey
countByValue
collectAsMap
groupByKey vs reduceByKey
// Word count with reduceByKey: values for each key are summed pairwise,
// with a pre-aggregation step on the map side before the shuffle.
val rdd = {
  val lines = sc.textFile("file:///home/hadoop/data/ruozeinput.txt")
  // Each input line is tab-separated; emit one (word, 1) pair per token.
  val ones = lines.flatMap(line => line.split("\t")).map(word => (word, 1))
  ones.reduceByKey((left, right) => left + right)
}
// Action: pulls every (word, count) pair back to the driver.
rdd.collect
// Word count with groupByKey: every 1 emitted for a word is first gathered
// into an Iterable[Int] per key, and only then summed.
val rdd = {
  val ones = sc.textFile("file:///home/hadoop/data/ruozeinput.txt")
    .flatMap(line => line.split("\t"))
    .map(word => (word, 1))
  ones.groupByKey().map { case (word, hits) => (word, hits.sum) }
}
// Action: triggers the job and returns the number of distinct words.
rdd.count
org.apache.spark.rdd.RDD[(String, Iterable[Int])]
(hello,[1,1,1,1])
(world,[1,1])
reduceByKey
在 map 端先做一次预聚合（相当于 MapReduce 中的 combiner），因此需要 shuffle 的数据量比 groupByKey 少
// Key both datasets by their first field (the person id) so they can be joined.
// Each element becomes (id, originalTuple).
val peopleInfo = sc.parallelize(Array(("G301","糊涂虫"),("G302","森老"),("G303","Gordon"))).keyBy(_._1)
val peopleDetail = sc.parallelize(Array(("G301","清华大学",18))).keyBy(_._1)
// Join on id and flatten each match into one comma-separated line: id,name,school,age.
// The ids nested inside the value tuples repeat the key, so they are ignored.
peopleInfo.join(peopleDetail).map { case (id, ((_, name), (_, school, age))) => s"$id,$name,$school,$age" }.collect
RDD[(String, ((String, String), (String, String, Int)))]
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
// One synthetic record; a large collection of these is cached below.
case class Info(name:String, gender:String, address:String)
// Pools each field is sampled from.
val names = Array[String]("G304","G305","G306")
val genders = Array[String]("male","female")
val addresses = Array[String]("beijing","shenzhen","wenzhou","hangzhou")
// Number of synthetic records to generate.
val recordCount = 1000000
// Picks a random element of `pool`. The bound comes from the pool itself, so
// adding or removing entries above can neither skip values nor index out of range.
def pick(pool: Array[String]): String = pool(Random.nextInt(pool.length))
// Build the records in one expression instead of appending inside a loop.
// Fields are drawn in the order name, gender, address for every record.
val infos = ArrayBuffer.fill(recordCount)(Info(pick(names), pick(genders), pick(addresses)))
val rdd = sc.parallelize(infos)
// Request serialized in-memory caching; persist only marks the RDD, so the
// count() action that follows is what actually runs the job.
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count()
34.3 MB
190915728
19014993
// Estimates, in bytes, the JVM heap footprint of the object passed in and everything it references.
// NOTE(review): `rdd` here is the driver-side RDD object; confirm this is the figure wanted
// rather than the cached size reported for the persisted RDD.
org.apache.spark.util.SizeEstimator.estimate(rdd)
1000 个 task
5 个 executor × 5 core = 同时运行 25 个 task,1000 / 25 = 40 轮
10 个 executor × 5 core = 同时运行 50 个 task,1000 / 50 = 20 轮
/ruoze/emp/y=1980
1980.txt
/ruoze/emp/y=1981
1981.txt
/ruoze/emp/y=1987
...
.......