Spark从入门到放弃—RDD
简介
Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
- RDD:弹性分布式数据集
- 累加器: 分布式共享只写变量
- 广播变量: 分布式共享只读变量
本文主要介绍Spark框架中最为重要的基础数据处理模型RDD,包括它的基本概念、属性、创建、transformation算子、action算子等。
RDD是什么
RDD全称为Resilient Distributed Dataset,中文译为弹性分布式数据集,是Spark中最基础,同时也是最为重要的数据处理模型。它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD也即分布式对象集合,是一个只读的分区记录集合,每个RDD可以被划分为多个分区,每个分区就是数据集的一部分,同时不同分区可以存储在集群中不同的节点上,从而利用集群节点优势进行并行计算。
RDD的属性
RDD具有数据流模型的特点,包括自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。RDD主要包含以下5个属性:
-
分区列表
RDD数据结构中存在分区列表,用于在执行任务时并行计算,是实现分布式计算的重要属性。对于RDD来说,每个分片会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD的时候指定RDD的分片个数,如果未指定,那么就会采用默认值,默认值就是程序所分配到的CPU Core的个数。 对应方法或属性
-
分区计算函数
Spark在计算时,是使用分区函数对每一个分区进行单独计算的。 对应方法或属性
-
RDD之间的依赖关系
RDD是计算模型的封装,一个RDD可能由多个其他RDD构建而来,当需求中需要将多个计算模型进行组合时,就需要对多个RDD建立依赖关系,即保存当前RDD所依赖的父RDD列表。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。 对应方法或属性
-
分区器
当数据为K-V类型数据时,可以通过设定分区器自定义数据的分区。 对应方法或属性
-
首选位置
对数据进行计算时,可以根据计算节点的状态选择不同的节点位置进行计算。 对应方法或属性
RDD的执行原理和过程
从计算的角度来讲,数据处理过程中需要计算资源和计算模型。执行时,需要将计算资源和计算模型进行协调和整合。Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上,按照执行的计算模型进行数据计算,最终得到结果。
下面以一个例子来说明RDD的完整执行流程,其中T代表“Transformation”,A代表“Action”,如下图。
RDD执行过程的一个实例
在上图中,从输入中逻辑上生成A和C两个RDD,经过一系列“转换”操作,逻辑上生成了F(也是一个RDD),之所以说是逻辑上,是因为这时候计算并没有发生,Spark只是记录了RDD之间的生成和依赖关系。当F要进行输出时,也就是当F进行“Action”操作的时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算。
上述这一系列处理称为一个“血缘关系(Lineage)”,即DAG拓扑排序的结果。采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化(pipeline),避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间数据,因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理。同时,这种通过血缘关系把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑。
RDD的创建
Spark中创建RDD的方式主要有4种:
1. 从集合(内存)中创建RDD
从集合中创建RDD,Spark主要提供了两个方法,parallelize和makeRDD,代码如下:
val rdd1 = sc.parallelize(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(6,7,8,9))
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
输出结果:
注意,上述代码是在Spark中自带的shell工具中执行的,可以先在Spark官网下载spark安装包,然后执行/spark-3.0.1-bin-hadoop2.7/bin/spark-shell 启动终端工具。
2. 从外部存储(文件)中创建RDD
其中外部存储系统的数据集创建RDD包括:本地的文件系统、所有Hadoop支持的数据集,比如HDFS、HBase等。
val rdd = sc.textFile("/Volumes/Study/Scala/Codes/ScalaTest/datas/1.txt")
rdd.collect().foreach(println)
结果如下:
3. 从其他RDD创建
主要是通过一个RDD运算完后,再产生新的RDD,以上述产生的RDD为例,对齐施加一个map操作,在每一行的末尾加上一个“!”。代码如下:
val rdd1 = rdd.map(item => {item + "!"})
rdd1.collect().foreach(println)
结果如下:
4. 直接创建 RDD(new)
使用new的方式直接构造RDD,一般由Spark框架本身使用,这里就不举例子了。
RDD的操作(算子)
RDD的操作算子包括两类,一类叫做Transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做Actions,它是用来触发RDD的计算,得到RDD相关结算结果或者将RDD保存到文件系统中去。下面分别来介绍:
Transformations(转换算子)
map算子
对处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也是可以数值的转化,然后返回一个新的RDD。
下面的例子中的map算子的作用是把RDD中的所有元素都乘以2。
val rdd = sc.makeRDD(List(1,2,3,4))
val mapRdd = rdd.map((num:Int)=>{num*2})
mapRdd.collect().foreach(println)
结果:
mapPartitions算子
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
下面的例子是要筛选出RDD中数值为2的元素
val rdd = sc.makeRDD(List(1,2,3,4))
val mapRdd = rdd.mapPartitions(datas => {
datas.filter(_==2) }
)
mapRdd.collect().foreach(println)
结果:
map与mapPartitions从以下3个方面的区别:
- 数据处理角度
Map算子是分区内一个数据一个数据地执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。 - 功能的角度
Map算子主要目的是将数据源中的数据进行转换和改变,但是不会减少或者增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求元素保持不变,所以可以增加或者减少数据。 - 性能的角度
Map算子类似串行操作,所以性能比较低,而MapPartitions算子类似于批处理操作,所以性能较高。但是MapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,一般使用Map算子。
mapPartitionsWithIndex算子
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
下面的例子在创建RDD的时候,显式指定只包含2个分区,然后返回指定分区的迭代器。代码如下:
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
if ( index == 1 ) {
iter
} else {
Nil.iterator
}
}
)
mpiRDD.collect().foreach(println)
结果:
flatMap算子
将处理的数据进行扁平化后再进行映射处理,所以该算子也称之为扁平映射。例子如下:
val rdd: RDD[List[Int]] = sc.makeRDD(List(
List(1, 2), List(3, 4)
))
val flatRDD: RDD[Int] = rdd.flatMap(
list => {
list
}
)
rdd.collect().foreach(println)
flatRDD.collect().foreach(println)
将原先两个列表铺平成一个列表,结果如下:
groupBy算子
将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中。
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组。
下面的例子是根据元素的首字母对RDD进行分组
val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop", "Ice"), 2)
val groupRDD = rdd.groupBy(_.charAt(0))
groupRDD.collect().foreach(println)
结果:
可以看到原先的元素被按照首字母区别被分为了三组。
filter算子
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产条件下,可能会出现数据倾斜。
下面的例子是对RDD中的数据根据奇偶性来进行过滤,代码如下:
val rdd = sc.makeRDD(List(1,2,3,4))
val filterRDD: RDD[Int] = rdd.filter(num => num%2!=0)
filterRDD.collect().foreach(println)
结果如下:
sample算子
根据指定的规则从数据集中抽取数据,此算子包含3个参数,含义分别如下:
- 第一个参数
抽取的数据是否放回,True:放回; False: 不放回 - 第二个参数
每个元素被抽取的几率,范围在[0,1]之间,0:全不取,1:全取。 - 第三个参数
随机数种子
下面的例子分别展示了不放回的抽取和有放回的抽取,代码如下:
val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
println(rdd.sample(
false,
0.5,
).collect().mkString(","))
println(rdd.sample(
true,
0.4,
).collect().mkString(","))
两种抽取方式的结果如下:
distinct算子
对数据进行去重。
val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))
val rdd1 = rdd.distinct()
rdd1.collect().foreach(println)
结果:
coalesce算子
根据参数来缩减分区,用于大数据集过滤后,提高小数据集的执行效率。当Spark程序中,存在过多的小人物的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本。但是该方法默认情况下不会将分区的数据打乱重新组合,这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜。如果想让数据均衡,可以进行shuffle处理。
val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3) //默认使用3个分区
val newRDD = rdd.coalesce(2,true) //将shuffle操作打开
newRDD.saveAsTextFile("./") // 分区的情况可以通过保存的文件数量来判断
结果:
可以看到目前变成了2个分区。
repartition算子
该操作内部其实执行的是coalesce操作,参数shuffle的默认值是true。无论是将分区多的RDD转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。
val dataRDD = sc.makeRDD(List( 1,2,3,4,1,2), 2)
val dataRDD1 = dataRDD.repartition(4)
dataRDD1.saveAsTextFile("./") // 分区的情况可以通过保存的文件数量来判断
结果:
可以看到被分为了4个分区
sortBy算子
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD分区数与原RDD分区数一致,这个过程中存在shuffle过程。
val dataRDD = sc.makeRDD(List(1,2,3,4,1,9), 2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
dataRDD1.collect().foreach(println)
排序结果为:
下面展示经过f函数处理之后,再进行排序操作:
val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
val newRDD = rdd.sortBy(t=>t._1.toInt, false)
newRDD.collect().foreach(println)
可以看到此时的sortBy函数是先对每个元素进行处理之后,再进行排序的。
intersection算子、union算子、subtract算子、zip算子
- intersection算子
对源RDD和参数RDD求交集后返回一个新的RDD。 - union算子
对源RDD和参数RDD求并集后返回一个新的RDD。 - subtract算子
以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来,求差集。 - zip算子
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
下面以一个综合的例子来说明:
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))
// 交集 : 【3,4】
val rdd3 = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))
// 并集 : 【1,2,3,4,3,4,5,6】
val rdd4 = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集 : 【1,2】
val rdd5 = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// 拉链 : 【1-3,2-4,3-5,4-6】
val rdd6 = rdd1.zip(rdd2)
val rdd8 = rdd1.zip(rdd7)
println(rdd6.collect().mkString(","))
reduceByKey算子、groupByKey算子
reduceByKey可以将数据按照相同的 Key 对 Value 进行聚合。
下面的例子是对Key相同的元素,将Value进行相加,代码如下:
val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",5)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
dataRDD2.collect().foreach(println)
结果为:
groupByKey将数据源的数据根据 key 对 value 进行分组。
val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",5)))
val dataRDD2 = dataRDD1.groupByKey()
dataRDD2.collect().foreach(println)
结果:
reduceByKey算子和groupByKey算子的区别:
- 从shuffle的角度
reduceByKey和groupByKey都存在shuffle操作,但是reduceByKey可以在shuffle前对分区内相同的key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。 - 从功能的角度
reduceByKey其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey。
Spark中涉及到shuffle操作,必须在磁盘中进行,不能在内存中进行,因为会等待,内存可能会溢出。GroupByKey需要罗盘,但是没有预聚合,故要罗盘的数据需要多一些,而reduceByKey可以先预聚合,故需要存到磁盘的数据相对较小一点,故效率较高。
aggregateByKey算子
之前提到的reduceByKey算子和GroupByKey算子在分区内以及分区间都只能执行相同的规则。如果在分区内和分区间需要应用不同的规则,应该怎么办呢?aggregateByKey算子可以将数据根据不同的规则进行分区内计算和分区间计算。aggregateByKey算子存在函数柯里化,即它有两个参数列表,第一个参数列表需要传递一个参数,表示为初始值,目的是当碰见第一个Key的时候,和对应的Value进行分区内计算。第二个参数列表主要传递两个参数,第一个参数表示分区内计算规则;第二个参数表示分区间计算规则。
下面的例子的功能是,先取同一个分区内的最大值,然后将不同分区取出的最大值进行相加。代码如下:
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
rdd.aggregateByKey(0)((x,y) => math.max(x,y), (x,y) => x + y).collect().foreach(println)
结果如下:
另外一个例子功能是,计算所有相同key的数据的平均值。即,我们需要统计所有key出现的次数,并且将相同key的value累加起来,再除以出现的次数,代码如下:
val rdd = sc.makeRDD(List(("a",1), ("a",2), ("b",3),
("b",4), ("b", 5), ("a", 6)), 2)
val newRDD = rdd.aggregateByKey( (0,0) )(//初始值tuple的第一个元素代表累加和,第二个代表出现的次数
(t, v) => {(t._1 + v, t._2 + 1)}, //分区内的计算,第一个代表初始值t,第二个代表元素中的value
(t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2)}
)
val rdd = sc.makeRDD(List(("a",1), ("a",2), ("b",3), ("b",4), ("b", 5), ("a", 6)), 2)
val resRDD = newRDD.mapValues {
case (num, cnt) => {num/cnt}
}
resRDD.collect().foreach(println)
结果:
foldByKey算子
当分区内计算规则和分区间计算规则相同时,aggregateByKey算子就可以简化为foldByKey算子。
val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("a",3),("b",4)), 4)
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
dataRDD2.collect().foreach(println)
结果:
combineByKey算子
最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。combineByKey方法需要三个参数,第一个参数表示将相同key的第一个数据进行结构的转换,实现相应操作;第二个参数表示分区内的计算规则;第三个参数表示分区间的计算规则。
还是以上述的计算相同key的平均值的例子来说明,看一下使用combineByKey如何来实现相同的功能,代码如下:
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),2)
val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
( t:(Int, Int), v ) => {
(t._1 + v, t._2 + 1)
},
(t1:(Int, Int), t2:(Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
case (num, cnt) => {
num / cnt
}
}
resultRDD.collect().foreach(println)
结果是一致的,如下:
下面比较一下reduceByKey、foldByKey、aggregateByKey、combineByKey的区别:
- reduceByKey
相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同。- FoldByKey
相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同。- AggregateByKey
相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规 则可以不相同,分别用3个参数来指定不同的规则。- CombineByKey
当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
sortByKey算子
在一个(K,V)结构的RDD上调用,K必须实现Ordered接口特质,返回一个按照key进行排序的RDD,只有一个参数用来指定是升序还是降序。
val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1 = dataRDD1.sortByKey(true)
val sortRDD2 = dataRDD1.sortByKey(false)
sortRDD1.collect().foreach(println)
sortRDD2.collect().foreach(println)
结果:
join算子
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一个的(K,(V,W))的RDD。即,两个不同数据源的数据,相同的key的value会连接在一起,形成元祖。一般有以下两种情况:
如果两个数据源中的key没有匹配上,那么数据不会出现在结果中。如果两个数据源中key有多个相同的,会依次匹配,因此可能会出现笛卡尔乘积,数据会呈几何性增长,导致性能降低。
代码如下:
val rdd = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
结果为:
如果出现了多个key相同,那么会分别匹配,一个例子如下:
val rdd = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"),(3,"s")))
val rdd1 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
结果:
可以看到key为3的元素,在join之后,出现了多个结果。
leftOuterJoin算子、rightOuterJoin算子
类似于SQL语句的左外连接,右外连接。左外和右外的区别就是以谁为主RDD而已。
代码:
val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("d",3)))
val dataRDD2 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd = dataRDD1.leftOuterJoin(dataRDD2)
rdd.collect().foreach(println)
结果:
cogroup算子
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD。相当于是两个RDD先在本身内根据key进行关联,将相同key的value放在一个Iterable,然后第一个RDD 会去第二个RDD中寻找key相同的RDD,进行关联,返回新的RDD,新的RDD中元素的value是迭代器。
代码如下:
val dataRDD1 = sc.makeRDD(List(("a",1),("a",2),("d",3)))
val dataRDD2 = sc.makeRDD(List(("a",1),("c",2),("c",3)))
val value = dataRDD1.cogroup(dataRDD2)
value.collect().foreach(println)
结果:
可以看到,对于其他RDD中不存在对应的key,cogroup会产生一个空的迭代器来填充。
Actions(行动算子)
reduce算子
聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。代码如下:
val rdd = sc.makeRDD(List(1,2,3,4))
// 聚合数据
val reduceResult = rdd.reduce(_+_)
println(reduceResult)
上述代码对RDD所有元素进行累加,结果如下:
collect算子
在driver 程序中,以数组的形式返回数据集中的所有元素。代码如下:
val rdd = sc.makeRDD(List(1,2,3,4))
//从Executor端收集数据到Driver端
rdd.collect().foreach(println)
结果为:
count算子
返回RDD中元素的个数。代码:
val rdd = sc.makeRDD(List(1,2,3,4))
val countRDD: Long = rdd.count()
println(countRDD)
结果为:
take算子、takeOrdered算子
take算子返回一个由RDD的前n个元素组成的数组,takeOrdered返回该RDD排序后的前n个元素组成的数据。
代码如下:
val rdd = sc.makeRDD(List(1,3,2,4))
val takeRes = rdd.take(2)
println(takeRes.mkString(","))
val takeOrderedRes = rdd.takeOrdered(2)
println(takeOrderedRes.mkString(","))
结果:
aggregate算子
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。注意,这里的初始值会被多次用到。代码如下:
val rdd = sc.makeRDD(List(1, 2, 3, 4), 8)
// 将该RDD内所有元素相加得到结果
val result: Int = rdd.aggregate(10)(_ + _, _ + _)
println(result)
结果为:
这个100是怎么来的呢?注意这里分了8个分区,其中有4个分区是有数据的,每个分区的值加上初始值10,得到11,12,13,14,10,10,10,10,然后所有分区加起来的时候,还需要加上一个初始值10,故最终结果为10+10+10+10+10+11+12+13+14=100。
countByKey算子
统计每种key的个数,代码如下:
val rdd = sc.makeRDD(List((1,"a"), (1,"b"), (1,"c"), (2,"z"),(3,"c"),(3,"c")))
val res = rdd.countByKey()
println(res)
结果:
RDD 依赖关系
由于RDD是粗粒度的操作数据集,每个Transformation操作都会生成一个新的RDD,所以RDD之间会形成类似流水线的前后依赖关系;RDD和它所依赖的父RDDs的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。下图显示了RDD之间的依赖关系:
从上图可知:
- 窄依赖
是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;(独生子女) - 宽依赖
是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;(超生)
RDD缓存
如果在应用程序中多次使用了同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方使用该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。 RDD缓存RDD通过cache或者Persist方法将之前的结果进行缓存,默认情况下会把数缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算 子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
RDD实例--WordCount
下面介绍一个简单的Spark应用程序实例WordCount,它的目的是统计一个数据集中每个单词出现的次数。首先将从hdfs中加载数据得到原始RDD-0,其中每条记录为数据中的一行句子,经过一个flatMap操作,将一行句子切分为多个独立的词,得到RDD-1,再通过map操作将每个词映射为key-value形式,其中key为词本身,value为初始计数值1,得到RDD-2,将RDD-2中的所有记录归并,统计每个词的计数,得到RDD-3,最后将其保存到hdfs。
具体代码如下:
package rdd.builder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val SparkConnf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(SparkConnf)
//根据文件创建原始RDD
val rdd = sc.textFile("./datas/test.txt")
val rdd1 = rdd.flatMap(line => line.split(" ")) //将每一行根据空格分成单个word
val rdd2 = rdd1.map(word => (word, 1)) //将每个word,转换成(word,1)这样的元祖形式
val rdd3 = rdd2.reduceByKey((x, y) => {x+y}) //使用reduceByKey算子,对相同的key进行累加聚合
rdd3.collect().foreach(println)
}
}
整个过程的执行示意图如下:
执行结果:
WordCount结果
参考
- http://sharkdtu.com/posts/spark-rdd.html
- https://www.cnblogs.com/qingyunzong/p/8899715.html
- https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/rdd-guide.html#id3
- http://dblab.xmu.edu.cn/blog/985-2/
- https://yxnchen.github.io/technique/Spark%E7%AC%94%E8%AE%B0-%E7%8E%A9%E8%BD%ACRDD%E6%93%8D%E4%BD%9C/#RDD%E7%9A%84%E6%8C%81%E4%B9%85%E5%8C%96
- https://spark.apache.org/docs/latest/index.html
- https://www.bilibili.com/video/BV11A411L7CK?p=73