大数据学习

Spark之Rdd

2020-02-17  本文已影响0人  TZX_0710

注:以下代码scala版本采用 2.11.12 spark的版本采用spark-2.4.5-bin-hadoop2.7
Rdd(Resilent Distributed Dataset)分布式数据集,在Spark当中充当基本单元的称呼,在多台机器上运行可以理解为是一个分布式的数组。Spark的核心是RDD,但RDD是个抽象类,具体实现由子类实现。

RDD创建方式

  • 从Hadoop文件系统、比如Hive、Hbase输入创建
  • 从父Rdd 转换得到新的RDD
  • 通过parallelize 或makeRDD将单机数据创建为分布式RDD

RDD的两种操作算子:1.Transformation(转换) 变换算子,这种操作不触发提交作业2.Action(行动) 触发SparkContext提交Job作业
RDD的重要内部属性 1.分区列表 2.计算每个分片的函数 3.对父RDD的依赖性 4.对key-value pair数据的分区器 ,控制分区策略和分区数 5.每个分区的地址列表

RDD常用算子整理

Transformation操作算子

如下文章示例用到的初始化数据

val sparkConf=new SparkConf().setAppName("spark api").setMaster("local")
val  sc=new SparkContext(sparkConf)
val data=Array(1,2,3,45,6)
val initRdd = sc.parallelize(data,3)
val array1=Array(2,3,4,5,6)
val array2=Array(7,8,9,6,4)
val rdd1 = sc.parallelize(array1)
val rdd2 = sc.parallelize(array2)
 val  keyV=sc.parallelize(Seq(("key1", "value1"), ("key2", "value2"), ("key3", "value3")))
  1. map算子 通过自定义函数,分别使每个数据通过func形成一个新的RDD
val mapRdd = initRdd.map(x=>x+1)
  1. flatMap 数据扁平化 通过自定义函数对RDD中的每个元素进行转换成一个新的元>素将生成的RDD存放到一个集合当中
val flatMapRdd = data.flatMap(x=> x+"" )
输出结果: 1  2   3   4   5   6
val wordsArray=Array("a b","c d 12 44","e aa f","g h","j kk l")
val frdd= sc.parallelize(wordsArray)
val flatRDD = frdd.flatMap(x=>x.split(" "))
flatRDD.collect().foreach(x=>{
   print(x+">>")
  })
执行操作:根据空格进行分割,分割完成之后每一个数据是一个单独的元素,
a和b进行>分割之后变成了 a 是一个数据 b 也是一个 数据
输出结果:a>>b>>c>>d>>12>>44>>e>>aa>>f>>g>>h>>j>>kk>>l>>
  1. mapPartions 获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个>分区的元素进行操作
//parallelize使用创建RDD的时候使用了3块
//分区去存储数据 mapPartitions迭代了三块分区  分区里面在进行迭代 分区当中的>数据
//此处 迭代分区 然后对分区当中数据进行一个过滤操作 只需要大于2的数据保留
//x是分区的迭代器 采用分区的迭代去过滤只需要大于2的数据
//注:mapPartions返回的跟操作的rdd类型一致
val mapPardd=initRdd.mapPartitions(x=>{
    x.filter(x=>x>2)
   }).collect().foreach(println(_))
  1. union 合并2个rdd,2个rdd的元素类型必须一致,不进行去重操作,如果需要去重操作可以采用distinct
   val rdd3 = rdd1.union(rdd2)
  rdd3.collect().foreach(x=>print(x+">"))
  输出结果:2>3>4>5>6>7>8>9>6>4>
   val rdd4 = rdd3.distinct()
   rdd4.collect().foreach(x=>print(x+">"))
   输出结果:4>6>8>2>3>7>9>5>
  1. cartesin对2个rdd内的所有元素实现笛卡尔积元素操作,内部实现CartesianRDD
val dkejRdd = rdd1.cartesian(rdd2)
(2,7)(2,8)(2,9)(2,6)(2,4)(3,7)(3,8)(3,9)
(3,6)(3,4)(4,7)(4,8)(4,9)(4,6)(4,4)(5,7)
(5,8)(5,9)(5,6)(5,4)(6,7)(6,8)(6,9)(6,6)(6,4)
  1. groupByKey 根据key 进行分组,key相同的元素为一组数据
val  rdd = Array(("a",1),("b",1),("c",1),("a",1),("b",1))
val groupByKey = sc.parallelize(rdd).groupByKey()
groupByKey.collect().foreach(println(_))
(a,CompactBuffer(1, 1))
(b,CompactBuffer(1, 1))
(c,CompactBuffer(1))
  1. filter 对RDD元素进行过滤 最后只保留符合条件的元素
val filterRdd = initData.filter(x=>x>3)
filterRdd.collect().foreach(println(_))

8.sample(withReplacement, fraction, seed)
对RDD数据进行取样
withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复
fraction:Double, 在0~1之间的一个浮点值,表示要采样的记录在全体记录中的比例
seed:随机数种子 一般只需要前2个参数 第三个参数用于程序中的调试

initData.sample(false,0.5).collect().foreach(println(_))
输出结果为 1 2 45  
//把抽出样本是否放回设置为True比率设置为0.9  输出结果为 1 2 2 6  
initData.sample(true,2).coolect().foreach(println(_))
//表示每个元素的期望被抽样次数为2次
  1. cache 将RDD元素从磁盘中缓存到内存相当于persist(MEMORY_ONLY)函数的功能
D盘下面新建一个hello.txt文件 里面填充数据   不加cache的输出形式
   val textFileRDD = sc.textFile("D:\\hello.txt")
   val cacheRdd = textFileRDD.map(x =>{println("cacheRDD")})//.cache()
   cacheRdd.filter(x=>x.toString.startsWith("a")).collect().foreach(println(_))
   cacheRdd.filter(x=>x.toString.startsWith("b")).collect().foreach(println(_))

输出结果



加了cache之 后的效果我们可以看到只打印了一次cacheRDD 在第二次使用的时候没有再打印cacheRDD


  1. persist 对RDD进行缓存操作 具体存放可以通过StorageLevel自行选择存放内存或者磁盘,或同时存放内存和磁盘上
 val textFileRDD = sc.textFile("D:\\hello.txt")
  val cacheRdd = textFileRDD.map(x =>{
    println("cacheRDD")
     x
   }).persist(StorageLevel.MEMORY_ONLY) 
  cacheRdd.filter(x=>x.toString.startsWith("a")).collect().foreach(println(_))
  cacheRdd.filter(x=>x.toString.startsWith("b")).collect().foreach(println(_))
  1. mapValues 对key-value的RDD进行map操作,不对key进行处理。和map的差异在于map可以对key和value进行修改等处理,mapValues只能对value参数处理
keyV.mapValues(e=>e+"test").collect().foreach(println(_))
输出结果:(key1,value1test)
(key2,value2test)
(key3,value3test)
  1. combineByKey(createCombiner,mergeValue,mergeComiers)用于聚合求平均值
    createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加>操作(类型转换)并把它返回 (这一步类似于初始化操作)
    mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内>进行)
    mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)
example1.求学生的平均城级
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), 
("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
   val d1 = sc.parallelize(initialScores)
   type mvcType=Double
   type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数)
   val  mapValues= d1.combineByKey(
     score => (1, score),//1表示当前科目的数量为1  score表示当前的分数也就是88.0
     (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore), //此处表示其它科目的数量+1,分数累加 合并科目的数量和值
     (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)//数据可能存在多个分区所以最后一步是合并分区的数据
   ).map(x=>(x._1,x._2._2/x._2._1)).collect().foreach(println(_))
输出打印:(Wilma,95.33333333333333)(Fred,91.33333333333333)
example2. 求每个学生的总成绩
val students=Array(("Nike",120),("Nike",32),("Lucy",10))
value.combineByKey(x=>x,
     (c1:Int,newScore)=>newScore+c1,
     (b1:Int,b2:Int)=>b1+b2
   ).collect().foreach(println(_))
/*注:score=>(1,score)把分数作为参数传入进去,并且附加了元组类型。
(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore)在同一个分区又碰到了key相同的数据,我们把数量+1,参数值进行累加
 (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)把
所有的分区的数据进行累加起来,因为计算的数据可能会存在其他分区,相当于分区数据的汇总*/
  1. reduceByKey合并相同的键的值 形成一个新的rdd元素
val rdd3 =sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
   rdd3.reduceByKey((x,y)=>x+y).collect().foreach(println(_))
输出:(B,3)
       (A,3)
       (C,1)
  1. join 是连接操作,将输入数据集(K,V)和另外一个数据集(K,W)进行Join, 得到(K, (V,W));该操作是对于相同K的V和W集合进行笛卡尔积 操作,也即V和W的所有组
 val x = sc.parallelize(Array(("a", 1), ("b", 4)))
 val  y = sc.parallelize(Array(("a", 2), ("a", 3)))
   x.join(y).collect().foreach(println(_))
输出结果:(a,(1,2))  (a,(1,3))

Action 算子

  1. foreach foreach对每个元素都应用。对RDD中的每个元素都应用f函数操作,不返回RDD和Array
initRdd.collect().foreach(println(_))
  1. saveAsTextFile 函数将数据输出,存储到HDFS目录。
x.join(y).saveAsTextFile("/usr/local")
  1. collect 将分布式的RDD返回为一个单机的sacla array数组。
  2. count 返回整个RDD的元素
x.join(y).count()
上一篇下一篇

猜你喜欢

热点阅读