眼君的大数据之路

spark开发笔记(二、RDD编程笔记)

2020-09-01  本文已影响0人  眼君

RDD编程

RDD的基本概念

Spark编程模型是弹性分布式数据集(Resilient Distributed Dataset, RDD),一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集合中不同的节点上,从而可以在集群中的不同节点上进行并行计算。

RDD的操作类型

Spark编程中开发者需要编写一个驱动程序(Driver Program)来连接到工作进行(Worker)。驱动程序定义一个或多个RDD以及相关行动操作,驱动程序同时记录RDD的继承关系,即“血缘”。而工作进程(Worker)是一直运行的进程,它将经过一系列操作后的RDD分区数据保存在内存中。Spark中的操作大致分为四类操作:创建操作、转换操作、控制操作和行动操作。

转换操作(Transformation Operation)

将RDD通过一定的操作变换成新的RDD,比如HadoopRDD可以使用map操作变换成MappedRDD,RDD的转换操作是惰性操作,它只定义了一个新的RDDs,并没有立即执行。

控制操作(Control Operation)

进行RDD持久化,可以让RDD按不同的存储策略保存在磁盘或者内存中,比如cache接口默认将RDD缓存在内存中。

行动操作(Action Operation)

能够触发Spark运行的操作,例如,对RDD进行collect就是行动操作。Spark中的行动操作分为两类,一类的操作结果变成Scala集合或者变量,另一类将RDD保存到外部文件系统或者数据库中。

创建操作(Creation Operation)

目前有两种创建RDD的方式:

1. 从文件系统中加载数据创建RDD。

spark采用textFile()方法从文件系统中加载数据创建RDD。该方法把文件的URI作为参数,这个URI可以是:

本地文件系统的地址
分布式文件系统HDFS的地址
Amazon S3的地址等
从本地文件系统中加载数据:

val lines = sc.textFile("file:///user/local/xxx/word.txt")

从分布式文件系统中加载数据,以下三行语句是等价的:

val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
val lines = sc.textFile("/user/hadoop/word.txt")
val lines = sc.textFile("word.txt")
2. 通过并行集合(数组)创建RDD。

spark通过SparkContext的parallelize方法,在driver中一个已经存在的集合(数组)上创建RDD。


转换操作(Transformation Operation)

对于RDD而言,每一次转换操作都会产生不同的RDD,供下一个操作使用。
转换操作得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,从DAG图的源头开始进行“从头到尾”的计算操作。

filter(func): 筛选出满足函数func的元素,并返回一个新的数据集。

map(func): 对RDD中每个元素都执行一个指定都函数来产生一个新的RDD,任 何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

flatMap(func):与map类似,区别是原RDD中每个元素经过map处理后只能生成一个元素,而flatMap允许生成一个或多个元素构建新RDD。

groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集。

reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个Key传递到函数的func中进行聚合。

distinct操作是去除RDD重复都元素,返回所有元素不重复的RDD。

行动操作(Action Operation)

行动操作是真正触发计算的地方,Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换,最终得到计算结果。

count():返回数据集中的元素个数。
collect():以数组的形式返回数据集中的所有元素。
first():返回数据集中的第一个元素。
take(n):以数组的形式返回数据集中的前n个元素。
reduce(func):通过函数func聚合数据集中的元素。
foreach(func):将数据集中的每个元素传递到函数func中运行。

惰性机制

这里给出一段简单的代码来解释Spark的惰性机制:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s=>s.length)
val totalLength = lineLengths.reduce((a,b) => a+b)

第三行代码的reduce方法是一个行动操作,会触发真正的计算。
这时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行属于它自己的map和reduce,最后把结果返回给Driver。


控制操作(Control Operation)与持久化

在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。这对于迭代计算而言,代价是很大的,因为迭代计算经常需要多次重复使用同一组数据。



对于上述这种情况,Spark可以使用持久化(缓存)的机制避免这种重复计算的开销,可以使用persist()方法对一个RDD标记为持久化。

之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是需要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化。持久化后的RDD将会被保留在计算节点内存中被后面的行动操作使用。

Spark可以将RDD持久化到内存或磁盘文件系统中,把RDD持久化到内存中可以极大地提高迭代计算以及各计算模型之间的数据共享,一般情况下执行节点60%内存用于缓存数据,剩下的40%用于运行任务。
Spark中使用persist和cache操作进行持久化,其中cache是persist()的特例。

persist()圆括号中包含的是持久化级别参数:

  1. persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容;
  2. persist(MEMORY_AND_DISK):表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被放到硬盘上;
  3. unpersist():该方法可以手动地把持久化的RDD从缓存中移除;
    一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。
  4. cache(): 一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。

分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同节点上。使用分区可以增加并行度,减少通信开销。

分区原则

使得分区个数尽量等于集群中的CPU核心(core)数目。

默认分区数目

对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目。一般而言:

  1. 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N。存储在以下变量中:
spark.default.parallelism
  1. Apache Mesos:默认的分区数是8
  2. Standalone或YARN:在"集群中所有CPU核心总和"和"2"二者中取较大值作为默认值:
min(spark.default.parallelism,2)
手动创建分区数量
  1. 创建RDD时,在调用textFile和parallelize方法时手动指定分区个数即可,语法格式:
sc.textFile(path,partitionNum)
scala>val array = Array(1,2,3,4,5)
#设置两个分区
scala>val rdd = sc.parallelize(array,2)
  1. 通过转换操作得到新RDD时,直接调用repartition方法可以重新设置分区。
#将分区数量设置改变为1
scala>var rdd2 = data.repartition(1)
#获取当前分区数
scala>rdd2.partitions.size
#将分区数量设置改变为4
scala>var rdd2 = data.repartition(4)
#获取当前分区数
scala>rdd2.partitions.size

例如,我们需要根据key值的最后一位数字,写到不同的文件:

import org.apache.spark.{Partitioner, SparkContext,SparkConf}
//自定义分区类,需继承Partitioner类
class UsridPartitioner(numParts:Int) extends Partitioner{
    //覆盖分区数
    override def numPartitions:Int = numParts
    //覆盖分区号获取函数
    override def getPartition(key:Any):Int={
        key.toString.toInt%10
    }
}
object Test{
    def main(args:Array[String]){
        val conf=new SparkConf()
        val sc=new SparkContext(conf)
        //模拟5个分区
        val data=sc.parallelize(1 to 10,5)
        //根据尾号转变为10个分区,写到10个文件
        data.map((_,1)).partitionBy(new UsridPartitioner(10)).map(_._1).saveAsTextFile("file:///usr/local/output")
    }
}
分区打印

在实际编程中,经常需要把RDD中的元素打印输出(标准输出stdout),一般会采用语句:

rdd.foreach(println)

或者

rdd.map(println)

当采用本地模式(local)在单机上打印时,会打印出RDD的所有元素。
但是,当采用集群模式执行时,在worker节点上执行打印语句时输出到worker节点的stdout中而不是输出到任务控制节点Driver Program中,因此,任务控制节点Driver Program中stdout是不会显示其他worker上打印的内容的。
这时,可以考虑使用collect()等行动操作将各个worker节点上的RDD都抓取到Driver Program中:

rdd.collect().foreach(println)

Pair-RDD编程

Pair-rdd创建方式

  1. 从文件中加载:
val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))
val pairRDD.foreach(println)
  1. 通过并行集合(数组)创建:
val list = List("Hadoop","Spark","Hive","Spark")
val rdd = sc.parallelize(list)
var pairRDD = rdd.map(word => (word,1))
pairRDD.foreach(println)

Pair-rdd转换操作

reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值:

(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Hadoop,1)
(Spark,2)
(Hive,1)
groupByKey()

groupByKey(func)的功能是,对具有相同键的值进行分组:

(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
pairRDD.groupByKey()
(Hadoop,1)
(Spark,(1,1))
(Hive,1)
reduceByKey(func)和groupByKey()的区别

以下代码,虽然两种结果相同,但过程有差异:

val words = Array("one","two","two","three","three","three")
val wordPairsRDD = sc.parallelize(words).map(word => (word,1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_+_)
val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1,t._2.sum))

当采用reduceByKey时,Spark可以在每个分区移动数据之前,将待输出数据与一个共用的key结合。



当采用groupByKey时,由于它不接收函数,Spark只能先将所有的键值对都移动,这样的后果是集群节点之间的开销很大,导致传输延时。


keys、values、sortByKey和sortBy()

keys只会把Pair RDD中的key返回形成一个新的RDD:

(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark

values则只会返回value形成一个新的RDD:

pairRDD.values.foreach(println)
1
1
1
1

sortByKey(args)的功能是返回一个根据键排序的RDD:

pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)

默认是按生序排序,如果args=False表示按降序排序。
sortBy()相比sortByKey()更灵活,可以实现根据value排序:

scala>val d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("1",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
scala>d1.reduceByKey(_+_).sortByKey(false).collect
res2:Array[(String,Int)] = Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))
scala>val d2 = sc.parallelize(Array(("c",8),("b",25),("c",17),("1",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
scala>d2.reduceByKey(_+_).sortBy(_._2,false).collect
res2:Array[(String,Int)] = Array((a,42),(b,38),(f,29),(c,27),(g,21),(e,17),(d,9))
mapValues(func)

mapValues(func)对键值对RDD中的每个value都应用一个函数func,但是key不会发生变换:

(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
scala>pairRDD.mapValues(x => x+1).foreach(println)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)
join操作

join表示内连接。对于内连接,给定的两个输入数据集(k,v1)和(k,v2),只有两个数据集中都存在的key才会被输出,最终会得到一个(k,(v1,v2))类型的数据集:

scala>val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
scala>val pairRDD2 = sc.parallelize(Array(("spark","fast")))
scala>pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))
一个求均值的例子

给定一组键值对("spark",2),("hadoop",6),("hadoop",4),("spark",6),key表示图书名称、value表示某天图书销量,请计算每个键对应的平均值:

val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd.mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()

共享变量

当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量在每个任务上都生成一个副本;但是,有时候,需要多个任务之间共享变量,或者在任务和任务控制节点之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量和累加器。

广播变量

广播变量允许程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上每个任务都生成一个副本,用来把变量在所有节点的内存之间进行共享。

Spark的“行动”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。

可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下:

scala>val broadcastVar = sc.broadcast(Array(1,2,3))
scala>broadcastVar.value

这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用v的值,这样就不会把v重复分发到这些节点上。此外,一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object BroadCastValue{
    def main(args:Array[String]):Unit={
        val conf = new SparkConf().setAppName("BroadCastValue1").setMaster("local[1]")
        val sc = new SparkContext(conf)
        val broads=sc.broadcast(3)
        val lists=List(1,2,3,4,5)
        val listRDD=sc.parallelize(lists)
        val results=listRDD.map(x=>x*broads.value)
        results.foreach(x=>println("The result is:"+x))
        sc.stop
    }
}

累加器

累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。Spark原生地支持数值型(numeric)的累加器,程序开发人员可以编写对新类型的支持。累加器则支持所有不同节点之间进行累加计算(比如计算或者求和)。

一个数值型的累加器,可以通过调用SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()来创建运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。
下面代码演示使用累加器来对数组中元素进行求和:

scala>val accum = sc.longAccumulator("My Accumulator")
scala>sc.parallelize(Array(1,2,3,4)).foreach(x => accum.add(x))
scala>accum.value
res1:Long = 10

文件系统数据读写

本地文件系统的数据读写

val textFile = sc.textFile("file:///word.txt")

执行上面这条命令以后,并不会马上显示结果,因为,Spark采用惰性机制。
在执行转换操作时,即使我们输入了错误的语句,spark-shell也不会马上报错,而是等到执行"行动"类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来。
以下命令将textFile变量中的内容再次写回另外一个文本文件中:

textFile.saveAsTextFile("file://usr/local/output.txt")

json文件系统的数据读写

Spark提供了一个Json样例数据文件,存放在"/usr/local/spark/examples/src/main/resources/people.json"中:

{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}

以下代码将该文件加载到RDD中:

scala>val jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
scala>jsonStr.foreach(println)

Scala中有一个自带的JSON库scala.util.parsing.json.JSON,可以实现对JSON数据的解析。
JSON.parseFull(jsonString:String)函数,以一个JSON字符串作为输入并进行解析,如果解析成功则返回Some(map:Map[String,Any]),如果解析失败则返回None。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON

object JSONApp{
    def main(args:Array[String]){
        val inputFile = "file:///usr/local/spark/people.json"
        val conf = new SparkConf().setAppName("BroadCastValue1").setMaster("local[1]")
        val sc = new SparkContext(conf)
        val jsonStrs = sc.textFile(inputFile)
        val result = jsonStrs.map(s => JSON.parseFull(s))
        result.foreach({
            r => r match{
                case Some(map:Map[String,Any]) => println(map)
                case None => println("Parsing failed")
                case other => println("Unknown data structure:" + other)
            }
        })
    }
}
上一篇下一篇

猜你喜欢

热点阅读