RDD编程
2017-07-21 本文已影响28人
zlcook
RDD基础
-
RDD:Resilient Distributed Datasets,弹性分布式数据集
-
分布在集群中的只读对象集合(由多个分区(Partition)构成,这些分区运行在集群中的不同节点上)
-
可以存储在磁盘或内存中(多种存储级别)
-
通过并行“转换”操作构造
-
失效后自动重构
-
RDD可以包含Python、java、Scala中任意类型的对象,甚至可以包含用户自定义的对象。
-
两种方法创建RDD:
-
1.读取外部数据集。
-
2.在驱动程序里分发驱动器程序中的对象集合(比如list和set)。
-
RDD支持两种类型操作
- 1.转化操作(tranformation):由一个RDD生成一个新的RDD.旧的RDD不会被改变。map、filter、groupBy、reduceBy
- 2.行动操作(action):对RDD计算出一个结果,并把结果返回到驱动程序中,或者写入外部存储系统中。count、collect、saveAsTextFile
-
注:转化操作返回的是RDD,行动操作返回的是其它数据类型。
操作示例
-
-
惰性求值
-
转化操作和行动操作的区别在于Spark计算RDD的方式不同。RDD的转化操作都是惰性求值,即对RDD调用转化操作(如map())时,操作不会立即执行,它们只有第一次在一个行动操作中用到时才会真正计算。
-
默认情况下,Spark的RDD会在每次对它们进行行动操作时重新计算,如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist(),让Spark把这个RDD缓存起来。默认缓存到内存中(以分区方式存储到集群中各个机器上)
-
持久化(缓存)
- 持久化原因如上。持久化数据丢失怎么办?让Spark持久化一个RDD,计算出RDD的节点会分别保存它们所求出的分区数据,如果一个有持久化的节点发生故障,Spark会在用到缓存的数据时重算丢失的数据分区,当然可以把数据备份到多个节点上,以避免单节点故障拖累进度。
-
持久化数据方式:默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中,当我们把数据写到磁盘或堆外存储上是也总是使用序列化数据。
-
持久化级别:
如果采用缓存在内存中的级别,当内存放不下是,Spark会自动利用最近最少使用(LRU)的策略吧最老的分区从内存中移除。 -
image.png
向Spark传递函数
- Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。
- 在Scala 中,我们可以把定义的内联函数、方法的引用或静态方法传递给Spark,就像Scala 的其他函数式API 一样。我们还要考虑其他一些细节,比如所传递的函数及其引用的数据需要是可序列化的(实现了Java 的Serializable 接口)。
- 如果在Scala 中出现了NotSerializableException,通常问题就在于我们传递了一个不可序列化的类中的函数或字段。记住,传递局部可序列化变量或顶级对象中的函数始终是安全的。
常见转化操作和行动操作

-
sample(withReplacement, fraction, seed):对RDD采样,以及是否替换。
-
对一个RDD的转化操作
-
对两个RDD的转化操作
-
对一个RDD进行的行动操作
一个完整案例

