Spark源码精读分析计划

Spark Core源码精读计划#27:磁盘块管理器DiskBl

2019-07-29  本文已影响18人  LittleMagic

目录

前言

我们前面用4篇文章的时间讲解了Spark存储子系统中的内存部分,其内容相当多,包括内存池MemoryPool、内存管理器MemoryManager(包含两种实现:静态内存管理器StaticMemoryManager和统一内存管理器UnifiedMemoryManager)、内存项MemoryEntry、内存存储MemoryStore。相对而言,磁盘部分的实现就比较直接而简单一些,主要包含两个组件:磁盘块管理器DiskBlockManager、磁盘存储DiskStore。它们的内容都不是特别复杂,本文就研究一下DiskBlockManager。

磁盘块管理器DiskBlockManager

DiskBlockManager负责维护块数据与其在磁盘上存储位置的关系。先来看看它的构造方法与属性成员。

构造方法与属性成员

代码#27.1 - o.a.s.storage.DiskBlockManager类的构造与属性

private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
  private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)

  private[spark] val localDirs: Array[File] = createLocalDirs(conf)
  if (localDirs.isEmpty) {
    logError("Failed to create any local dir.")
    System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
  }

  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

  private val shutdownHook = addShutdownHook()
  
  // ......
}

DiskBlockManager接受两个参数:SparkConf实例与一个叫deleteFilesOnStop的布尔值。该值表示DiskBlockManager停止时是否要删除本地的存储目录,由BlockManager初始化它时指定。各个属性成员的含义解释如下:

下面我们就来看看createLocalDirs()方法。

创建本地存储目录

代码#27.2 - o.a.s.storage.DiskBlockManager.createLocalDirs()方法

  private def createLocalDirs(conf: SparkConf): Array[File] = {
    Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
      try {
        val localDir = Utils.createDirectory(rootDir, "blockmgr")
        logInfo(s"Created local directory at $localDir")
        Some(localDir)
      } catch {
        case e: IOException =>
          logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
          None
      }
    }
  }

该方法先调用通用工具类Utils中的getConfiguredLocalDirs()方法获取根目录,然后对每个根目录,调用Utils.createDirectory()方法创建存储目录。也就是说,所有磁盘存储的目录都是组织在一起的。Utils类的代码暂时就不细看了,看官只需知道getConfiguredLocalDirs()会依次检查如下几个环境变量或配置项中的路径即可:

然后,Utils.createDirectory()方法就会创建名称形如blockmgr-[UUID.randomUUID]的一级存储目录,但不会创建子目录。那么哪里会创建子目录呢?答案在getFile()方法中,它除了名称所述的获取文件的功能外,也兼职创建子目录。

获取存储文件及创建子目录

代码#27.3 - o.a.s.storage.DiskBlockManager.getFile()方法

  def getFile(filename: String): File = {
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    val subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists() && !newDir.mkdir()) {
          throw new IOException(s"Failed to create local dir in $newDir.")
        }
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }

    new File(subDir, filename)
  }

  def getFile(blockId: BlockId): File = getFile(blockId.name)

该方法的执行流程如下:

  1. 调用Utils.nonNegativeHash()方法,计算出文件名的哈希码的绝对值。
  2. 将哈希码与localDirs数组长度取余,作为目录的下标。再将哈希码与localDirs数组长度的商与subDirsPerLocalDir取余,作为子目录的下标。
  3. 检查文件对应的子目录是否存在。如果不存在的话,就根据子目录的下标来创建,并格式化为两位十六进制表示。
  4. 返回File对象。

另外,getFile()方法还有将BlockId作为输入的重载,由它可见,块对应的文件名与它本身的name字段有关。

通过上面的了解,DiskBlockManager磁盘存储的目录结构可以概括成下图。

图#27.1 - DiskBlockManager的目录结构

除了获取单个文件之外,还有获取所有文件及所有块ID的getAllFiles()与getAllBlocks()方法,它们的实现都很简单,代码如下。
代码#27.4 - o.a.s.storage.DiskBlockManager.getAllFiles()/getAllBlocks()方法

  def getAllFiles(): Seq[File] = {
    subDirs.flatMap { dir =>
      dir.synchronized {
        dir.clone()
      }
    }.filter(_ != null).flatMap { dir =>
      val files = dir.listFiles()
      if (files != null) files else Seq.empty
    }
  }

  def getAllBlocks(): Seq[BlockId] = {
    getAllFiles().flatMap { f =>
      try {
        Some(BlockId(f.getName))
      } catch {
        case _: UnrecognizedBlockId =>
          None
      }
    }
  }

创建临时块文件

代码#27.5 - o.a.s.storage.DiskBlockManager.createTempLocalBlock()/createTempShuffleBlock()方法

  def createTempLocalBlock(): (TempLocalBlockId, File) = {
    var blockId = new TempLocalBlockId(UUID.randomUUID())
    while (getFile(blockId).exists()) {
      blockId = new TempLocalBlockId(UUID.randomUUID())
    }
    (blockId, getFile(blockId))
  }

  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
    var blockId = new TempShuffleBlockId(UUID.randomUUID())
    while (getFile(blockId).exists()) {
      blockId = new TempShuffleBlockId(UUID.randomUUID())
    }
    (blockId, getFile(blockId))
  }

这两个方法比较简单,就是用来创建Spark计算过程中的中间结果以及Shuffle Write阶段输出的存储文件。它们的块ID分别用TempLocalBlockId和TempShuffleBlockId来表示。

绑定关闭钩子与关闭

代码#27.6 - o.a.s.storage.DiskBlockManager.addShutdownHook()/doStop()方法

  private def addShutdownHook(): AnyRef = {
    logDebug("Adding shutdown hook") // force eager creation of logger
    ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
      logInfo("Shutdown hook called")
      DiskBlockManager.this.doStop()
    }
  }

  private def doStop(): Unit = {
    if (deleteFilesOnStop) {
      localDirs.foreach { localDir =>
        if (localDir.isDirectory() && localDir.exists()) {
          try {
            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
              Utils.deleteRecursively(localDir)
            }
          } catch {
            case e: Exception =>
              logError(s"Exception while deleting local spark dir: $localDir", e)
          }
        }
      }
    }
  }

由代码可见,如果deleteFilesOnStop标记为真,则在DiskBlockManager关闭之前,会调用Utils.deleteRecursively()方法递归地删掉本地存储目录。

总结

本文介绍了DiskBlockManager的相关设计细节,主要包含其对Spark磁盘存储目录、子目录及文件的创建和管理。至于实际的文件读写,则由磁盘存储DiskStore来负责。DiskStore的实现也比MemoryStore要来得简单,下一篇文章会来探讨它。

上一篇下一篇

猜你喜欢

热点阅读