spark源码阅读之storage模块②
在spark源码阅读之storage模块①中,描绘了Storage模块的整体框架是标准的master-slave框架:master用来管理slave的元数据信息,slave则是具体存储数据,分析了作为master节点的BlockManagerMasterEndpoint和作为slave节点的BlockManagerSlaveEndpoint之间如何传递消息。
这篇文章中将分析数据Block存储的具体过程,分析它是如何实现的
本篇文章源码基于spark 1.6.3
存储级别
缓存RDD有两个方法,cache()和persist(),而cache方法底层调用的还是persist方法,只不过cache方法传入了默认的参数,算是persist的一个快捷操作。
persist的构造方法中可以传入存储级别,如下所示:
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
这个StorageLevel参数就是spark中的存储级别,代表storage的数据以什么方式存入什么媒介中
StorageLevel的构造方法如下:
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {...}
五个参数分别代表:
_useDisk:是否使用磁盘
_useMemory:是否使用内存
_useOffHeap:是否使用堆外存储
_deserialized:是否序列化
_replication:副本个数
除了_useOffHeap外,其他参数可以随意配合使用,使用方法如下:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
/**
* :: DeveloperApi ::
* Return the StorageLevel object with the specified name.
*/
@DeveloperApi
def fromString(s: String): StorageLevel = s match {
case "NONE" => NONE //不保存任何数据
case "DISK_ONLY" => DISK_ONLY //仅保存在磁盘
case "DISK_ONLY_2" => DISK_ONLY_2 //仅保存在磁盘,备份一份
case "MEMORY_ONLY" => MEMORY_ONLY //仅保存在内存
case "MEMORY_ONLY_2" => MEMORY_ONLY_2 //仅保存在内存,备份一份
case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER //仅保存在内存,保存序列化后的对象
case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2 //仅保存在内存,保存序列化后的对象,备份一份
case "MEMORY_AND_DISK" => MEMORY_AND_DISK //优先保存在内存,溢出部分保存在磁盘
case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2 //同上,备份一份
case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER //优先保存在内存,溢出部分保存在磁盘,保存序列化后的结果
case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2 //同上,备份一份
case "OFF_HEAP" => OFF_HEAP //保存在堆外存储
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
}
即在persist方法中传入对应的字符串即可指定存储级别
存储级别的选择:
一般来说
- 优先使用内存MEMORY_ONLY,如果内存不够可以加上序列化MEMORY_ONLY_SER,当然也需要衡量序列化带来的cpu消耗
- 尽量不要使用磁盘,因为磁盘IO消耗的时间远大于内存,迫不得已重算partition数据可能都要更优,除非计算逻辑复杂,且内存放不下数据集,或者你安装的是SSD盘,可以考虑采用MEMORY_AND_DISK
- 副本机制的作用相较于容错其实更偏向于效率,因为在spark中丢失的数据可以重算,且数据源一般都有副本机制(如HDFS),那么增加一个副本的理由可能就是避免重算,提高效率
存储细节
RDD的persist只有在触发一个action操作(比如count)的时候才会真正实施,然后通过一系列操作,最后会在Task中调用RDD的iterator()方法来执行计算,以下是iterator方法的代码:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) { // 如果存储级别不是NONE,就从cacheManager中获取数据
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else { // 否则就读取checkpoint,再否则就重新计算
computeOrReadCheckpoint(split, context)
}
}
如果存储级别不为NONE,那么会调用CacheManager的getOrCompute方法,如果有缓存则读取,如果没有则计算并按照存储级别将数据写入缓存,CacheManager相当于BlockManager的包装类,用来管理缓存内容,继续看CacheManager的getOrCompute方法:
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, partition.index) //获取blockid
logDebug(s"Looking for partition $key")
blockManager.get(key) match { //向BlockManager查询是否有缓存
case Some(blockResult) => //缓存命中
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(blockResult.readMethod) //更新统计信息
existingMetrics.incBytesRead(blockResult.bytes)
//将缓存作为结果返回
val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None => //没有命中缓存,需要计算
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
val storedValues = acquireLockForPartition[T](key) //申请一个锁来加载这个分区的数据
if (storedValues.isDefined) { //如果这部分数据已经被计算过直接返回结果
return new InterruptibleIterator[T](context, storedValues.get)
}
// Otherwise, we have to load the partition ourselves
//如果没有被计算过,我们需要重新计算这部分数据
try {
logInfo(s"Partition $key not found, computing it")
//如果被checkpoint过则读取checkpoint的数据,否则就计算
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
// If the task is running locally, do not persist the result
//如果这个task是在driver端执行的话就直接返回结果
if (context.isRunningLocally) {
return computedValues
}
// Otherwise, cache the values and keep track of any updates in block statuses
//如果是在executor端执行的话就需要更新缓存信息
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] //将计算结果写入BlockManager中
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
val metrics = context.taskMetrics //更新任务的统计信息
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
//如果有其他的线程在等待改分区的处理结果,那么通知它们已经计算完成
//结果已经储存到BlockManager中
loading.remove(key)
loading.notifyAll()
}
}
}
}
以上代码中,数据在计算之前会反复确认是否存在缓存中,最后也会调用RDD的computeOrReadCheckpoint方法来计算这部分数据:
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
computeOrReadCheckpoint方法中会递归的调用当前RDD的parentRDD的iterator方法,最后会调用不同RDD类别的compute方法来计算数据:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
拿到计算数据后会调用putInBlockManager方法将计算结果写入到BlockManager中
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
level: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
val putLevel = effectiveStorageLevel.getOrElse(level) //获取存储级别
if (!putLevel.useMemory) { //如果没有使用内存的存储级别,可以直接写入BlockManager
/*
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
updatedBlocks ++=
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else { //否则就在内存中展开数据
/*
* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back.
*
* In addition, we must be careful to not unroll the entire partition in memory at once.
* Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
} else {
returnValues
}
}
}
}
代码中,如果获取的存储级别没有memory,那么就调用BlockManager的putIterator方法将计算结果直接写入磁盘,否则就调用BlockManager的putArray方法将计算结果在内存中展开。
def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}
def putBytes(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(bytes != null, "Bytes is null")
doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
两者最终都会调用doPut方法,只不过一个数据封装为IteratorValues另一个为ArrayValues,一个对应磁盘一个对应内存,还有一个方法也会调用doPut方法,就是BlockManager的putBytes方法,对应的是外部存储。
doPut方法篇幅略长,分为以下三个部分来说明,
- 其中分类缓存数据的部分代码如下所示:
try {
// returnValues - Whether to return the values put
// blockStore - The type of storage to put these values into
val (returnValues, blockStore: BlockStore) = {
if (putLevel.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
// Use external block store
(false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
} else {
assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
}
// Actually put the values
val result: PutResult = data match {
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
}
这里存入的数据结构为memoryStore、diskStore、externalBlockStore分别对应着存储级别中的内存、磁盘和外部存储,他们缓存数据的逻辑后面单独说明,这里分别调用了它们的putArray、putIterator、putBytes方法。
其中需要注意的是,如果存储级别是MEMORY_AND_DISK,代码中体现了优先存储在内存memoryStore中,等到内存满了才会写到diskStore中。
2.其次是副本逻辑,代码体现如下:
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
replicate(blockId, bufferView, putLevel)
}(futureExecutionContext)
case _ => null
}
这里启动一个Future的线程优先去处理ByteBufferValues类的数据(也就是外部存储类的数据)的复制,其中的核心方法是replicate方法,感兴趣的话可以深入了解一下。
那么IteratorValues类型的数据和ArrayValues类型的副本逻辑怎么处理呢?请看以下代码:
if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
Await.ready(replicationFuture, Duration.Inf)
}
case _ =>
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
如果是ByteBufferValues类型的数据,那么会对应上面的代码,去等待那个Future线程的回传值。
如果是另外两种类型,则首先进行序列化,然后调用replicate方法去进行复制操作。
- tellMaster将更新上报Master
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
调用reportBlockStatus方法向master汇报更新,最后会向BlockManagerMasterEndpoint发送UpdateBlockInfo消息,而Master会在收到消息后更新Block的元数据
存储Block的类
BlockStore
Block存储的抽象类,定义了接口的一些基本功能和方法:
/**
* Abstract class to store blocks.
*/
private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
// 根据StorageLevel将blockId标识的Block的内容bytes写入系统
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
/**
* Put in a block and, possibly, also return its content as either bytes or another Iterator.
* This is used to efficiently write the values to multiple locations (e.g. for replication).
*
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
// 将values写入系统,如果returnValues为true,需要将结果写入PutResult
def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
//同上,只不过由Iterator变成Array
def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
/**
* Return the size of a block in bytes.
*/
// 获得Block的大小
def getSize(blockId: BlockId): Long
// 获得Block的数据,返回类型ByteBuffer
def getBytes(blockId: BlockId): Option[ByteBuffer]
// 获取Block的数据,返回类型Iterator[Any]
def getValues(blockId: BlockId): Option[Iterator[Any]]
/**
* Remove a block, if it exists.
* @param blockId the block to remove.
* @return True if the block was found and removed, False otherwise.
*/
// 删除Block,成功返回true, 否则返回false
def remove(blockId: BlockId): Boolean
// 查询是否包含某个Block
def contains(blockId: BlockId): Boolean
// 退出时清理回收资源
def clear() { }
}
BlockStore的实现类有三个MemoryStore、DiskStore、ExternalBlockStore,分别对应了存储级别的内存、磁盘和外部存储。
MemoryStore维护了一个数据结构,是一个HashMap
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
所有需要缓存在内存中的数据都是通过tryToPut方法维护到这个数据结构中,如果内存不够的话会释放一些老的缓存,如果存储级别中还有磁盘,就会调用DiskStore的putIterator写入Disk,如果没有,那么就不缓存这部分数据,下次需要就重新计算。
DiskStore将数据持久化到磁盘中,会以什么样的形式存储呢?我们来看DiskStore的putIterator方法:
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
logDebug(s"Attempting to write values for block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId) //获取文件句柄
val outputStream = new FileOutputStream(file) //创建流
try {
Utils.tryWithSafeFinally {
blockManager.dataSerializeStream(blockId, outputStream, values) //序列化流
} {
// Close outputStream here because it should be closed before file is deleted.
outputStream.close()
}
} catch {
case e: Throwable =>
if (file.exists()) {
if (!file.delete()) {
logWarning(s"Error deleting ${file}")
}
}
throw e
}
val length = file.length
val timeTaken = System.currentTimeMillis - startTime
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(length), timeTaken))
if (returnValues) {
// Return a byte buffer for the contents of the file
val buffer = getBytes(blockId).get
PutResult(length, Right(buffer))
} else {
PutResult(length, null)
}
以上代码中创建了一个文件流,序列化之后就写入了本地的物理文件,获取文件句柄的方法为getFile,接着看其实现:
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename) //根据文件名hash值获取文件应该存放的层级位置
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
}
}
new File(subDir, filename) //创建文件句柄
}
根据BlockId的name的hash值取得文件的存放路径,然后创建一个文件句柄将数据写入物理文件,而这个物理文件的路径可以通过spark.local.dir来进行配置,在yarn-cluster模式下,这个路径会被yarn.nodemanager.local-dirs替换。
Storage模块调优
- 首先就是本篇文章开始时说的选择存储级别的注意事项,尽量使用内存,少用磁盘,序列化和副本根据情况选择使用。
- spark.local.dir:
磁盘存储级别物理文件的路径设置项,尽量配置多个路径(用逗号隔开),如有条件最好选择SSD盘。 - spark.memory.storageFraction:
可用内存中,用于Storage模块缓存数据的占比,默认为0.5,也就是和Shuffle模块占用内存五五开,但是1.5版本之后,spark有一个动态内存分配模型的功能,简单来说就是在使用内存的时候,Shuffle是亲儿子,它可以占用分给Storage的内存拒不归还,而Storage却不行。
这里调优的策略就是,根据实际情况,如果程序RDD的缓存数据集量较大,而期间很少产生shuffle数据的话,可以适当把这个参数提高。 - 堆外内存(off-head memory)
堆外内存也是一种外部存储,是spark通过调用java的unsafe相关API直接向操作系统要内存,这种方式的优点是跳过JVM的管理可以避免GC影响,缺点是需要自己来编写内存申请和释放的逻辑
spark.memory.offHeap.enabled
默认为false,设置为true打开堆外内存功能
spark.memory.offHeap.size
默认为0,,打开堆外内存功能后,方可设置内存大小,但在配置的时候需要小心内存溢出的问题
总结
关于Storage模块的源码阅读就分析到这儿,阅读Storage模块的源码有助于了解RDD之下,系统又做了哪些操作,RDD实现了逻辑,而Storage管理着数据。通过阅读源码,对于今后的问题定位和性能调优提供了理论依据。