
Spark Tutorial for Beginners (5): Creating Resilient Distributed Datasets (RDDs) and Transformations

2018-02-19  胖滚猪学编程

This article is entirely handwritten and original. Please do not copy and paste it; if you repost it, please credit the source. Thank you!

What is a Resilient Distributed Dataset (RDD)?

An RDD is Spark's core abstraction: an immutable, partitioned collection of elements that can be operated on in parallel across a cluster, and recomputed from its lineage if a partition is lost.

Creating an RDD

There are two ways to create an RDD: parallelize an existing collection in the driver program (sc.parallelize), or reference a dataset in external storage such as HDFS (sc.textFile). A quick sketch of both follows.
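A minimal sketch of both creation paths (the HDFS path is just the sample file used later in this article):

// Way 1: parallelize a collection that already exists in the driver program
val fromCollection = sc.parallelize(1 to 5)

// Way 2: reference a dataset in external storage, e.g. a text file on HDFS
val fromFile = sc.textFile("hdfs://master/user/spark.hello")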

RDD Transformations


A transformation is an operation that produces a new RDD from an existing one. For example, filter selects the elements of an existing RDD that satisfy a condition and builds a new RDD from them. One thing worth knowing up front: transformations are lazy, so nothing is actually computed until an action such as collect() is called, which is why every example below ends with collect().
As always, enough with the concepts: seeing the actual operations is the most intuitive way to learn!

Operations on a single RDD

scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd.map(x=>(x+2))
res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:27

scala> res14.collect()
res15: Array[Int] = Array(3, 4, 5, 6, 7)

scala> rdd.map(x=>(x,1))
res18: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[14] at map at <console>:27

scala> res18.collect()
res19: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))

// textFile: load a text file; each element of the RDD is one line
scala> val rdd = sc.textFile("hdfs://master/user/spark.hello")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/spark.hello MapPartitionsRDD[16] at textFile at <console>:24

scala> rdd.map(line => line.split(" "))
res20: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at map at <console>:27

scala> res20.collect()
res22: Array[Array[String]] = Array(Array(hello, spark, it, is, perfect), Array(i, want, to, learn))

scala> rdd.flatMap(line => line.split(" "))
res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:27

scala> res0.collect()
res1: Array[String] = Array(hello, spark, it, is, perfect, i, want, to, learn)

map outputs Array[Array[String]] while flatMap outputs Array[String]; flatMap is equivalent to running map and then flattening the result one level.
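The same relationship holds for plain Scala collections, which makes it easy to verify without Spark: flatMap behaves like map followed by flatten.

val lines = List("hello spark", "i want to learn")
lines.map(_.split(" ").toList)       // List(List(hello, spark), List(i, want, to, learn))
lines.flatMap(_.split(" ").toList)   // List(hello, spark, i, want, to, learn)
lines.map(_.split(" ").toList).flatten   // identical to the flatMap result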

A word-count example. First, the contents of the input file:

[root@master hadoop-2.6.0-cdh5.11.1]# hadoop fs -cat /user/wordcount.test
spark i love you
spark i want learn you

Now the count itself:

scala> val rdd = sc.textFile("hdfs://master/user/wordcount.test")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/wordcount.test MapPartitionsRDD[4] at textFile at <console>:24

scala> val wordmap = rdd.flatMap(line=>line.split(" ")).map(x=>(x,1))
wordmap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:26

scala> wordmap.collect()
res2: Array[(String, Int)] = Array((spark,1), (i,1), (love,1), (you,1), (spark,1), (i,1), (want,1), (learn,1), (you,1))

scala> val wordreduce = wordmap.reduceByKey((x,y)=>(x+y))
wordreduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:28

scala> wordreduce.collect()
res3: Array[(String, Int)] = Array((learn,1), (spark,2), (you,2), (love,1), (i,2), (want,1))
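The whole pipeline can also be chained into a single expression (same input file as above):

sc.textFile("hdfs://master/user/wordcount.test")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect()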

// filter: keep only the elements that satisfy the predicate
scala> val rdd = sc.parallelize(Array(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> rdd.filter( x => (x==2)).collect()
res8: Array[Int] = Array(2)

scala> val rdd = sc.parallelize(List("sp","sa","vv"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd.filter(x=>x.contains('s')).collect()
res10: Array[String] = Array(sp, sa)

// groupByKey: group all values sharing the same key
scala> val rdd = sc.parallelize(List((1,"sp"),(1,"sa"),(2,"vv")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> rdd.groupByKey().collect()
res12: Array[(Int, Iterable[String])] = Array((1,CompactBuffer(sp, sa)), (2,CompactBuffer(vv)))
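A side note: when the goal is aggregation, reduceByKey is usually preferred over groupByKey, because it combines values on each partition before shuffling instead of moving every value across the network. A minimal sketch of counting values per key both ways (output order after a shuffle may vary):

rdd.groupByKey().mapValues(_.size).collect()                     // Array((1,2), (2,1))
rdd.map { case (k, _) => (k, 1) }.reduceByKey(_ + _).collect()   // same counts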

// distinct: remove duplicate elements
scala> val rdd = sc.parallelize(List("sp","sp","vv"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> rdd.distinct().collect()
res13: Array[String] = Array(sp, vv)

// sortByKey: sort the RDD by key
scala> val rdd = sc.parallelize(List((1,"sp"),(3,"sa"),(2,"vv")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd.sortByKey(false).collect()
res14: Array[(Int, String)] = Array((3,sa), (2,vv), (1,sp))
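The boolean argument to sortByKey is `ascending`, so passing false sorts in descending order. For completeness, the default ascending order:

rdd.sortByKey().collect()   // Array((1,sp), (2,vv), (3,sa))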

// foldByKey: like reduceByKey, but each key's aggregation starts from a zero value (here 3)
scala> val rdd = sc.parallelize(List((1,1),(1,2),(2,6),(2,5)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> rdd.foldByKey(3)(_+_).collect()
res15: Array[(Int, Int)] = Array((1,6), (2,14))
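Why these results? foldByKey starts each key's aggregation from the zero value (here 3): for key 1 that is 3 + 1 + 2 = 6, and for key 2 it is 3 + 6 + 5 = 14. One caveat: as with fold, the zero value is applied once per partition, so if a key's values are spread across several partitions the zero is counted more than once; the output above corresponds to each key's values sitting in a single partition.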

Operations on multiple RDDs

scala> val list1 = sc.parallelize(List("spark","spark","hello"))
list1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val list2 = sc.parallelize(List("spark","love","you"))
list2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:24

// union: concatenation, does not deduplicate
scala> list1.union(list2).collect()
res16: Array[String] = Array(spark, spark, hello, spark, love, you)

// intersection: deduplicates
scala> list1.intersection(list2).collect()
res17: Array[String] = Array(spark)

// subtract: elements present in the first dataset but not in the second; does not deduplicate
scala> list1.subtract(list2).collect()
res18: Array[String] = Array(hello)

// joins operate on key-value RDDs
scala> val list1 = sc.parallelize(List((1,"spark"),(2,"spark"),(3,"hello")))
list1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> val list2 = sc.parallelize(List((1,"spark"),(3,"you"),(4,"good")))
list2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[39] at parallelize at <console>:24

// inner join: only keys present in both RDDs are kept
scala> list1.join(list2).collect()
res20: Array[(Int, (String, String))] = Array((1,(spark,spark)), (3,(hello,you)))
    
// left outer join: every key from the left RDD is kept; keys missing from the right appear as None
scala> list1.leftOuterJoin(list2).collect()
res21: Array[(Int, (String, Option[String]))] = Array((1,(spark,Some(spark))), (3,(hello,Some(you))), (2,(spark,None)))

// right outer join: every key from the right RDD is kept; keys missing from the left appear as None
scala> list1.rightOuterJoin(list2).collect()
res22: Array[(Int, (Option[String], String))] = Array((4,(None,good)), (1,(Some(spark),spark)), (3,(Some(hello),you)))
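For completeness, the API also offers fullOuterJoin, which keeps the keys from both sides and wraps both values in Option:

list1.fullOuterJoin(list2).collect()
// contains (1,(Some(spark),Some(spark))), (3,(Some(hello),Some(you))),
// (2,(Some(spark),None)) and (4,(None,Some(good))); order may vary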

One final reminder: there are far too many functions to cover, and this article lists only the common ones. The best approach is to go straight to the official API docs!
Spark official documentation
Spark API
