Spark Core Source Code Reading Plan #28: Disk Storage DiskStore

2019-08-06  LittleMagic

Preface

In the previous article, we looked at DiskBlockManager, the component Spark uses to manage disk blocks. This article moves on to DiskStore, the component actually responsible for disk storage, together with the related BlockData. Some of this material touches on Java NIO, so keep that in mind as you read.

Disk Storage: DiskStore

Constructor and Member Fields

Code #28.1 - Constructor and member fields of the o.a.s.storage.DiskStore class

private[spark] class DiskStore(
    conf: SparkConf,
    diskManager: DiskBlockManager,
    securityManager: SecurityManager) extends Logging {

  private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
  private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
    Int.MaxValue.toString)
  private val blockSizes = new ConcurrentHashMap[BlockId, Long]()

  // ......
}

DiskStore takes three constructor parameters: instances of SparkConf, DiskBlockManager, and SecurityManager, where SecurityManager provides support for data encryption. The three member fields have the following meanings:

- minMemoryMapBytes: the threshold set by spark.storage.memoryMapThreshold (default 2m); block files at least this large are memory-mapped rather than read into a heap buffer (see toByteBuffer() below).
- maxMemoryMapBytes: the upper bound on a single mapped or allocated region, set by spark.storage.memoryMapLimitForTests (default Int.MaxValue); as the name suggests, it exists mainly for tests.
- blockSizes: a ConcurrentHashMap recording, for each BlockId, the size in bytes of the block written to disk.
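As a side note, the memory-mapping threshold can be tuned like any other Spark setting. A minimal sketch, using a hypothetical value:

import org.apache.spark.SparkConf

// Hypothetical tuning sketch: raise the memory-map threshold from its 2m default.
// spark.storage.memoryMapLimitForTests is, as the name says, meant for tests and normally left alone.
val conf = new SparkConf()
  .set("spark.storage.memoryMapThreshold", "8m")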

Writing Blocks

The logic for writing a block is implemented by the put() method.

Code #28.2 - The o.a.s.storage.DiskStore.put()/contains() methods

  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
    if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
    }
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val out = new CountingWritableChannel(openForWrite(file))
    var threwException: Boolean = true
    try {
      writeFunc(out)
      blockSizes.put(blockId, out.getCount)
      threwException = false
    } finally {
      try {
        out.close()
      } catch {
        case ioe: IOException =>
          if (!threwException) {
            threwException = true
            throw ioe
          }
      } finally {
        if (threwException) {
          remove(blockId)
        }
      }
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
  }

  def contains(blockId: BlockId): Boolean = {
    val file = diskManager.getFile(blockId.name)
    file.exists()
  }

The put() method first calls contains() to check whether the block has already been written to disk as a file, and proceeds only if it has not. It then calls DiskBlockManager.getFile() to obtain the file corresponding to the block ID and opens a WritableByteChannel on it (the NIO writable channel, whose write() method writes data to the file). Finally, it invokes the writeFunc function passed in as a parameter to write the data through the WritableByteChannel, and records the block ID and the number of bytes written in the blockSizes map.
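To make the write path concrete, here is a hypothetical usage sketch (not actual Spark code; diskStore and blockId are assumed to already exist). In Spark itself, BlockManager passes a writeFunc that streams serialized records into the channel; here we simply write a small buffer by hand:

import java.nio.ByteBuffer

diskStore.put(blockId) { channel =>
  val payload = ByteBuffer.wrap(Array.fill[Byte](16)(0x2a.toByte))  // 16 bytes of dummy data
  while (payload.hasRemaining) {
    channel.write(payload)  // a single write() call may not consume the whole buffer
  }
}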

Next, let's look at the openForWrite() method called in code #28.2.

Code #28.3 - The o.a.s.storage.DiskStore.openForWrite() method

  private def openForWrite(file: File): WritableByteChannel = {
    val out = new FileOutputStream(file).getChannel()
    try {
      securityManager.getIOEncryptionKey().map { key =>
        CryptoStreamUtils.createWritableChannel(out, conf, key)
      }.getOrElse(out)
    } catch {
      case e: Exception =>
        Closeables.close(out, true)
        file.delete()
        throw e
    }
  }

As we can see, this method constructs a FileOutputStream from the file object and obtains its channel for writing data. In particular, if I/O encryption is enabled, the channel is additionally wrapped via CryptoStreamUtils.createWritableChannel(), which is beyond the scope of this article. As for CountingWritableChannel, it is just a simple class built on top of the WritableByteChannel interface that adds byte counting, so its code is not listed here.
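For the curious, a byte-counting wrapper of this kind takes only a handful of lines. The following is a minimal sketch of the idea, not the actual Spark class:

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Sketch only: delegates all writes to an underlying channel and tallies the bytes written.
class CountingChannelSketch(sink: WritableByteChannel) extends WritableByteChannel {
  private var count = 0L
  def getCount: Long = count
  override def write(src: ByteBuffer): Int = {
    val written = sink.write(src)  // perform the actual write
    count += written               // accumulate the byte count
    written
  }
  override def isOpen(): Boolean = sink.isOpen()
  override def close(): Unit = sink.close()
}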

Writing Bytes

Code #28.4 - The o.a.s.storage.DiskStore.putBytes() method

  def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    put(blockId) { channel =>
      bytes.writeFully(channel)
    }
  }

As we can see, besides the block ID, this method takes the data wrapped in a ChunkedByteBuffer. When it calls the put() method above, the writeFunc it passes in invokes ChunkedByteBuffer.writeFully(), which writes the data into the WritableByteChannel chunk by chunk, each chunk being of bounded size.
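A hypothetical usage sketch of putBytes(), assuming a DiskStore instance named diskStore; since DiskStore and ChunkedByteBuffer are private[spark], such code would have to live inside the Spark code base (for example, in a test suite), and TestBlockId is a simple BlockId implementation meant for tests:

import java.nio.ByteBuffer
import org.apache.spark.storage.TestBlockId
import org.apache.spark.util.io.ChunkedByteBuffer

val blockId = TestBlockId("demo")
val bytes = "hello, disk store".getBytes("UTF-8")
// A ChunkedByteBuffer is just an array of ByteBuffer chunks; a single chunk suffices here.
val data = new ChunkedByteBuffer(Array(ByteBuffer.wrap(bytes)))
diskStore.putBytes(blockId, data)
assert(diskStore.getSize(blockId) == bytes.length)  // put() recorded the size in blockSizes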

Reading Bytes

Code #28.5 - The o.a.s.storage.DiskStore.getBytes() method

  def getBytes(blockId: BlockId): BlockData = {
    val file = diskManager.getFile(blockId.name)
    val blockSize = getSize(blockId)

    securityManager.getIOEncryptionKey() match {
      case Some(key) =>
        new EncryptedBlockData(file, blockSize, conf, key)
      case _ =>
        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
    }
  }

This code is straightforward, but note that the result differs between the encrypted and unencrypted cases: the former returns an EncryptedBlockData, the latter a DiskBlockData, and both implement BlockData. As the name suggests, BlockData is the concrete wrapper around block data on disk; a quick round-trip sketch follows, and after that we look at the most common implementation, DiskBlockData.
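Continuing the hypothetical sketch from putBytes() above, reading the block back is symmetric; which BlockData implementation we get depends only on whether I/O encryption is enabled:

val blockData = diskStore.getBytes(blockId)  // DiskBlockData here, since encryption is off
val buffer = blockData.toByteBuffer()        // small block, so it is read into a heap buffer
println(s"read ${buffer.remaining()} bytes back from disk")
blockData.dispose()                          // a no-op for DiskBlockData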

Disk Block Data: DiskBlockData

This class is a private class defined below DiskStore in the same file. It is fairly short, so it is pasted here in full.

Code #28.6 - The o.a.s.storage.DiskBlockData class

private class DiskBlockData(
    minMemoryMapBytes: Long,
    maxMemoryMapBytes: Long,
    file: File,
    blockSize: Long) extends BlockData {
  override def toInputStream(): InputStream = new FileInputStream(file)

  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)

  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
    Utils.tryWithResource(open()) { channel =>
      var remaining = blockSize
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxMemoryMapBytes)
        val chunk = allocator(chunkSize.toInt)
        remaining -= chunkSize
        JavaUtils.readFully(channel, chunk)
        chunk.flip()
        chunks += chunk
      }
      new ChunkedByteBuffer(chunks.toArray)
    }
  }

  override def toByteBuffer(): ByteBuffer = {
    require(blockSize < maxMemoryMapBytes,
      s"can't create a byte buffer of size $blockSize" +
      s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
    Utils.tryWithResource(open()) { channel =>
      if (blockSize < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(blockSize.toInt)
        JavaUtils.readFully(channel, buf)
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, 0, file.length)
      }
    }
  }

  override def size: Long = blockSize

  override def dispose(): Unit = {}

  private def open() = new FileInputStream(file).getChannel
}

As briefly mentioned a while back, the BlockData trait only defines the ways block data can be converted; the concrete details are left to each implementing class. Let's take a closer look at the toChunkedByteBuffer() and toByteBuffer() methods.
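For reference, the BlockData trait is roughly as follows (simplified; the exact signatures may differ slightly across Spark versions):

import java.io.InputStream
import java.nio.ByteBuffer
import org.apache.spark.util.io.ChunkedByteBuffer

private[spark] trait BlockData {
  def toInputStream(): InputStream
  def toNetty(): Object
  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
  def toByteBuffer(): ByteBuffer
  def size: Long
  def dispose(): Unit
}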

Converting to a ChunkedByteBuffer

The Utils.tryWithResource() method is essentially the Scala counterpart of Java's try-with-resources, since Scala does not have that syntactic sugar.
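Its implementation is only a few lines; roughly (simplified from the Spark source):

import java.io.Closeable

// Create the resource, hand it to the function, and always close it afterwards.
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try f.apply(resource) finally resource.close()
}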

The toChunkedByteBuffer() method opens the file as a FileInputStream and obtains its FileChannel, then calls JavaUtils.readFully() to fill a ByteBuffer with data read from the channel. Each ByteBuffer is one chunk (capped at maxMemoryMapBytes), and the array of all chunks forms the final ChunkedByteBuffer. ChunkedByteBuffer was briefly mentioned in article #21; a dedicated follow-up article on it will come soon, since it is rather interesting.

Converting to a ByteBuffer

The toByteBuffer() method checks whether the block size is below spark.storage.memoryMapThreshold (which finally shows up). If it is, a ByteBuffer is filled directly, in the same way as in toChunkedByteBuffer(). Otherwise, FileChannel.map() is called to map the data into a MappedByteBuffer, that is, into the process's virtual memory. That said, considering the typical use cases of memory mapping, the 2MB threshold may be a bit small (conservative). Just a passing thought, don't mind it.
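To see what the mapping branch does in plain NIO terms, here is a self-contained illustration (not Spark code; the file path is made up):

import java.io.FileInputStream
import java.nio.channels.FileChannel.MapMode

val channel = new FileInputStream("/tmp/some-block-file").getChannel  // hypothetical file
try {
  // The file contents are mapped into virtual memory; nothing is copied onto the JVM heap.
  val mapped = channel.map(MapMode.READ_ONLY, 0, channel.size())
  println(s"mapped ${mapped.remaining()} bytes")
} finally {
  channel.close()
}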

Summary

This article examined the concrete implementation of Spark's disk storage class DiskStore, mainly its methods for writing blocks/bytes and reading bytes. Since the bytes DiskStore reads back are wrapped in BlockData, we also took a quick look at some details of DiskBlockData.
