Spark Core源码精读计划#26:内存存储MemorySt
目录
前言
差点把这个系列忘了,忙里偷闲接着写。
前面我们已经对内存池MemoryPool、内存管理器MemoryManager有了比较深入的了解,接下来要介绍的就是MemoryStore,它负责Spark内存存储的具体事项,将内存管理机制与存储块联系起来。本文先介绍与MemoryStore相关的MemoryEntry,然后详细分析MemoryStore的主要源码。
MemoryEntry
顾名思义,MemoryEntry就是内存中的一个“项”,或者说是块在内存中的抽象表示。它由一个特征定义。
代码#26.1 - o.a.s.memory.MemoryEntry特征
private sealed trait MemoryEntry[T] {
def size: Long
def memoryMode: MemoryMode
def classTag: ClassTag[T]
}
其中,size表示该MemoryEntry代表的块大小,memoryMode表示块存储在堆内内存还是堆外内存,classTag则是该块所存储的对象的类型标记。MemoryEntry有序列化和反序列化的两种实现,如下所示。
代码#26.2 - o.a.s.memory.SerializedMemoryEntry/DeserializedMemoryEntry类
private case class DeserializedMemoryEntry[T](
value: Array[T],
size: Long,
classTag: ClassTag[T]) extends MemoryEntry[T] {
val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}
private case class SerializedMemoryEntry[T](
buffer: ChunkedByteBuffer,
memoryMode: MemoryMode,
classTag: ClassTag[T]) extends MemoryEntry[T] {
def size: Long = buffer.size
}
可见,反序列化的DeserializedMemoryEntry只能用堆内内存存储,其数据是T类型的对象的数组。序列化的SerializedMemoryEntry能用堆内和堆外内存存储,数据用之前讲过的字节缓存ChunkedByteBuffer包装,并且其长度就是该SerializedMemoryEntry的大小。
MemoryStore
MemoryStore的内容比较多,仍然分块来看。
构造与属性成员
代码#26.3 - o.a.s.memory.MemoryStore类的构造与属性成员
private[spark] class MemoryStore(
conf: SparkConf,
blockInfoManager: BlockInfoManager,
serializerManager: SerializerManager,
memoryManager: MemoryManager,
blockEvictionHandler: BlockEvictionHandler)
extends Logging {
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
private val unrollMemoryThreshold: Long =
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
private def maxMemory: Long = {
memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
}
if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +
s"memory. Please configure Spark with more memory.")
}
logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
private def memoryUsed: Long = memoryManager.storageMemoryUsed
def currentUnrollMemory: Long = memoryManager.synchronized {
onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
}
private def blocksMemoryUsed: Long = memoryManager.synchronized {
memoryUsed - currentUnrollMemory
}
可见,MemoryStore需要5个构造方法参数。前4个参数我们已经很熟悉了,不再多说。第5个参数是BlockEvictionHandler类型的,它实际上也是个特征,实现了该特征的类的作用就是将块从内存中淘汰掉。目前只有BlockManager实现了该特征,所以等到讲BlockManager时,再回头看它。
以下是MemoryStore类的属性成员:
- entries:块ID与对应的MemoryEntry的映射关系,用LinkedHashMap结构存储,初始容量为32,负载因子0.75。
- onHeapUnrollMemoryMap/offHeapUnrollMemoryMap:分别存储TaskAttempId与该Task在堆内、堆外内存占用的展开内存大小映射关系。
- unrollMemoryThreshold:在展开块之前申请的初始展开内存大小,由spark.storage.unrollMemoryThreshold配置项来控制,默认1MB。
除此之外,还有四个Getter方法,它们负责返回对应内存的量:
- maxMemory:堆内与堆外存储内存之和。如果内存管理器为StaticMemoryManager,该值为定值;如果内存管理器为UnifiedMemoryManager,该值会浮动。
- memoryUsed:已经使用了的堆内与堆外存储内存之和。
- currentUnrollMemory:当前展开内存占用的大小,由上面的onHeapUnrollMemoryMap/offHeapUnrollMemoryMap统计而来。
- blocksMemoryUsed:当前除展开内存之外的存储内存(即真正存储块的内存)大小,即memoryUsed与currentUnrollMemory之差。
下面我们分别来看向MemoryStore写入以及从MemoryStore读取数据的方法。
直接写入字节
该方法名为putBytes(),代码如下。
代码#26.4 - o.a.s.memory.MemoryStore.putBytes()方法
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
true
} else {
false
}
}
该方法的实现比较简单:首先调用MemoryManager.acquireStorageMemory()方法申请所需的内存,然后调用参数中传入的偏函数_bytes,获取已经转化为ChunkedByteBuffer的数据。再创建出对应的SerializedMemoryEntry,并将该MemoryEntry放入entries映射。注意LinkedHashMap本身不是线程安全的,因此对其并发访问都要加锁。
写入迭代器化的数据
所谓迭代器化的数据,就是指用Iterator[T]形式表示的块数据。之所以会这样表示,是因为有时单个块对应的数据可能过大,不能一次性存入内存。为了避免造成OOM,就可以一边遍历迭代器,一边周期性地写内存,并检查内存是否够用,就像翻书一样。“展开”(Unroll)这个词形象地说明了该过程,其对应的方法是putIteratorAsValues()与putIteratorAsBytes(),分别产生DeserializedMemoryEntry与SerializedMemoryEntry。由于两个方法的逻辑类似,因此我们只以putIteratorAsValues()来讲解。
代码很长,但我还是不“Unroll”了,全部放在下面。
代码#26.5 - o.a.s.memory.MemoryStore.putIteratorAsValues()
private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
var elementsUnrolled = 0
var keepUnrolling = true
val initialMemoryThreshold = unrollMemoryThreshold
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
var memoryThreshold = initialMemoryThreshold
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
var unrollMemoryUsedByThisBlock = 0L
var vector = new SizeTrackingVector[T]()(classTag)
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
unrollMemoryUsedByThisBlock += initialMemoryThreshold
}
while (values.hasNext && keepUnrolling) {
vector += values.next()
if (elementsUnrolled % memoryCheckPeriod == 0) {
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
memoryThreshold += amountToRequest
}
}
elementsUnrolled += 1
}
if (keepUnrolling) {
val arrayValues = vector.toArray
vector = null
val entry =
new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
}
val enoughStorageMemory = {
if (unrollMemoryUsedByThisBlock <= size) {
val acquiredExtra =
memoryManager.acquireStorageMemory(
blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
if (acquiredExtra) {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
acquiredExtra
} else {
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
}
if (enoughStorageMemory) {
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
"released too much unroll memory")
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator,
rest = Iterator.empty))
}
} else {
logUnrollFailureMessage(blockId, vector.estimateSize())
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = vector.iterator,
rest = values))
}
}
在具体看逻辑之前,先弄明白两个配置项:
- UNROLL_MEMORY_CHECK_PERIOD,对应参数spark.storage.unrollMemoryCheckPeriod,表示在迭代过程中检查内存是否够用的周期,默认值16,即每16个元素检查一次。
- UNROLL_MEMORY_GROWTH_FACTOR,对应参数spark.storage.unrollMemoryGrowthFactor,表示申请新的展开内存时扩展的倍数,默认值1.5。
然后就可以具体探究该方法的执行流程了:
- 调用reserveUnrollMemoryForThisTask(),申请初始的展开内存,并随时记录该块使用了多少展开内存。
- 循环迭代块的数据,将其放入一个SizeTrackingVector中。该数据结构可以动态估算其中存储的元素的大小,后面会详细分析。
- 每当到了检查的时机,如果已经展开的数据大小超过了当前的展开内存阈值,就再次调用reserveUnrollMemoryForThisTask()方法,试图申请新的展开内存(注意上面的扩展倍数的用法)。申请到之后,同时更新阈值。
- 所有数据都展开之后,标志keepUnrolling为真,表示展开成功。将SizeTrackingVector中的数据封装为DeserializedMemoryEntry。
- 检查申请到的展开内存是否比实际大小还大。如果是,就调用嵌套定义的transferUnrollToStorage()方法(实际又调用了releaseUnrollMemoryForThisTask()方法),释放掉多余的展开内存,并将它们返还给存储内存。
- 一切成功,将块ID与DeserializedMemoryEntry的映射放入entries,并返回Right。注意这个方法返回值的类型是Either类型,它在Scala中表示不相交的两个结果的集合,即可能返回错误的结果(Left),或者正确的结果(Right)。
- 如果没有足够的展开内存,或者展开所有数据后keepUnrolling标志为假,都表示这次写入不成功,返回Left,其中又包含PartiallyUnrolledIterator,表示一个没有完全展开的迭代器。
对于这种又臭又长的单个方法,多读几遍自然就能通顺。下面贴出它调用的申请与释放展开内存的方法,与上面的一大坨相比已经是毛毛雨了,不再赘述了。
代码#26.6 - o.a.s.memory.MemoryStore.reserveUnrollMemoryForThisTask()/releaseUnrollMemoryForThisTask()方法
def reserveUnrollMemoryForThisTask(
blockId: BlockId,
memory: Long,
memoryMode: MemoryMode): Boolean = {
memoryManager.synchronized {
val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
if (success) {
val taskAttemptId = currentTaskAttemptId()
val unrollMemoryMap = memoryMode match {
case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
}
unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
}
success
}
}
def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
val taskAttemptId = currentTaskAttemptId()
memoryManager.synchronized {
val unrollMemoryMap = memoryMode match {
case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
}
if (unrollMemoryMap.contains(taskAttemptId)) {
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
unrollMemoryMap(taskAttemptId) -= memoryToRelease
memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
}
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
}
}
}
读取字节与迭代器化的数据
前者对应的是SerializedMemoryEntry,由getBytes()方法实现。后者对应的是DeserializedMemoryEntry,由getValues()方法实现。
代码#26.7 - o.a.s.memory.MemoryStore.getBytes()/getValues()方法
def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
case e: DeserializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
}
}
def getValues(blockId: BlockId): Option[Iterator[_]] = {
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
case e: SerializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getValues on deserialized blocks")
case DeserializedMemoryEntry(values, _, _) =>
val x = Some(values)
x.map(_.iterator)
}
}
淘汰缓存块
该方法名为evictBlocksToFreeSpace(),用途为淘汰现有的一些块,以为新的块腾出空间。它在StorageMemoryPool.acquireMemory()方法(代码#23.4)中调用,如果忘记了的话,可以返回去看看。这个方法的代码也比较长,但稍微容易理解一些。
代码#26.8 - o.a.s.memory.MemoryStore.evictBlocksToFreeSpace()方法
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
memoryMode: MemoryMode): Long = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
val entry = pair.getValue
if (blockIsEvictable(blockId, entry)) {
if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}
}
def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
val data = entry match {
case DeserializedMemoryEntry(values, _, _) => Left(values)
case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
}
val newEffectiveStorageLevel =
blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
if (newEffectiveStorageLevel.isValid) {
blockInfoManager.unlock(blockId)
} else {
blockInfoManager.removeBlock(blockId)
}
}
if (freedMemory >= space) {
var lastSuccessfulBlock = -1
try {
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
s"(${Utils.bytesToString(freedMemory)} bytes)")
(0 until selectedBlocks.size).foreach { idx =>
val blockId = selectedBlocks(idx)
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry != null) {
dropBlock(blockId, entry)
afterDropAction(blockId)
}
lastSuccessfulBlock = idx
}
logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
freedMemory
} finally {
if (lastSuccessfulBlock != selectedBlocks.size - 1) {
(lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
val blockId = selectedBlocks(idx)
blockInfoManager.unlock(blockId)
}
}
}
} else {
blockId.foreach { id =>
logInfo(s"Will not store $id")
}
selectedBlocks.foreach { id =>
blockInfoManager.unlock(id)
}
0L
}
}
}
方法参数中的space就表示需要腾出多大的空间。其执行流程如下:
- 循环遍历entries映射中的块,找出其中能够被淘汰的块。所谓能够被淘汰,是指MemoryMode相同(即堆内对堆内,堆外对堆外,不能交叉),并且块ID对应的块数据不属于RDD。
- 为这些块加写锁,保证当前正在被读取的块不会被淘汰掉。记录将要被淘汰的块ID。
- 如果腾出的空间已经达到了目标值,就调用嵌套定义的dropBlock()方法真正地移除这些块,最终仍然调用了BlockManager.dropFromMemory()方法。该方法会产生两种结果:一是块仍然存在,只是StorageLevel发生变化(比如转存到了磁盘),就只需解开它的写锁;二是块被彻底地移除,就得调用BlockInfoManager.remove()方法删掉它。最后将剩余未处理的块解锁。
- 如果腾出的空间最终仍然不能达到目标值,就不会执行淘汰动作,新的块也不会被存入。
总结
本文首先简要介绍了MemoryEntry的作用,然后详细阅读了MemoryStore的源码,了解了序列化数据和反序列化数据在Spark内存中的读写流程。信息量确实很大,也比较枯燥,但到此为止,我们总算对内存在Spark存储体系中的作用有了较为全面的认识。下一篇文章就会进入磁盘存储的领域。