SparkCore(一)(RDD和一些算子)
什么是RDD
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。
-
一组分区(Partition),即数据集的基本组成单位;
-
一个计算每个分区的函数;
-
RDD之间的依赖关系;
-
一个Partitioner,即RDD的分片函数;
-
一个列表,存储存取每个Partition的优先位置(preferred location)。
RDD创建的方法
- 从集合中创建 并行度一般为2
##makerdd或parallise都是根据totalcpucores和2比较最大值
##如果直接覆盖makerdd或parallise的第二个分区个数的参数可以改变数量
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
#查看源码所得
sc.parallelize
#makeRDD实际上是在内部创建了一个parallelize
sc.makeRDD
- 从文件中转换
#从文件转换RDD
sc.textFile
#从文件夹拉取多个文件
sc.wholeTextFiles("data/baseinput/ratings100/")
-
textFile在读取小文件的时候,会参考小文件的个数,文件个数越多,分区个数越多
-
sc.textFile遇到小文件没有办法很好合并小文件的,即便重写第二个参数也没有作用
-
用textFile时,它的partition的数量是与文件夹下的文件数量(实例中用3个xxx.log文件)相关,一个文件就是一个partition(既然3个文件就是:partition=3)。
-
wholeTextFiles的partition数量是根据用户指定或者文件大小来(文件内的数据量少 有hdfs源码默认确定的)
-
确定与hdfs目录下的文件数量无关!所以说:wholeTextFile通常用于读取许多小文件的需求。
查看RDD分区的shell命令
#从集合中创建
sc.parallelize(Seq(1,2,3,4))
#查看分区数量(并行数量)
res3.getNumPartitions
#查看分区并行数量的内容
#将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
res3.glom.collect
#查看分区数量(并行数量)
res3.partitions.length
关于DRR分区决定因素
-
第一点:RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源;
-
第二点:在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数的2~3倍;
-
第三点:RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数有关
partitionBy 改变分区
解析:
- 对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD.
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24
scala> rdd.partitions.size
res24: Int = 4
scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26
scala> var rdd3 = rdd.partitionBy(new org.apache.spark.RangePartitioner(2,rdd))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[40] at partitionBy at <console>:25
scala> rdd2.partitions.size
res25: Int = 2
scala> rdd2.glom.collect
res26: Array[Array[(Int, String)]] = Array(Array((2,bbb), (4,ddd)), Array((1,aaa), (3,ccc)))
scala> rdd3.glom.collect
res27: Array[Array[(Int, String)]] = Array(Array((1,aaa), (2,bbb)), Array((3,ccc), (4,ddd)))
注意:Spark采用的分区有三种:
-
水平分区,也就是sc.makerdd按照下标元素划分,
-
Hash划分根据数据确定性划分到某个分区,一般只给定分区数。
-
Range分区该方法一般按照元素大小进行划分不同区域,每个分区表示一个数据区域,如数组中每个数是[0,100]之间的随机数,Range划分首先将区域划分为10份,然后将数组中每个数字分发到不同的分区,比如将18分到(10,20]的分区,最后对每个分区进行排序。
RDD编程
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。
RDD的转化 ( 重点掌握 )
RDD整体上分为 TRANSFORMATIONS 跟 ACTIONS 两种
Value类型
map(func) 重点
将RDD创建的集合转换为另外一个映射集合,例如,如果将一个Array中的数全部 *2 输出,那么就会用到map方法。例如
//创建一个array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
//集合内每个元素*2
scala> res0.map(_*2)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
//打印输出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
mapPartitions(func)
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。同样以上述的需求为例:
//创建一个array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
//集合内每个元素*2
scala> res0.mapPartitions(x=>x.map(_*2))
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
//打印输出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
mapPartitionsWithIndex(func)
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];
glom
将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
groupBy(func) 重点
分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))
上述例子解释是创建一个1到4的序列,然后把能被2整除的放进一个元祖中,不能被2整除的放入另外一个元祖中。那么分组的条件就是%2
filter(func) 重点
过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。比如创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
sortBy(func,[ascending], [numTasks]) 重点
使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
//创建一个RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
//按照自身大小排序
scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)
//按照与3余数的大小排序
scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)
Key-Value类型
partitionBy
pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。
groupByKey
作用:groupByKey也是对每个key进行操作,但只生成一个sequence。
//创建一个pairRDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)
scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26
//将相同key对应值聚合到一个sequence中
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28
//打印结果
scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
//计算相同key对应值的相加结果
scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31
//打印结果
scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
reduceByKey(func, [numTasks])
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
//创建一个pairRDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24
//算相同key对应值的相加结果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26
//打印结果
scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))
reduceByKey和groupByKey的区别
1.reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
2.groupByKey:按照key进行分组,直接进行shuffle。
aggregateByKey
在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
(1)zeroValue:给每一个分区中的每一个key一个初始值;
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;
(3)combOp:函数用于合并每个分区中的结果。
//创建一个pairRDD
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
//取出每个分区相同key对应值的最大值,然后相加
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26
//打印结果
scala> agg.collect()
res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
reduceByKey和groupByKey的区别
- reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
- groupByKey:按照key进行分组,直接进行shuffle。
val createCombiner = (v: V) => CompactBuffer(v) ,它把一个V变成一个C(例 如,创建一个单元素列表)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v ,将一个V合并到一个C中(例如,将它添加到列表的末尾)
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 ,将两个C合并成一个C。
- 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。