RDD持久化
rdd的持久化算子有三种:
1、cache:将数据持久化到内存
2、persist:可以将数据持久化到磁盘,也可以将数据持久化到内存
3、checkpoint:将数据持久化到磁盘。
cache
我们经常会对RDD执行一系列Transformation算子操作,逻辑上每经历一次变换,就会将RDD转换为一个新的RDD,RDD会被划分成很多的分区分布到集群的多个节点中。
分区是逻辑概念,为了防止函数式数据不可变行(immutable)导致的内存需求无限扩张,使得系统内存被快速用完,Spark使用延迟执行(lazy)的方式执行,即只有操作累计到Action(行动),算子才会触发整个操作序列的执行,中间结果不会单独再重新分配内存,而是在同一个数据块上进行流水线操作。
也就是说变换前后的新旧RDD的分区在物理上可能是同一块内存存储,这是Spark内部做的优化。有些RDD是计算的中间结果,其分区并不一定有相对应的内存或磁盘数据与之对应,所以如果想要复用某一个RDD,需要通过Cache算子,将数据缓存(或者说固化)到内存中
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
说明是cache()调用了persist().
persist的几个参数:
cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间.
defpersist(newLevel: StorageLevel):this.type= {
// TODO:Handle changes of StorageLevel
if (storageLevel!= StorageLevel.NONE&&newLevel != storageLevel) {//storageLevel一致性检查
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it wasalready assigned a level")
}
sc.persistRDD(this)
// Register the RDD with the ContextCleaner for automaticGC-based cleanup
sc.cleaner.foreach(_.registerRDDForCleanup(this))
storageLevel = newLevel
this
}
- StorageLevel
persist有一个 StorageLevel 类型的参数,这个表示的是RDD的缓存级别。
classStorageLevel private(
private var useDisk_ :Boolean,
private var useMemory_ : Boolean,
private var useOffHeap_ :Boolean,
private var deserialized_ :Boolean,
private var replication_ :Int = 1)
至此便可得出cache和persist的区别了:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。
- 构造函数参数
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}
useDisk:使用硬盘(外存)
useMemory:使用内存
useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
replication:备份数(在多个节点上备份)
理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)
checkpoint
checkpoint 的作用是把 RDD 存储到一个高可用的地方(通常这个地方就是HDFS,HDFS会把文件复制多个复本 保存在其他节点上)
Spark 在生产环境下经常会面临 Transformation 的 RDD 非常多(例如一个Job 中包含1万个RDD) 或者是具体的 Transformation 产生的 RDD 本身计算特别复杂和耗时(例如计算时常超过1个小时) , 可能业务比较复杂,此时我们必需考虑对计算结果的持久化。
可以采用 persists 把数据在内存 或磁盘中,但却不可靠,如果磁盘或内存会损坏,数据就会丢失。
所以就有了Checkpoint,Checkpoint 的作用是把 RDD 存储到一个高可用的地方(通常这个地方就是HDFS,HDFS会把文件复制多个复本 保存在其他节点上)
总结
cache和persist的注意事项
1、cache和persist都是懒执行算子,需要有一个action算子触发执行
2、cache和persist算子的返回执行都必须赋值给一个变量,在接下来的job中直接使用这个变量,那么就是使用了持久化数据了,如果application中只有一个job,没有必要使用rdd进行持久化
3、cache和persist算子后边不能立即紧跟action类算子,比如count算子,但是下一行可以有action算子
4、cache是persist的特殊情况
5、checkpoint算子实际上是将rdd持久化到hdfs上的,同时切断rdd之间的依赖。