Spark 常用RDD操作

2022-05-19  本文已影响0人  万州客

可以和 Flink 中对应的算子对照着学习。

代码

行动操作
scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:23

scala> rdd.first
res3: Int = 1

scala> rdd.count
res4: Long = 10

scala> rdd.reduce(_+_)
res5: Int = 55

scala> rdd.reduce(_*_)
res6: Int = 3628800

scala> rdd.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd.take(3)
res8: Array[Int] = Array(1, 2, 3)

scala> rdd.top(3)
res9: Array[Int] = Array(10, 9, 8)

转换操作
scala> val rdd = sc.parallelize(1 to 9, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:23

scala> val rdd1 = rdd.map(x => x * 2)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:23

scala> rdd.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> rdd1.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

scala> val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:23

scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[9] at map at <console>:23

scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))

scala> val rdd3 = rdd2.mapValue("x" + _ + "x")
<console>:23: error: value mapValue is not a member of org.apache.spark.rdd.RDD[(Int, String)]
       val rdd3 = rdd2.mapValue("x" + _ + "x")
                       ^

scala> val rdd3 = rdd2.mapValues("x" + _ + "x")
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at mapValues at <console>:23

scala> rdd3.collect
res13: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:23

scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
     | var res = List[(T, T)]()
     | var pre = iter.next
     | while (iter.hasNext) {
     | val cur = iter.next
     | res .::= (pre, cur)
     | pre = cur
     | }
     | res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]

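scala> a.mapPartitions(myfunc).collect
res15: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

说明:mapPartitions 以分区为单位调用 myfunc,1 到 9 被切分为 3 个分区(1-3、4-6、7-9),因此相邻元素的配对不会跨分区;又因为 myfunc 用 .::= 把新元素对插到列表头部,所以每个分区内的结果是倒序的。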

scala> val rdd1 = sc.parallelize(1 to 4, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:23

scala> rdd1.flatMap(x => 1 to x).collect
res16: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

scala> val rdd1 = sc.parallelize(List((1, 2), (3, 4)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:23

scala> val rdd2 = rdd1.flatMapValues(x => x.to(5))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[16] at flatMapValues at <console>:23

scala> rdd2.collect
res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))


scala> val rdd = sc.parallelize(List(("A", 2), ("B", 4), ("C", 6), ("A", 3), ("C", 7)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:23

scala> rdd.groupByKey().collect
res18: Array[(String, Iterable[Int])] = Array((A,CompactBuffer(2, 3)), (B,CompactBuffer(4)), (C,CompactBuffer(6, 7)))

scala> rdd.sortByKey().collect
res19: Array[(String, Int)] = Array((A,2), (A,3), (B,4), (C,6), (C,7))

scala> rdd.sortByKey(false).collect
res20: Array[(String, Int)] = Array((C,6), (C,7), (B,4), (A,2), (A,3))


scala> val rdd = sc.parallelize(List(("A", 2), ("A", 1), ("B", 4), ("B", 6), ("C", 7)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:23

scala> rdd.reduceByKey((x, y) => x + y).collect
res21: Array[(String, Int)] = Array((A,3), (B,10), (C,7))

scala> val rdd = sc.makeRDD(1 to 10).filter(_ % 3 == 0)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at filter at <console>:23

scala> rdd.collect
res22: Array[Int] = Array(3, 6, 9)

scala> val rdd1 = sc.makeRDD(1 to 3, 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at makeRDD at <console>:23

scala> val rdd2 = sc.makeRDD(2 to 4, 1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at makeRDD at <console>:23

scala> val unionRDD = rdd1.union(rdd2)
unionRDD: org.apache.spark.rdd.RDD[Int] = UnionRDD[31] at union at <console>:24

scala> unionRDD.collect
res23: Array[Int] = Array(1, 2, 3, 2, 3, 4)

scala> val intersectionRDD = rdd1.intersection(rdd2)
intersectionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at intersection at <console>:24

scala> intersectionRDD.collect
res24: Array[Int] = Array(3, 2)

scala> val substractRDD = rdd1.subtract(rdd2)
substractRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at subtract at <console>:24

scala> substractRDD.collect
res25: Array[Int] = Array(1)

scala> val l = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 1)
l: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:23

scala> val r = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')), 1)
r: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[43] at parallelize at <console>:23

scala> val joinrdd1 = l.join(r).collect
joinrdd1: Array[(Int, (Int, Char))] = Array((1,(1,x)), (1,(2,x)), (2,(1,y)), (2,(1,z)))

scala> val joinrdd2 = l.leftOuterJoin(r).collect
joinrdd2: Array[(Int, (Int, Option[Char]))] = Array((1,(1,Some(x))), (1,(2,Some(x))), (3,(1,None)), (2,(1,Some(y))), (2,(1,Some(z))))

scala> val joinrdd3 = l.rightOuterJoin(r).collect
joinrdd3: Array[(Int, (Option[Int], Char))] = Array((4,(None,w)), (1,(Some(1),x)), (1,(Some(2),x)), (2,(Some(1),y)), (2,(Some(1),z)))

效果

2022-05-16 15_50_16-704690 Spark大数据分析技术与实战.pdf - SumatraPDF.png
上一篇下一篇

猜你喜欢

热点阅读