大数据Spark大数据,机器学习,人工智能

Spark 持久化(cache和persist的区别)

2018-07-12  本文已影响1人  董可伦

我的原创地址:https://dongkelun.com/2018/06/03/sparkCacheAndPersist/

1、RDD 持久化

Spark 中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据。这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法。

2、存储级别

每个持久化的 RDD 可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 StorageLevel 对象给 persist() 方法进行设置。
详细的存储级别介绍如下:

注意,在 Python 中,缓存的对象总是使用 Pickle 进行序列化,所以在 Python 中不关心你选择的是哪一种序列化级别。python 中的存储级别包括 MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY 和 DISK_ONLY_2 。

上面的几个缓存级别是官网给出的,但是通过源码看,实际上一共有12种缓存级别

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
.....

}

关于构造函数的几个参数


class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable

3、如何选择存储级别

Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择 :

4、删除数据

Spark 自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个 RDD,而不是等待该 RDD 被 Spark 自动移除,可以使用 RDD.unpersist() 方法

5、RDD的cache和persist的区别

cache()调用的persist(),是使用默认存储级别的快捷设置方法
看一下源码

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

通过源码可以看出cache()是persist()的简化方式,调用persist的无参版本,也就是调用persist(StorageLevel.MEMORY_ONLY),cache只有一个默认的缓存级别MEMORY_ONLY,即将数据持久化到内存中,而persist可以通过传递一个 StorageLevel 对象来设置缓存的存储级别。

6、DataFrame的cache和persist的区别

官网和上的教程说的都是RDD,但是没有讲df的缓存,通过源码发现df和rdd还是不太一样的。

/**
 * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
 *
 * @group basic
 * @since 1.6.0
 */
def cache(): this.type = persist()

/**
 * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
 *
 * @group basic
 * @since 1.6.0
 */
def persist(): this.type = {
  sparkSession.sharedState.cacheManager.cacheQuery(this)
  this
}

/**
 * Persist this Dataset with the given storage level.
 * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
 *                 `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
 *                 `MEMORY_AND_DISK_2`, etc.
 *
 * @group basic
 * @since 1.6.0
 */
def persist(newLevel: StorageLevel): this.type = {
  sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
  this
}

def cacheQuery(
    query: Dataset[_],
    tableName: Option[String] = None,
    storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock

可以到cache()依然调用的persist(),但是persist调用cacheQuery,而cacheQuery的默认存储级别为MEMORY_AND_DISK,这点和rdd是不一样的。

7、代码测试

新建一个测试的txt,文件越大越好,如果文件比较小,可能cache的效果还不如不cache的好。

import org.apache.spark.sql.SparkSession

object Test {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local").getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.textFile("files/test.txt")
    var beiginTime = System.currentTimeMillis
    println(rdd.count)
    var endTime = System.currentTimeMillis
    println("cost " + (endTime - beiginTime) + " milliseconds.")

    beiginTime = System.currentTimeMillis
    println(rdd.count)
    endTime = System.currentTimeMillis
    println("cost " + (endTime - beiginTime) + " milliseconds.")
    spark.stop
  }
}

先看一下没有cache的时间

79391
cost 575 milliseconds.
79391
cost 124 milliseconds.

然后将代码val rdd = sc.textFile("files/test.txt")替换为val rdd = sc.textFile("files/test.txt").cache,再进行测试

79391
cost 635 milliseconds.
79391
cost 44 milliseconds.

可以看到cache后第二次count的时间明显比没有cache第二次count的时间少很多,第一次count时间增加是因为要进行持久化,如果看总时间的话,只有多次使用该rdd的时候,效果才明显。

8、注意

cache()和persist()的使用是有规则的:
必须在transformation或者textfile等创建一个rdd之后,直接连续调用cache()或者persist()才可以,如果先创建一个rdd,再单独另起一行执行cache()或者persist(),是没有用的,而且会报错,大量的文件会丢失。

参考资料

https://blog.csdn.net/houmou/article/details/52491419

上一篇下一篇

猜你喜欢

热点阅读