RDD基本操作

2018-07-19  本文已影响0人  wong小尧

弹性分布式数据集(Resilient Distributed Dataset-RDD)
我使用spark比较晚,所以我使用dataframe比较多,听说rdd这块以后spark也停止更新了,但是目前dataframe还是不如rdd灵活,而且spark SQL一些方法不大稳定,有一些rdd的技巧还是要继续使用。
下面是整理的常用操作,一部分来自书里,一部分是自己整理的。

1.创建RDD

两种创建RDD的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。

1.Scala 中的parallelize()方法,直接把程序中一个已有的集合传给这个方法就可以生成一个RDD。
val lines = sc.parallelize(List("pandas","i like pandas"))
val lines = sc.parallelize(Array(1,2,3,4,5,6))
用case class来规范数据类型

case class UserData(userID:String,userName:String,userAddress:String)
val lines = sc.parallelize(
                                      List("1234","pandas","chengdu"),
                                      List("4321","tiger","jiling"),
                                      List("1111","bird","china")
)

2.更常用的方式是从外部存储中读取数据来创建RDD。
val lines = sc.textFile("D://wenben.txt")
3.使用makeRDD来构造(不建议使用)
val rdd01 = sc.makeRDD(List((1,2,null,null),(1,2,3,4),(4,3,null,null)))

2.RDD操作

RDD支持两种操作:转化操作和行动操作
转化操作返回的是RDD,而行动操作返回的是其他的数据类型。

2.1.转化操作(lazy模式):

最常用的转化操作是map()和filter(),inputRDD{1,2,3,4}
inputRDD.map(x => x * x)
Mapped RDD{1,4,9,16}
把列类型转化为字符串,map中通常使用v(0),索引从0开始
inputRDD.map(v => (v(0).toString,v(1).toString))

inputRDD.filter(x => x! = 1)
Filtered RDD{2,3,4}
filter筛选条件为,第一列字符串长度大于10,第二列字符串长度大于5。这里索引从1开始。
inputRDD.filter(v => v._1.length > 10 && v._2.length > 5)

flatMap和map操作

flatMap()相当于把map的作用的数组拆散再合并。

import org.apache.spark.{SparkConf, SparkContext}

object MapAndFlatMap {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("map_flatMap_demo").setMaster("local"))
    val arrayRDD =sc.parallelize(Array("a_b","c_d","e_f"))
    arrayRDD.foreach(println)
/*
结果 :
a_b
c_d
e_f
*/

    arrayRDD.map(string=>{
      string.split("_")
    }).foreach(x=>{
      println(x.mkString(",")) 
    })
/*
结果:
a,b
c,d
e,f
所以map得到的RDD结果是 Array(Array("a","b"),Array("c","d"),Array("e","f"))
*/
    arrayRDD.flatMap(string=>{
      string.split("_")
    }).foreach(x=>{
      println(x.mkString(","))//打印结果3
    })
/*
结果:
a
b
c
d
e
f
所以flatMap得到的RDD结果是Array("a","b","c","d","e","f")
*/
  }
}

1.对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作

函数名 目的 示例 结果
map() 函数应用于rdd中每个元素 rdd.map(x => x+1) {2,3,4,4}
flatMap() 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词,执行扁平化操作。 rdd.flatMap(x => x.to(3)) {1,2,3,2,3,3,3}
filter() 返回一个由通过传给filter()的函数的元素组成的RDD rdd.filter(x => x!=1) {2,3,3}
distinct() 去重 rdd.distinct() {1,2,3}
sample(withReplacement,fraction,[seed]) 对RDD采样,以及是否替换,抽取比例等等 rdd.sample(false,0.5) {1,2,3} 非确定的

sample操作

sample操作有两个参数,第一个参数是代表采样是有放回还是无放回,第二个参数代表抽样的比例。

map和mapPartitions

(1)、使用map(func())遍历
map相当于遍历,遍历1000行的表,就要调用func一千次。

(2)、使用mapPartitions(func())遍历
mapPartition中func()仅仅被调用分区数量的次数,例如10个分区,仅仅调用10次。假如函数内部存在分词词库导入关闭,数据库链接等等,使用map每调用func()一次都要跑一遍这些操作,严重影响性能。
这时候就需要把map改写成mapPartitions
改写方式也很简单:
例如可以用一下map,几乎不做任何操作:

    var data_rdd = df.rdd.map{x => {
      val (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type,stat_date) = (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9))
      (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type)
      }}

改写成mapPatitions需要使用一下迭代器:

   val data_rdd = df.rdd.repartition(200).mapPartitions(iter => for (x <- iter) yield {
      val (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type,stat_date) = (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9))
      (imei,address,categories,tags,parent,place_type,place_name,visit_time,nearby_type)
    })

2.对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作

函数名 目的 示例 结果
union() 生成一个包含两个RDD中所有元素的RDD (不去重的并集) rdd.union(other) {1,2,3,3,4,5}
intersection() 求两个RDD共同的元素的RDD (交集) rdd.intersection(other) {3}
subtract() 求移除一个RDD中的内容 (差集) rdd.subtract(other) {1,2}
cartesian() 笛卡尔积 rdd.cartesian(other) {(1,3),(1,4),...,(3,5)}

3.行动操作:

函数名 目的 示例 结果
collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,3}
count() RDD中元素的个数 rdd.count() 4
countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 从RDD中返回num个元素 rdd.take(2) {1,2}
top(num) 从RDD中返回最前面的num个元素 rdd.top(2) {3,3}
takeOrdered(num) 从RDD中按照提供的顺序返回最前面的num个元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeSampe(withReplacement,num,[seed]) 从RDD中返回任意一些元素 rdd.takeSample(false,1) 非确定的
reduce(func) 并行整合RDD中所有数据 rdd.reduce((x,y) => x+y) 9
fold(zero)(func) 和reduce类似,但是需要提供初始值 rdd.fold(0)((x,y) => x+y) 9
aggregate(zeroValue)(seqOp,comOp) 和reduce相似,但是通常返回不同类型的函数 rdd.aggregate((0,0)) ((x,y)=> (x._1+x,x._2+1) ,(x,y) => (x._1+y._1,x._2+y._2)) (9,4)
foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(func) 无 //和map进行对比,map也是对RDD中的每个元素进行操作,但是允许有返回值

first操作

返回RDD中第一个元素

collect操作

以集合形式返回RDD的元素(常用语小的数据集)
比如某个文件数据

中国
美国
加拿大
……

读取的时候希望把文件中的元素放到列表里

import scala.collection.mutable.ArrayBuffer
val stat_array = new ArrayBuffer[String]()
rdd.collect.foreach(v => (stat_array += v))

take操作

take(num:Int):将RDD作为集合,返回集合中[0,num-1]下标的元素

reduce操作

reduce(f:(T,T)=>T):对RDD中元素进行二元计算,返回计算结果

top操作

top(num:Int):按照默认的或者是指定的排序规则,返回前num个元素

takeOrdered操作

takeOrdered(num:Int):和top相反,返回前num个元素

foreach和map的区别:

foreach是action操作,它没有返回值,通常用于print或者写入文件等操作。
map是transform操作(lazy),生成一个新的rdd

sortBy函数(在org.apache.spark.rdd.RDD类中)

共三个参数:
  第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
  第二个参数是ascending,该参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
  第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等

例如:
data: (item_num, item_id, item_name)
val data_sorted = data.sortBy(_._1, ascending = false) 根据item_num从大到小排列(降序)。

4.键值对操作

4.1.partitionBy,mapValues,flatMapValues

partitionBy,mapValues,flatMapValues和基本转换操作中的repatition,map和flatMap功能类似。
partitionBy接口根据partitioner函数生成新的ShuffledRDD,将原RDD重新分区(在repartition中也是先将RDD[T]转化成RDD[K,V],这里V是null,然后使用RDD[K,V]作为函数生成ShuffledRDD)。mapValues和flatMapValues针对[K,V]中的V值进行map操作和flatMap操作。
使用partitionBy,mapValues,flatMapValues不会破坏原数据的partition的结构&信息,使用repatition,map和flatMap后还需要做Shuffle,数据就不带有原先的partition信息。所以键值对操作尽量使用partitionBy,mapValues,flatMapValues,少用repatition,map和flatMap。



4.2.combineByKey,foldByKey,reduceBykey,groupByKey

四种键值对转换操作都是针对RDD[K,V]本身,不涉及与其他RDD的组合操作,四种操作类型最终都会归结为堆combinByKey的调用。combineByKey接口是将RDD[K,V]转化成返回类型RDD[K,C],这里V类型与C类型可以相同也可以不同。
groupbykey效率很低,尽量使用reducebykey和combinebykey来代替groupbykey

reduceByKey函数

reduceByKey(_+_) 相当于 reduceByKey((a,b)=>(a+b))
reduceByKey(_++_) 相当于 reduceByKey((a,b)=>(a++b))这里a和b 是一个list,++用于合并列表

combineByKey函数

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

其中的参数:

createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
numPartitions:结果RDD分区数,默认保持原有的分区数
partitioner:分区函数,默认为HashPartitioner
mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

举例理解:

假设我们要将一堆的各类水果给榨果汁,并且要求果汁只能是纯的,不能有其他品种的水果。那么我们需要一下几步:

1 定义我们需要什么样的果汁。
2 定义一个榨果汁机,即给定水果,就能给出我们定义的果汁。--相当于hadoop中的local combiner
3 定义一个果汁混合器,即能将相同类型的水果果汁给混合起来。--相当于全局进行combiner

那么对比上述三步,combineByKey的三个函数也就是这三个功能
1 createCombiner就是定义了v如何转换为c
2 mergeValue 就是定义了如何给定一个V将其与原来的C合并成新的C
3 就是定义了如何将相同key下的C给合并成一个C

var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))

rdd1.combineByKey(
(v : Int) => List(v),             --将1 转换成 list(1)
(c : List[Int], v : Int) => v :: c,       --将list(1)和2进行组合从而转换成list(1,2)
(c1 : List[Int], c2 : List[Int]) => c1 ::: c2  --将全局相同的key的value进行组合
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

简要介绍
def combineByKey[C](createCombiner: (V) => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RD
createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就
和之前的某个元素的键相同。如果这是一个新的元素, combineByKey() 会使用一个叫作 createCombiner() 的函数来创建
那个键对应的累加器的初始值
mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更
多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各
个分区的结果进行合并。

创建一个学生成绩说明的类
case class ScoreDetail(studentName: String, subject: String, score: Float)
下面是一些测试数据,加载测试数据集合 key = Students name and value = ScoreDetail instance

    val scores = List(
      ScoreDetail("xiaoming", "Math", 98),
      ScoreDetail("xiaoming", "English", 88),
      ScoreDetail("wangwu", "Math", 75),
      ScoreDetail("wangwu", "English", 78),
      ScoreDetail("lihua", "Math", 90),
      ScoreDetail("lihua", "English", 80),
      ScoreDetail("zhangsan", "Math", 91),
      ScoreDetail("zhangsan", "English", 80))

换成二元组, 也可以理解成转换成一个map, 利用了for 和 yield的组合

val scoresWithKey = for { i <- scores } yield (i.studentName, i)
val scoresWithKeyRDD = sc.parallelize(scoresWithKey).partitionBy(new org.apache.spark.HashPartitioner(3)).cache

聚合求平均值让后打印

      val avgScoresRDD = scoresWithKeyRDD.combineByKey(
      (x: ScoreDetail) => (x.score, 1)                     /*createCombiner*/,
      (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) /*mergeValue*/,
      (acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) /*mergeCombiners*/
      // calculate the average
    ).map( { case(key, value) => (key, value._1/value._2) })
 
    avgScoresRDD.collect.foreach(println)
/*输出:
(zhangsan,85.5)
(lihua,85.0)
(xiaoming,93.0)
(wangwu,76.5)
*/

解释一下scoresWithKeyRDD.combineByKey
createCombiner: (x: ScoreDetail) => (x.score, 1)
这是第一次遇到zhangsan,创建一个函数,把map中的value转成另外一个类型 ,这里是把(zhangsan,(ScoreDetail类))转换成(zhangsan,(91,1))
mergeValue: (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1) 再次碰到张三, 就把这两个合并, 这里是将(zhangsan,(91,1)) 这种类型 和 (zhangsan,(ScoreDetail类))这种类型合并,合并成了(zhangsan,(171,2))
mergeCombiners (acc1: (Float, Int), acc2: (Float, Int)) 这个是将多个分区中的zhangsan的数据进行合并, 我们这里zhansan在同一个分区,这个地方就没有用上


contRdd.combineByKey(
(score:(String,Long)) => Map(score._1 -> score._2),
(c:Map[String,Long],score) => (c ++ Map(score._1 -> score._2)),
(c1:Map[String,Long],c2:Map[String,Long]) => (c1 ++ c2) )

topk问题:
https://www.twblogs.net/a/5c602891bd9eee06ef371458/zh-cn

5 控制操作

5.1. cache,persist
cache底层是调用了persist。
5.2. checkpoint

6 例子:

6.1 join或者其他消耗较大的处理前先进行聚合操作

join通常是你在使用Spark时最昂贵的操作,需要在join之前应尽可能的先缩小你的数据。

假设,你有一个RDD存着(熊猫id,分数),另外一个RDD存着(熊猫id,邮箱地址)。若你想给每只可爱的熊猫的邮箱发送她所得的最高的分数,你可以将RDD根据id进行join,然后计算最高的分数,如下:

def joinScoresWithAddress1( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
    val joinedRDD = scoreRDD.join(addressRDD)
    joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}

然而,这可能不会比先减少分数数据的方案快。先计算最高的分数,那么每个熊猫的分数数据就只有一行,接下来再join地址数据:

def joinScoresWithAddress2( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
    val bestScoreData = scoreRDD.reduceByKey((x, y) => if(x > y) x else y)
    bestScoreData.join(addressRDD)
}

若每个熊猫有1000个不同的分数,那么这种做法的shuffle量就比上一种的小了1000倍。

Reference:

https://blog.csdn.net/weixin_42181200/article/details/81201864
【Spark快速大数据分析】
【Spark SQL入门与实践指南】

上一篇下一篇

猜你喜欢

热点阅读