Spark源码精读分析计划

Spark Core源码精读计划#31:BlockManager

2019-09-30  本文已影响0人  LittleMagic

目录

前言

一个多月没顾上这个专题,惭愧惭愧。十一期间尽量赶上进度吧。

上一篇文章讲完了BlockManager管理下的数据读取流程,今天就来看写入的流程,顺便将读写放在一起总结一下。

块写入流程

块写入的入口

由于距离上一篇已经过去很久了,这里再贴一次BlockManager提供的getOrElseUpdate()方法的源码。该方法提供了块读写的统一入口。

代码#31.1 - o.a.s.storage.BlockManager.getOrElseUpdate()方法

  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
    }
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        val blockResult = getLocalValues(blockId).getOrElse {
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
       Right(iter)
    }
  }

该方法会首先根据块ID尝试读取数据(先从本地,后从远端)。如果获取不到,就调用传入的makeIterator函数将数据转化为迭代器并写入之。最终将读取或写入的数据封装在BlockResult结构中返回。写入数据的方法是doPutIterator(),下面来看它的代码。

doPutIterator()方法

代码#31.2 - o.a.s.storage.BlockManager.doPutIterator()方法

  private def doPutIterator[T](
      blockId: BlockId,
      iterator: () => Iterator[T],
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis
      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None

      var size = 0L
      if (level.useMemory) {
        if (level.deserialized) {
          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
            case Right(s) =>
              size = s
            case Left(iter) =>
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(iter)
              }
          }
        } else {
          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
            case Right(s) =>
              size = s
            case Left(partiallySerializedValues) =>
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  partiallySerializedValues.finishWritingToStream(out)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
              }
          }
        }

      } else if (level.useDisk) {
        diskStore.put(blockId) { channel =>
          val out = Channels.newOutputStream(channel)
          serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
        }
        size = diskStore.getSize(blockId)
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          val bytesToReplicate = doGetLocalBytes(blockId, info)
          val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
            scala.reflect.classTag[Any]
          } else {
            classTag
          }
          try {
            replicate(blockId, bytesToReplicate, level, remoteClassTag)
          } finally {
            bytesToReplicate.dispose()
          }
          logDebug("Put block %s remotely took %s"
            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
        }
      }
      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
      iteratorFromFailedMemoryStorePut
    }
  }

虽然它本质上调用的是doPut()方法,但是整个写入的主体逻辑是作为一个函数(即doPut()方法的柯里化参数)传入的,因此我们可以先看上面的逻辑。阅读代码,可以整理出如下的执行逻辑:

  1. 检查块的存储等级(即StorageLevel)是否会使用内存,如果会,就优先将块展开到内存中。
  2. 如果存储等级是反序列化的,就调用MemoryStore.putIteratorAsValues()方法(具体逻辑见文章#26)将块数据作为对象写入。反之,如果是序列化的,就调用MemoryStore.putIteratorAsBytes()方法将块数据作为字节流写入。
  3. 检查上一步调用MemoryStore相关方法的结果,是否成功地展开了。如果只展开了一部分,说明内存中无法容纳块数据,因此在存储等级会使用磁盘的情况下,要继续调用DiskStore.put()方法,将多出的数据序列化地溢写到磁盘。若最终仍然没有完全展开,就将剩余的数据记录在iteratorFromFailedMemoryStorePut这个迭代器中(类型为PartiallyUnrolledIterator)。
  4. 若该块的存储等级不允许使用内存,而只允许使用磁盘,就直接调用DiskStore.put()方法写到磁盘中。
  5. 块数据写入完毕之后,如果tellMaster标记为真,调用reportBlockStatus()方法将新块的信息报告给BlockManagerMaster。
  6. 检查块的存储等级是否有副本(即后缀为_2的),如果有,还需要调用replicate()方法将块向其他节点复制一份。
  7. 方法返回iteratorFromFailedMemoryStorePut迭代器。

可见,块写入的过程符合Spark积极使用内存的特征。另外,通过阅读replicate()方法的代码(这里略去),可以发现复制块的过程是阻塞的。这就是在一般情况下不推荐使用带副本的StorageLevel的原因,会造成块写入性能下降,以及造成较大的网络传输开销。

然后就来看看doPut()方法的逻辑。

doPut()方法

该方法是doPutBytes()和doPutIterator()两个方法公用的方法,为写入的逻辑做一些前置和后置工作。代码如下。

代码#31.3 - o.a.s.storage.BlockManager.doPut()方法

  private def doPut[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[_],
      tellMaster: Boolean,
      keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
    require(blockId != null, "BlockId is null")
    require(level != null && level.isValid, "StorageLevel is null or invalid")

    val putBlockInfo = {
      val newInfo = new BlockInfo(level, classTag, tellMaster)
      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
        newInfo
      } else {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        if (!keepReadLock) {
          releaseLock(blockId)
        }
        return None
      }
    }

    val startTimeMs = System.currentTimeMillis
    var exceptionWasThrown: Boolean = true
    val result: Option[T] = try {
      val res = putBody(putBlockInfo)
      exceptionWasThrown = false
      if (res.isEmpty) {
        if (keepReadLock) {
          blockInfoManager.downgradeLock(blockId)
        } else {
          blockInfoManager.unlock(blockId)
        }
      } else {
        removeBlockInternal(blockId, tellMaster = false)
        logWarning(s"Putting block $blockId failed")
      }
      res
    } catch {
      case NonFatal(e) =>
        logWarning(s"Putting block $blockId failed due to exception $e.")
        throw e
    } finally {
      if (exceptionWasThrown) {
        removeBlockInternal(blockId, tellMaster = tellMaster)
        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
      }
    }
    if (level.replication > 1) {
      logDebug("Putting block %s with replication took %s"
        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    } else {
      logDebug("Putting block %s without replication took %s"
        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    }
    result
  }

其中,putBody即为写入块数据的函数,keepReadLock标记表示是否在写入后还继续持有对块的读锁。方法的执行流程如下:

  1. 为块生成新的BlockInfo,并调用BlockInfoManager.lockNewBlockForWriting()加写锁,准备写入。BlockInfoManager的相关细节在文章#22中已经讲过,不再赘述。
  2. 调用putBody函数的逻辑,真正地写入块数据。
  3. 若写入成功,当keepReadLock为真时,就调用BlockInfoManager.downgradeLock()方法将原先持有的写锁降级为读锁,方便后续读取。反之,当keepReadLock为假时,就直接调用BlockInfoManager.unlock()方法直接释放锁。
  4. 若putBody未能写入全部的块数据(返回的迭代器不为空)或者中途抛出了异常,说明写入不成功,调用removeBlockInternal()方法移除失败的块。该方法的实现如下,比较简单。

代码#31.4 - o.a.s.storage.BlockManager.removeBlockInternal()方法

  private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
    val removedFromMemory = memoryStore.remove(blockId)
    val removedFromDisk = diskStore.remove(blockId)
    if (!removedFromMemory && !removedFromDisk) {
      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
    }
    blockInfoManager.removeBlock(blockId)
    if (tellMaster) {
      reportBlockStatus(blockId, BlockStatus.empty)
    }
  }

总结

为了方便理解,下面用图示出BlockManager读写流程的方法调用链。

图#31.1 - BlockManager的读写流程(不含BlockTransferService)

由上图可见,BlockTransferService充当了本地BlockManager与远程BlockManager之间交互的媒介,要想补全BlockManager读写过程的全貌,还得必须研究BlockTransferService的实现细节才可以。下一篇文章会详细地讲解。

上一篇下一篇

猜你喜欢

热点阅读