Spark-RDD介绍
RDD
1 RDD介绍
- Driver program:
包含程序的main()方法,RDDs的定义和操作。
管理节点,我们称为executors。
-
SparkContext:
Driver programs 通过SparkContext对象访问Spark
SparkContext对象代表和一个集群的链接。
在shell中SparkContext是自动创建的,就是sc变量。 -
RDDs
Resilient distributed datasets(弹性分布式数据集)。
并行的分布在整个集群中。
RDDs是spark分发数据和计算的基础抽象类。
一个RDD是一个不可改变的分布式集合对象。
spark中所有的计算都是通过RDD的生成、转换、操作、进行的。
RDD内部由很多partition(分片)组成。 -
分片
每个分片包含一部分数据,partitions可以在集群不同的结点上计算。
分片是Spark并行处理的单元,Sparks顺序的、并行的处理分片。
RDD的创建方法
把存在的集合传递给SparkContext的parallelize()方法,一般测试使用。
val rdd = sc.parallelize(Array(1,2,3,4),4)
//参数1:待处理集合
//参数2:分区个数
需要注意rdd的操作都在worker机上,因此输出rdd的元素将在worker机的标准输出上进行,驱动节点上不会运行,故直接在程序print是没有输出的。需要先对各个worker上的内容进行collect
另一种是加载外部文件
val rddText = sc.textFile("filepath")
Scala基础语法
-
变量声明:
val 或者 var
val变量不可修改,一旦分配就不能重新指向。
var分配后可以重新指向相同类型的值。 -
匿名函数和类型推断:
lines.filter(line =>line.contains("world")
括号内是匿名函数,接受一个参数line
使用line这个String类型上的方法contains,并返回结果
line的类型无需指定,会推断出来。
2 RDD的基本操作Transformation
Transformation意思是转换。
从之前的RDD构建一个新的RDD,如map()和filter()。
逐元素运算
-
map()
接收函数,将函数应用到RDD的每个元素,返回新的RDD。 -
filter()
接收函数,返回只包含满足filter函数的元素的新RDD。 -
flatMap()
对每个输入的元素,输出多个元素。将RDD的元素压扁后输出新的RDD。
\\读入文件
scala> val input = sc.textFile("edwintest/inputfile2.txt")
scala> input.collect().foreach(print)
one two threefour five sixseven eight nightten
\\通过map转为(元素,1)格式
scala> val mapInput = input.map(word=>(word,1))
scala> mapInput.collect().foreach(print)
(one two three,1)(four five six,1)(seven eight night,1)(ten,1)
\\通过flatMap将split后的元素压缩
scala> val inputSplit = input.flatMap(line=>line.split(" "))
scala> inputSplit.collect.foreach(print)
onetwothreefourfivesixseveneightnightten
集合运算
RDDs支持数学的集合计算,如并集交集等。
scala> val rdd1 = sc.parallelize(Array("1","2","3","1","4","3"))
scala> val rdd2 = sc.parallelize(Array("1","2","5","6"))
\\去重
scala> val rdd_distinct = rdd1.distinct()
scala> rdd_distinct.collect().foreach(print)
4231
\\并
scala> val rdd_union = rdd1.union(rdd2)
scala> rdd_union.collect().foreach(println)
1231431256
\\交
scala> val rdd_inter = rdd1.intersection(rdd2)
scala> rdd_inter.collect().foreach(print)
21
\\rdd1独有
scala> val rdd_sub = rdd1.subtract(rdd2)
scala> rdd_sub.collect().foreach(print)
433
3 RDD的基本操作Action
Action在RDD上计算出一个结果,并将结果返回给Driver program,如count(),save
- reduce()
接收一个函数,作用在RDD两个类型相同的元素上,返回新元素。可以实现RDD元素中的累加,计数和其它类型的聚集操作。
scala> val rdd = sc.parallelize(Array(1,2,3,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27
scala> rdd.collect()
res3: Array[Int] = Array(1, 2, 3, 3)
scala> rdd.reduce((x,y)=>x+y)
res4: Int = 9
-
collect()
遍历整个RDD,向Driver program返回RDD的内容。
返回的内容需要单机内容能够容纳。
大数据情况下,使用saveAsTextFile() action等。 -
take(n)
返回RDD的n个元素(尝试访问最少的partitions)
返回结果无序,常用于测试使用
scala> rdd.take(2)
res5: Array[Int] = Array(1, 2)
scala> rdd.take(3)
res6: Array[Int] = Array(1, 2, 3)
- top()
排序(根据RDD中的数据比较器)
也可以自定义比较器。
scala> rdd.top(1)
res7: Array[Int] = Array(3)
scala> rdd.top(3)
res8: Array[Int] = Array(3, 3, 2)
- foreach()
计算RDD中的每一个元素,但是不返回到本地
4 RDD的特性
- RDDs的血统图:
Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图。Spark通过血统关系图计算RDD的需求并对丢失的数据进行恢复。
-
延迟计算
Spark对RDDs的计算并不是在进行函数操作时进行计算的,而是在使用action时才计算。
可以有效的减少数据传输。
Spark内部记录metadata 表明transformation的操作已经响应。
加载数据也会延迟计算,只有在数据需要时才会读入内存。 -
RDD.persist()
RDD的持久化
默认RDDs上面进行action时,Spark都重新计算RDDs
如果想要重复使用一个RDD,则可以使用RDD.presist()
unpresist()方法可以将缓存移除。
参数决定优先度。
6 KeyValue对RDDs
- 创建KeyValue对RDDs
使用map函数,返回key-value对
scala> input.collect().foreach(print)
one two threefour five sixseven eight nightten
scala> val rdd1 = input.map(line=>(line.split(" ")(0),line))
scala> rdd1.collect().foreach(println)
(one,one two three)
(four,four five six)
(seven,seven eight night)
(ten,ten)
- key-value对的一些常见操作
现在keys和values已经变为属性,不需要加()
- reduceByKey()
将相同key的元素相加。
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(3,6)))
scala> val rdd2 = rdd.reduceByKey((x,y)=>x+y)
scala> rdd2.collect().foreach(println)
(1,2)
(3,10)
- groupByKey()
将相同key的元素分为一组
scala> val rdd3 = rdd.groupByKey()
scala> rdd3.collect().foreach(println)
(1,CompactBuffer(2))
(3,CompactBuffer(4, 6))
- combineByKey()
(createCombiner,mergeValue,mergeCombiner,partitioner)
非常重要且常用的函数,是最常用的基于key的聚合函数,返回类型可以和输入类型不一样。很多基于key的聚合函数都用到了它,如groupByKey()。
combineByKey()
遍历partition中的元素,元素的key。如果是新元素,则使用我们在参数中提供的createCombiner()函数,如果是这个partition中已经存在的key,就会使用mergeValue()函数。
最后合集每个partition的时候,使用mergeCombiner()函数
scala> val scores = sc.parallelize(Array(("jack",80.0),("jack",90.0),("jack",85.0),("mike",80.0),("mike",92.0),("mike",90.0)))
scala> scores.collect().foreach(print)
(jack,80.0)(jack,90.0)(jack,85.0)(mike,80.0)(mike,92.0)(mike,90.0)
val result = scores.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
scala> result.collect().foreach(print)
(mike,(3,262.0))(jack,(3,255.0))
//计算平均数
scala> val aver = result.map{case(name,(num,score))=>(name,score/num)}
scala> aver.collect().foreach(print)
(mike,87.33333333333333)(jack,85.0)
createCombiner()中定义的匿名函数,
第一个参数:createCombiner
score=>(1,score)
score为分数,因为ByKey相关的函数传进的参数都是key后面的内容。
第二个参数:mergeValue
(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore)
c1中Int为出现次数,Double为分数累加。c1为key-value形式,是createCombiner的结果。
第三个参数:mergeCombiner
(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2)
将各个partition的mergeValue进行组合。