Spark-RDD持久化

2018-11-05  本文已影响0人  数据萌新

(1) persist算子

var rdd = sc.textFile("test")
rdd = rdd.persist(StorageLevel.MEMORY_ONLY)
val count = rdd.count() //或者其他操作

StorageLevel的构造函数:

class StorageLevel private(
    private var _useDisk: Boolean, # 是否存入磁盘
    private var _useMemory: Boolean, # 是否存入内存
    private var _useOffHeap: Boolean, # 是否使用堆外内存
    private var _deserialized: Boolean, # 是否不进行序列化
    private var _replication: Int = 1 # 副本数(默认为1))

StorageLevel object中已经定义了几种代表RDD持久化的级别:

image.png

使用不同参数的组合构造的实例被预先定义为一些值,比如MEMORY_ONLY代表着不存入磁盘,存入内存,不使用堆外内存,不进行序列化,副本数为1,使用persisit()方法时把这些持久化的级别作为参数传入即可。

(2) cache算子

cache() = persist(StorageLevel.MEMORY_ONLY)

(3) checkpoint算子:可以把RDD持久化到HDFS

使用方法:

使用方法:
sc.setCheckpointDir("hdfs://...")
var rdd = sc.textFile("test")
rdd.checkpoint()
val count = rdd.count() //或者其他操作

checkpoint()执行原理:

  1. 当RDD的job执行完毕后,会从finalRDD从后往前回溯
  2. 当回溯到调用了checkpoint()方法的RDD后,会给这个RDD做一个标记
  3. Spark框架自动启动一个新的job,计算这个RDD的数据,然后把数据持久化到HDFS上
  4. 优化:对某个RDD执行checkpoint()之前,对该RDD执行cache(),这样的话,新启动的job只需要把内存中的数据上传到HDFS中即可,不需要重新计算。

(4) 关于这3个算子的几点说明

  1. 这3个算子都是Transformations类算子,需要Actions类算子触发才能执行
  2. cache 和 persist 算子的返回执行必须赋值给一个变量,在接下来的job中直接使用这个变量,那么就是使用了持久化的数据了,如果application中只有一个job,没有必要使用RDD持久化
  3. cache 和 persist 算子后不能立即紧跟action类算子,比如count算子,但是在下一行可以有action类算子
error : 
    cache().count()  
right : 
    rdd = rdd.cache()   
    rdd.count()

  1. checkpoint()算子执行后就切断了RDD之间的依赖
    当业务逻辑很复杂时,RDD之间频繁转换,RDD的血统很长,如果中间某个RDD的数据丢失,还需要重新从头计算,如果对中间某个RDD调用了checkpoint()方法,把这个RDD上传到HDFS,同时让后面的RDD不再依赖于这个RDD,而是依赖于HDFS上的数据,那么下次计算会方便很多。
  2. checkpoint持久化到磁盘和persist持久化到磁盘的区别
上一篇下一篇

猜你喜欢

热点阅读