spark

Spark从入门到放弃—RDD

2021-02-25  本文已影响0人  HaloZhang

简介

Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

本文主要介绍Spark框架中最为重要的基础数据处理模型RDD,包括它的基本概念、属性、创建、transformation算子、action算子等。

RDD是什么

RDD全称为Resilient Distributed Dataset,中文译为弹性分布式数据集,是Spark中最基础,同时也是最为重要的数据处理模型。它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD也即分布式对象集合,是一个只读的分区记录集合,每个RDD可以被划分为多个分区,每个分区就是数据集的一部分,同时不同分区可以存储在集群中不同的节点上,从而利用集群节点优势进行并行计算。

RDD的属性

RDD具有数据流模型的特点,包括自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。RDD主要包含以下5个属性:

RDD的执行原理和过程

从计算的角度来讲,数据处理过程中需要计算资源和计算模型。执行时,需要将计算资源和计算模型进行协调和整合。Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上,按照执行的计算模型进行数据计算,最终得到结果。

RDD是Spark框架中用于数据处理的核心模型,它采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的“Action”操作,对于“Action”操作之前的所有“Transformation”操作,Spark只是记录下“Transformation”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而并不会触发真正的计算。RDD的执行过程如下: RDD执行过程
下面以一个例子来说明RDD的完整执行流程,其中T代表“Transformation”,A代表“Action”,如下图。
RDD执行过程的一个实例

在上图中,从输入中逻辑上生成A和C两个RDD,经过一系列“转换”操作,逻辑上生成了F(也是一个RDD),之所以说是逻辑上,是因为这时候计算并没有发生,Spark只是记录了RDD之间的生成和依赖关系。当F要进行输出时,也就是当F进行“Action”操作的时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算。

上述这一系列处理称为一个“血缘关系(Lineage)”,即DAG拓扑排序的结果。采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化(pipeline),避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间数据,因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理。同时,这种通过血缘关系把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑。

RDD的创建

Spark中创建RDD的方式主要有4种:

1. 从集合(内存)中创建RDD

从集合中创建RDD,Spark主要提供了两个方法,parallelize和makeRDD,代码如下:

val rdd1 = sc.parallelize(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(6,7,8,9))
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
输出结果:

注意,上述代码是在Spark中自带的shell工具中执行的,可以先在Spark官网下载spark安装包,然后执行/spark-3.0.1-bin-hadoop2.7/bin/spark-shell 启动终端工具。

2. 从外部存储(文件)中创建RDD

其中外部存储系统的数据集创建RDD包括:本地的文件系统、所有Hadoop支持的数据集,比如HDFS、HBase等。

val rdd = sc.textFile("/Volumes/Study/Scala/Codes/ScalaTest/datas/1.txt")
rdd.collect().foreach(println)
结果如下:

3. 从其他RDD创建

主要是通过一个RDD运算完后,再产生新的RDD,以上述产生的RDD为例,对齐施加一个map操作,在每一行的末尾加上一个“!”。代码如下:

val rdd1 = rdd.map(item => {item + "!"})
rdd1.collect().foreach(println)
结果如下:

4. 直接创建 RDD(new)

使用new的方式直接构造RDD,一般由Spark框架本身使用,这里就不举例子了。

RDD的操作(算子)

RDD的操作算子包括两类,一类叫做Transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做Actions,它是用来触发RDD的计算,得到RDD相关结算结果或者将RDD保存到文件系统中去。下面分别来介绍:

Transformations(转换算子)

map算子

对处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也是可以数值的转化,然后返回一个新的RDD。
下面的例子中的map算子的作用是把RDD中的所有元素都乘以2。

val rdd = sc.makeRDD(List(1,2,3,4))
val mapRdd = rdd.map((num:Int)=>{num*2})
mapRdd.collect().foreach(println)
结果:

mapPartitions算子

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
下面的例子是要筛选出RDD中数值为2的元素

val rdd = sc.makeRDD(List(1,2,3,4))
val mapRdd = rdd.mapPartitions(datas => {
    datas.filter(_==2) }
)
mapRdd.collect().foreach(println)
结果:

map与mapPartitions从以下3个方面的区别:

mapPartitionsWithIndex算子

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
下面的例子在创建RDD的时候,显式指定只包含2个分区,然后返回指定分区的迭代器。代码如下:

val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
val mpiRDD = rdd.mapPartitionsWithIndex(
     (index, iter) => {
            if ( index == 1 ) {
                iter
            } else {
                Nil.iterator
            }
        }
    )
mpiRDD.collect().foreach(println)
结果:

flatMap算子

将处理的数据进行扁平化后再进行映射处理,所以该算子也称之为扁平映射。例子如下:

val rdd: RDD[List[Int]] = sc.makeRDD(List(
    List(1, 2), List(3, 4)
))
val flatRDD: RDD[Int] = rdd.flatMap(
    list => {
        list
    }
)
rdd.collect().foreach(println)
flatRDD.collect().foreach(println)
将原先两个列表铺平成一个列表,结果如下:

groupBy算子

将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中。

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组。

下面的例子是根据元素的首字母对RDD进行分组

val rdd  = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop", "Ice"), 2)
val groupRDD = rdd.groupBy(_.charAt(0))
groupRDD.collect().foreach(println)
结果:

可以看到原先的元素被按照首字母区别被分为了三组。

filter算子

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产条件下,可能会出现数据倾斜。
下面的例子是对RDD中的数据根据奇偶性来进行过滤,代码如下:

val rdd = sc.makeRDD(List(1,2,3,4))
val filterRDD: RDD[Int] = rdd.filter(num => num%2!=0)
filterRDD.collect().foreach(println)
结果如下:

sample算子

根据指定的规则从数据集中抽取数据,此算子包含3个参数,含义分别如下:

val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
println(rdd.sample(
false,
0.5,
).collect().mkString(","))

println(rdd.sample(
true,
0.4,
).collect().mkString(","))
两种抽取方式的结果如下:

distinct算子

对数据进行去重。

val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))
val rdd1 = rdd.distinct()
rdd1.collect().foreach(println)
结果:

coalesce算子

根据参数来缩减分区,用于大数据集过滤后,提高小数据集的执行效率。当Spark程序中,存在过多的小人物的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本。但是该方法默认情况下不会将分区的数据打乱重新组合,这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜。如果想让数据均衡,可以进行shuffle处理。

val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3) //默认使用3个分区
val newRDD = rdd.coalesce(2,true) //将shuffle操作打开
newRDD.saveAsTextFile("./") // 分区的情况可以通过保存的文件数量来判断
结果:

可以看到目前变成了2个分区。

repartition算子

该操作内部其实执行的是coalesce操作,参数shuffle的默认值是true。无论是将分区多的RDD转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。

val dataRDD = sc.makeRDD(List( 1,2,3,4,1,2), 2)
val dataRDD1 = dataRDD.repartition(4)
dataRDD1.saveAsTextFile("./") // 分区的情况可以通过保存的文件数量来判断
结果:

可以看到被分为了4个分区

sortBy算子

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD分区数与原RDD分区数一致,这个过程中存在shuffle过程。

val dataRDD = sc.makeRDD(List(1,2,3,4,1,9), 2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
dataRDD1.collect().foreach(println)
排序结果为:

下面展示经过f函数处理之后,再进行排序操作:

val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
val newRDD = rdd.sortBy(t=>t._1.toInt, false)
newRDD.collect().foreach(println)

可以看到此时的sortBy函数是先对每个元素进行处理之后,再进行排序的。

intersection算子、union算子、subtract算子、zip算子

下面以一个综合的例子来说明:

val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))

// 交集 : 【3,4】
val rdd3 = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))
// 并集 : 【1,2,3,4,3,4,5,6】
val rdd4 = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集 : 【1,2】
val rdd5 = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// 拉链 : 【1-3,2-4,3-5,4-6】
val rdd6 = rdd1.zip(rdd2)
val rdd8 = rdd1.zip(rdd7)
println(rdd6.collect().mkString(","))

reduceByKey算子、groupByKey算子

reduceByKey可以将数据按照相同的 Key 对 Value 进行聚合。
下面的例子是对Key相同的元素,将Value进行相加,代码如下:

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",5))) 
val dataRDD2 = dataRDD1.reduceByKey(_+_)
dataRDD2.collect().foreach(println)
结果为:

groupByKey将数据源的数据根据 key 对 value 进行分组。

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",5))) 
val dataRDD2 = dataRDD1.groupByKey()
dataRDD2.collect().foreach(println)
结果:

reduceByKey算子和groupByKey算子的区别:

Spark中涉及到shuffle操作,必须在磁盘中进行,不能在内存中进行,因为会等待,内存可能会溢出。GroupByKey需要罗盘,但是没有预聚合,故要罗盘的数据需要多一些,而reduceByKey可以先预聚合,故需要存到磁盘的数据相对较小一点,故效率较高。

aggregateByKey算子

之前提到的reduceByKey算子和GroupByKey算子在分区内以及分区间都只能执行相同的规则。如果在分区内和分区间需要应用不同的规则,应该怎么办呢?aggregateByKey算子可以将数据根据不同的规则进行分区内计算和分区间计算。aggregateByKey算子存在函数柯里化,即它有两个参数列表,第一个参数列表需要传递一个参数,表示为初始值,目的是当碰见第一个Key的时候,和对应的Value进行分区内计算。第二个参数列表主要传递两个参数,第一个参数表示分区内计算规则;第二个参数表示分区间计算规则。
下面的例子的功能是,先取同一个分区内的最大值,然后将不同分区取出的最大值进行相加。代码如下:

val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
rdd.aggregateByKey(0)((x,y) => math.max(x,y), (x,y) => x + y).collect().foreach(println)
结果如下:

另外一个例子功能是,计算所有相同key的数据的平均值。即,我们需要统计所有key出现的次数,并且将相同key的value累加起来,再除以出现的次数,代码如下:

val rdd = sc.makeRDD(List(("a",1), ("a",2), ("b",3),
                                            ("b",4), ("b", 5), ("a", 6)), 2)
val newRDD = rdd.aggregateByKey( (0,0) )(//初始值tuple的第一个元素代表累加和,第二个代表出现的次数
(t, v) => {(t._1 + v, t._2 + 1)}, //分区内的计算,第一个代表初始值t,第二个代表元素中的value
(t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2)}
)
val rdd = sc.makeRDD(List(("a",1), ("a",2), ("b",3), ("b",4), ("b", 5), ("a", 6)), 2)
val resRDD = newRDD.mapValues {
   case (num, cnt) => {num/cnt}
}
resRDD.collect().foreach(println)
结果:

foldByKey算子

当分区内计算规则和分区间计算规则相同时,aggregateByKey算子就可以简化为foldByKey算子。

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("a",3),("b",4)), 4) 
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
dataRDD2.collect().foreach(println)
结果:

combineByKey算子

最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。combineByKey方法需要三个参数,第一个参数表示将相同key的第一个数据进行结构的转换,实现相应操作;第二个参数表示分区内的计算规则;第三个参数表示分区间的计算规则。
还是以上述的计算相同key的平均值的例子来说明,看一下使用combineByKey如何来实现相同的功能,代码如下:

val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3),
    ("b", 4), ("b", 5), ("a", 6)
),2)

val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey(
    v => (v, 1),
    ( t:(Int, Int), v ) => {
        (t._1 + v, t._2 + 1)
    },
    (t1:(Int, Int), t2:(Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
    }
)

val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
    case (num, cnt) => {
        num / cnt
    }
}
resultRDD.collect().foreach(println)
结果是一致的,如下:

下面比较一下reduceByKey、foldByKey、aggregateByKey、combineByKey的区别:

  • reduceByKey
    相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同。
  • FoldByKey
    相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同。
  • AggregateByKey
    相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规 则可以不相同,分别用3个参数来指定不同的规则。
  • CombineByKey
    当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

sortByKey算子

在一个(K,V)结构的RDD上调用,K必须实现Ordered接口特质,返回一个按照key进行排序的RDD,只有一个参数用来指定是升序还是降序。

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3))) 
val sortRDD1 = dataRDD1.sortByKey(true)
val sortRDD2  = dataRDD1.sortByKey(false)
sortRDD1.collect().foreach(println)
sortRDD2.collect().foreach(println)
结果:

join算子

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一个的(K,(V,W))的RDD。即,两个不同数据源的数据,相同的key的value会连接在一起,形成元祖。一般有以下两种情况:
如果两个数据源中的key没有匹配上,那么数据不会出现在结果中。如果两个数据源中key有多个相同的,会依次匹配,因此可能会出现笛卡尔乘积,数据会呈几何性增长,导致性能降低。
代码如下:

val rdd = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) 
rdd.join(rdd1).collect().foreach(println)
结果为:

如果出现了多个key相同,那么会分别匹配,一个例子如下:

val rdd = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"),(3,"s")))
val rdd1 = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) 
rdd.join(rdd1).collect().foreach(println)
结果:

可以看到key为3的元素,在join之后,出现了多个结果。

leftOuterJoin算子、rightOuterJoin算子

类似于SQL语句的左外连接,右外连接。左外和右外的区别就是以谁为主RDD而已。
代码:

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("d",3))) 
val dataRDD2 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd = dataRDD1.leftOuterJoin(dataRDD2)
rdd.collect().foreach(println)
结果:

cogroup算子

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD。相当于是两个RDD先在本身内根据key进行关联,将相同key的value放在一个Iterable,然后第一个RDD 会去第二个RDD中寻找key相同的RDD,进行关联,返回新的RDD,新的RDD中元素的value是迭代器。
代码如下:

val dataRDD1 = sc.makeRDD(List(("a",1),("a",2),("d",3)))
val dataRDD2 = sc.makeRDD(List(("a",1),("c",2),("c",3)))
val value = dataRDD1.cogroup(dataRDD2)
value.collect().foreach(println)
结果:

可以看到,对于其他RDD中不存在对应的key,cogroup会产生一个空的迭代器来填充。

Actions(行动算子)

reduce算子

聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。代码如下:

val rdd = sc.makeRDD(List(1,2,3,4))
// 聚合数据
val reduceResult = rdd.reduce(_+_)
println(reduceResult)
上述代码对RDD所有元素进行累加,结果如下:

collect算子

在driver 程序中,以数组的形式返回数据集中的所有元素。代码如下:

val rdd = sc.makeRDD(List(1,2,3,4))
//从Executor端收集数据到Driver端
rdd.collect().foreach(println) 
结果为:

count算子

返回RDD中元素的个数。代码:

val rdd = sc.makeRDD(List(1,2,3,4))
val countRDD: Long = rdd.count()
println(countRDD)
结果为:

take算子、takeOrdered算子

take算子返回一个由RDD的前n个元素组成的数组,takeOrdered返回该RDD排序后的前n个元素组成的数据。
代码如下:

val rdd = sc.makeRDD(List(1,3,2,4))
val takeRes = rdd.take(2)
println(takeRes.mkString(","))
val takeOrderedRes = rdd.takeOrdered(2)
println(takeOrderedRes.mkString(","))
结果:

aggregate算子

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。注意,这里的初始值会被多次用到。代码如下:

val rdd = sc.makeRDD(List(1, 2, 3, 4), 8)
// 将该RDD内所有元素相加得到结果
val result: Int = rdd.aggregate(10)(_ + _, _ + _)
println(result)
结果为:

这个100是怎么来的呢?注意这里分了8个分区,其中有4个分区是有数据的,每个分区的值加上初始值10,得到11,12,13,14,10,10,10,10,然后所有分区加起来的时候,还需要加上一个初始值10,故最终结果为10+10+10+10+10+11+12+13+14=100。

countByKey算子

统计每种key的个数,代码如下:

val rdd = sc.makeRDD(List((1,"a"), (1,"b"), (1,"c"), (2,"z"),(3,"c"),(3,"c")))
val res = rdd.countByKey()
println(res)
结果:

RDD 依赖关系

由于RDD是粗粒度的操作数据集,每个Transformation操作都会生成一个新的RDD,所以RDD之间会形成类似流水线的前后依赖关系;RDD和它所依赖的父RDDs的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。下图显示了RDD之间的依赖关系:



从上图可知:

通过RDDs之间你的这种依赖关系,一个任务流可以被描述为DAG(有向无环图),如下图所示,在实际执行过程中宽依赖对应于Shuffle(图中的reduceByKey和join),窄依赖中的所有转换操作可以通过类似于管道的方式一气呵成执行(图中map和union可以一起执行)。 RDD执行过程

RDD缓存

如果在应用程序中多次使用了同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方使用该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。 RDD缓存

RDD通过cache或者Persist方法将之前的结果进行缓存,默认情况下会把数缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算 子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

RDD实例--WordCount

下面介绍一个简单的Spark应用程序实例WordCount,它的目的是统计一个数据集中每个单词出现的次数。首先将从hdfs中加载数据得到原始RDD-0,其中每条记录为数据中的一行句子,经过一个flatMap操作,将一行句子切分为多个独立的词,得到RDD-1,再通过map操作将每个词映射为key-value形式,其中key为词本身,value为初始计数值1,得到RDD-2,将RDD-2中的所有记录归并,统计每个词的计数,得到RDD-3,最后将其保存到hdfs。

为了简单起见,我们先创建一个txt文件,包含4行,内容如下: test.txt
具体代码如下:
package rdd.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
    def main(args: Array[String]): Unit = {
        val SparkConnf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
        val sc = new SparkContext(SparkConnf)

        //根据文件创建原始RDD
        val rdd = sc.textFile("./datas/test.txt")
        val rdd1 = rdd.flatMap(line => line.split(" ")) //将每一行根据空格分成单个word
        val rdd2 = rdd1.map(word => (word, 1)) //将每个word,转换成(word,1)这样的元祖形式
        val rdd3 = rdd2.reduceByKey((x, y) => {x+y}) //使用reduceByKey算子,对相同的key进行累加聚合

        rdd3.collect().foreach(println)
    }
}
整个过程的执行示意图如下: 执行结果: WordCount结果

参考

上一篇下一篇

猜你喜欢

热点阅读