Spark Core源码精读计划#28:磁盘存储DiskStor
目录
前言
在上一篇文章中,我们认识了Spark管理磁盘块的组件DiskBlockManager,本文接着来看真正负责磁盘存储的组件DiskStore,以及与它相关的BlockData。这部分内容会涉及到一点与Java NIO相关的东西,看官需要稍微注意一下。
磁盘存储DiskStore
构造方法与属性成员
代码#28.1 - o.a.s.storage.DiskStore类的构造方法与属性成员
private[spark] class DiskStore(
conf: SparkConf,
diskManager: DiskBlockManager,
securityManager: SecurityManager) extends Logging {
private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
Int.MaxValue.toString)
private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
// ......
}
DiskStore接受3个构造方法参数,分别是SparkConf、DiskBlockManager和SecurityManager的实例,其中SecurityManager用于提供对数据加密的支持。3个属性字段的含义如下:
- minMemoryMapBytes:使用内存映射(memory map)读取文件的最小阈值,由配置项spark.storage.memoryMapThreshold指定,默认值2M。当磁盘中的文件大小超过该值时,就不会直接读取,而用内存映射文件来读取,提高效率。
- maxMemoryMapBytes:使用内存映射读取文件的最大阈值,由配置项spark.storage.memoryMapLimitForTests指定。它是个测试参数,默认值为不限制。
- blockSizes:维护块ID与其对应大小之间的映射关系的ConcurrentHashMap。
写入块
写入块的逻辑由put()方法来实现。
代码#28.2 - o.a.s.storage.DiskStore.put()/contains()方法
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
if (contains(blockId)) {
throw new IllegalStateException(s"Block $blockId is already present in the disk store")
}
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val out = new CountingWritableChannel(openForWrite(file))
var threwException: Boolean = true
try {
writeFunc(out)
blockSizes.put(blockId, out.getCount)
threwException = false
} finally {
try {
out.close()
} catch {
case ioe: IOException =>
if (!threwException) {
threwException = true
throw ioe
}
} finally {
if (threwException) {
remove(blockId)
}
}
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName,
Utils.bytesToString(file.length()),
finishTime - startTime))
}
def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
file.exists()
}
put()方法首先调用contains()方法检查块是否已经以文件的形式写入了,只有没有写入才会继续操作。然后,调用DiskBlockManager.getFile()方法打开块ID对应的文件,然后获取该文件的WritableByteChannel(NIO中的写通道,表示可以通过调用write()方法向文件写入数据)。最后,调用参数中传入的writeFunc函数,操作WritableByteChannel将数据写入,并将块ID与其对应的字节数加入blockSizes映射。
接下来看一看代码#28.2中调用的openForWrite()方法。
代码#28.3 - o.a.s.storage.DiskStore.openForWrite()方法
private def openForWrite(file: File): WritableByteChannel = {
val out = new FileOutputStream(file).getChannel()
try {
securityManager.getIOEncryptionKey().map { key =>
CryptoStreamUtils.createWritableChannel(out, conf, key)
}.getOrElse(out)
} catch {
case e: Exception =>
Closeables.close(out, true)
file.delete()
throw e
}
}
可见,该方法就是通过文件对象构造了文件输出流FileOutputStream,然后获取它对应的Channel对象用于写数据。特别地,如果I/O需要加密,就需要另外调用CryptoStreamUtils.createWritableChannel()方法包装,本文就不涉及了。至于CountingWritableChannel,也只是基于WritableByteChannel接口扩展出来的一个简单类,增加了统计字节数的方法,代码也就不再列出。
写入字节
代码#28.4 - o.a.s.storage.DiskStore.putBytes()方法
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
put(blockId) { channel =>
bytes.writeFully(channel)
}
}
可见,该方法除了块ID外,还需要传入封装在ChunkedByteBuffer中的数据。调用上述put()方法时,传入的writeFunc函数调用了ChunkedByteBuffer.writeFully()方法,负责将数据以一定的Chunk大小分块写入WritableByteChannel。
读取字节
代码#28.5 - o.a.s.storage.DiskStore.getBytes()方法
def getBytes(blockId: BlockId): BlockData = {
val file = diskManager.getFile(blockId.name)
val blockSize = getSize(blockId)
securityManager.getIOEncryptionKey() match {
case Some(key) =>
new EncryptedBlockData(file, blockSize, conf, key)
case _ =>
new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
}
}
这段代码很简单,但可以注意到,在加密环境下和非加密环境下返回的结果是不同的,前者是EncryptedBlockData对象,后者是DiskBlockData对象,而它们都是BlockData的子类。顾名思义,BlockData就是对磁盘块数据的具体封装,下面选择最常见的DiskBlockData来看一看。
磁盘块数据DiskBlockData
这个类是定义在DiskStore下方的私有类,比较短,因此直接全贴在下面。
代码#28.6 - o.a.s.storage.DiskBlockData类
private class DiskBlockData(
minMemoryMapBytes: Long,
maxMemoryMapBytes: Long,
file: File,
blockSize: Long) extends BlockData {
override def toInputStream(): InputStream = new FileInputStream(file)
override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
Utils.tryWithResource(open()) { channel =>
var remaining = blockSize
val chunks = new ListBuffer[ByteBuffer]()
while (remaining > 0) {
val chunkSize = math.min(remaining, maxMemoryMapBytes)
val chunk = allocator(chunkSize.toInt)
remaining -= chunkSize
JavaUtils.readFully(channel, chunk)
chunk.flip()
chunks += chunk
}
new ChunkedByteBuffer(chunks.toArray)
}
}
override def toByteBuffer(): ByteBuffer = {
require(blockSize < maxMemoryMapBytes,
s"can't create a byte buffer of size $blockSize" +
s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
Utils.tryWithResource(open()) { channel =>
if (blockSize < minMemoryMapBytes) {
val buf = ByteBuffer.allocate(blockSize.toInt)
JavaUtils.readFully(channel, buf)
buf.flip()
buf
} else {
channel.map(MapMode.READ_ONLY, 0, file.length)
}
}
}
override def size: Long = blockSize
override def dispose(): Unit = {}
private def open() = new FileInputStream(file).getChannel
}
很久之前也已经大概说过,BlockData特征只是定义了块数据的转化方式,具体的细节则留给各个实现类。我们具体看看toChunkedByteBuffer()和toByteBuffer()这两个方法。
转化为ChunkedByteBuffer
Utils.tryWithResource()方法实际上就是Java中try-with-resources的Scala实现,因为Scala中并没有这个语法糖。
toChunkedByteBuffer()方法会将文件转化为输入流FileInputStream,并获取其ReadableFileChannel,再调用JavaUtils.readFully()方法将从Channel中取得的数据填充到ByteBuffer中。每个ByteBuffer即为一个Chunk,所有Chunk的数组形成最终的ChunkedByteBuffer。关于ChunkedByteBuffer在文章#21简要提到过,之后会很快写一篇番外文章专门讲解它,因为有点意思。
转化为ByteBuffer
toByteBuffer()方法会检查块大小是否小于spark.storage.memoryMapThreshold(终于出现了)。如果小于的话,就会采用与toChunkedByteBuffer()相同的方式直接填充ByteBuffer。反之,就调用ReadableFileChannel.map()方法将数据映射到MappedByteBuffer中,即进程的虚拟内存中。不过,考虑到内存映射的应用场景的话,2MB的阈值可能有点小(保守)了,一点碎碎念,请勿在意。
总结
本文研究了Spark磁盘存储类DiskStore的具体实现,主要是写入块/字节以及读取字节的方法。另外,DiskStore读取的字节会用BlockData来封装,因此也顺便了解了一下DiskBlockData的一点细节。