Spark之Rdd
2020-02-17 本文已影响0人
TZX_0710
注:以下代码scala版本采用
2.11.12
spark的版本采用spark-2.4.5-bin-hadoop2.7
Rdd(Resilent Distributed Dataset)分布式数据集,在Spark当中充当基本单元的称呼,在多台机器上运行可以理解为是一个分布式的数组。Spark的核心是RDD,但RDD是个抽象类,具体实现由子类实现。RDD创建方式
- 从Hadoop文件系统、比如Hive、Hbase输入创建
- 从父Rdd 转换得到新的RDD
- 通过parallelize 或makeRDD将单机数据创建为分布式RDD
RDD的两种操作算子:1.Transformation(转换) 变换算子,这种操作不触发提交作业2.Action(行动) 触发SparkContext提交Job作业
RDD的重要内部属性 1.分区列表 2.计算每个分片的函数 3.对父RDD的依赖性 4.对key-value pair数据的分区器 ,控制分区策略和分区数 5.每个分区的地址列表
RDD常用算子整理
Transformation操作算子
如下文章示例用到的初始化数据
val sparkConf=new SparkConf().setAppName("spark api").setMaster("local") val sc=new SparkContext(sparkConf) val data=Array(1,2,3,45,6) val initRdd = sc.parallelize(data,3) val array1=Array(2,3,4,5,6) val array2=Array(7,8,9,6,4) val rdd1 = sc.parallelize(array1) val rdd2 = sc.parallelize(array2) val keyV=sc.parallelize(Seq(("key1", "value1"), ("key2", "value2"), ("key3", "value3")))
- map算子 通过自定义函数,分别使每个数据通过func形成一个新的RDD
val mapRdd = initRdd.map(x=>x+1)
- flatMap 数据扁平化 通过自定义函数对RDD中的每个元素进行转换成一个新的元>素将生成的RDD存放到一个集合当中
val flatMapRdd = data.flatMap(x=> x+"" ) 输出结果: 1 2 3 4 5 6 val wordsArray=Array("a b","c d 12 44","e aa f","g h","j kk l") val frdd= sc.parallelize(wordsArray) val flatRDD = frdd.flatMap(x=>x.split(" ")) flatRDD.collect().foreach(x=>{ print(x+">>") }) 执行操作:根据空格进行分割,分割完成之后每一个数据是一个单独的元素, a和b进行>分割之后变成了 a 是一个数据 b 也是一个 数据 输出结果:a>>b>>c>>d>>12>>44>>e>>aa>>f>>g>>h>>j>>kk>>l>>
- mapPartions 获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个>分区的元素进行操作
//parallelize使用创建RDD的时候使用了3块 //分区去存储数据 mapPartitions迭代了三块分区 分区里面在进行迭代 分区当中的>数据 //此处 迭代分区 然后对分区当中数据进行一个过滤操作 只需要大于2的数据保留 //x是分区的迭代器 采用分区的迭代去过滤只需要大于2的数据 //注:mapPartions返回的跟操作的rdd类型一致 val mapPardd=initRdd.mapPartitions(x=>{ x.filter(x=>x>2) }).collect().foreach(println(_))
- union 合并2个rdd,2个rdd的元素类型必须一致,不进行去重操作,如果需要去重操作可以采用distinct
val rdd3 = rdd1.union(rdd2) rdd3.collect().foreach(x=>print(x+">")) 输出结果:2>3>4>5>6>7>8>9>6>4> val rdd4 = rdd3.distinct() rdd4.collect().foreach(x=>print(x+">")) 输出结果:4>6>8>2>3>7>9>5>
- cartesin对2个rdd内的所有元素实现笛卡尔积元素操作,内部实现CartesianRDD
val dkejRdd = rdd1.cartesian(rdd2) (2,7)(2,8)(2,9)(2,6)(2,4)(3,7)(3,8)(3,9) (3,6)(3,4)(4,7)(4,8)(4,9)(4,6)(4,4)(5,7) (5,8)(5,9)(5,6)(5,4)(6,7)(6,8)(6,9)(6,6)(6,4)
- groupByKey 根据key 进行分组,key相同的元素为一组数据
val rdd = Array(("a",1),("b",1),("c",1),("a",1),("b",1)) val groupByKey = sc.parallelize(rdd).groupByKey() groupByKey.collect().foreach(println(_)) (a,CompactBuffer(1, 1)) (b,CompactBuffer(1, 1)) (c,CompactBuffer(1))
- filter 对RDD元素进行过滤 最后只保留符合条件的元素
val filterRdd = initData.filter(x=>x>3) filterRdd.collect().foreach(println(_))
8.sample(withReplacement, fraction, seed)
对RDD数据进行取样
withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复
fraction:Double, 在0~1之间的一个浮点值,表示要采样的记录在全体记录中的比例
seed:随机数种子 一般只需要前2个参数 第三个参数用于程序中的调试initData.sample(false,0.5).collect().foreach(println(_)) 输出结果为 1 2 45 //把抽出样本是否放回设置为True比率设置为0.9 输出结果为 1 2 2 6 initData.sample(true,2).coolect().foreach(println(_)) //表示每个元素的期望被抽样次数为2次
- cache 将RDD元素从磁盘中缓存到内存相当于persist(MEMORY_ONLY)函数的功能
D盘下面新建一个hello.txt文件 里面填充数据 不加cache的输出形式 val textFileRDD = sc.textFile("D:\\hello.txt") val cacheRdd = textFileRDD.map(x =>{println("cacheRDD")})//.cache() cacheRdd.filter(x=>x.toString.startsWith("a")).collect().foreach(println(_)) cacheRdd.filter(x=>x.toString.startsWith("b")).collect().foreach(println(_))
输出结果
加了cache之 后的效果我们可以看到只打印了一次cacheRDD 在第二次使用的时候没有再打印cacheRDD
- persist 对RDD进行缓存操作 具体存放可以通过StorageLevel自行选择存放内存或者磁盘,或同时存放内存和磁盘上
val textFileRDD = sc.textFile("D:\\hello.txt") val cacheRdd = textFileRDD.map(x =>{ println("cacheRDD") x }).persist(StorageLevel.MEMORY_ONLY) cacheRdd.filter(x=>x.toString.startsWith("a")).collect().foreach(println(_)) cacheRdd.filter(x=>x.toString.startsWith("b")).collect().foreach(println(_))
- mapValues 对key-value的RDD进行map操作,不对key进行处理。和map的差异在于map可以对key和value进行修改等处理,mapValues只能对value参数处理
keyV.mapValues(e=>e+"test").collect().foreach(println(_)) 输出结果:(key1,value1test) (key2,value2test) (key3,value3test)
- combineByKey(createCombiner,mergeValue,mergeComiers)用于聚合求平均值
createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加>操作(类型转换)并把它返回 (这一步类似于初始化操作)
mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内>进行)
mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)example1.求学生的平均城级 val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)) val d1 = sc.parallelize(initialScores) type mvcType=Double type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数) val mapValues= d1.combineByKey( score => (1, score),//1表示当前科目的数量为1 score表示当前的分数也就是88.0 (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore), //此处表示其它科目的数量+1,分数累加 合并科目的数量和值 (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)//数据可能存在多个分区所以最后一步是合并分区的数据 ).map(x=>(x._1,x._2._2/x._2._1)).collect().foreach(println(_)) 输出打印:(Wilma,95.33333333333333)(Fred,91.33333333333333) example2. 求每个学生的总成绩 val students=Array(("Nike",120),("Nike",32),("Lucy",10)) value.combineByKey(x=>x, (c1:Int,newScore)=>newScore+c1, (b1:Int,b2:Int)=>b1+b2 ).collect().foreach(println(_)) /*注:score=>(1,score)把分数作为参数传入进去,并且附加了元组类型。 (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore)在同一个分区又碰到了key相同的数据,我们把数量+1,参数值进行累加 (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)把 所有的分区的数据进行累加起来,因为计算的数据可能会存在其他分区,相当于分区数据的汇总*/
- reduceByKey合并相同的键的值 形成一个新的rdd元素
val rdd3 =sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1))) rdd3.reduceByKey((x,y)=>x+y).collect().foreach(println(_)) 输出:(B,3) (A,3) (C,1)
- join 是连接操作,将输入数据集(K,V)和另外一个数据集(K,W)进行Join, 得到(K, (V,W));该操作是对于相同K的V和W集合进行笛卡尔积 操作,也即V和W的所有组
val x = sc.parallelize(Array(("a", 1), ("b", 4))) val y = sc.parallelize(Array(("a", 2), ("a", 3))) x.join(y).collect().foreach(println(_)) 输出结果:(a,(1,2)) (a,(1,3))
Action 算子
- foreach foreach对每个元素都应用。对RDD中的每个元素都应用f函数操作,不返回RDD和Array
initRdd.collect().foreach(println(_))
- saveAsTextFile 函数将数据输出,存储到HDFS目录。
x.join(y).saveAsTextFile("/usr/local")
- collect 将分布式的RDD返回为一个单机的sacla array数组。
- count 返回整个RDD的元素
x.join(y).count()