Spark Core Source Code Reading Plan #30: Finally, BlockManager

2019-08-22  LittleMagic

Preface

As the title says, after a great deal of groundwork in earlier posts, this article can finally turn to BlockManager, long awaited at last.

The block manager, BlockManager, runs on every node of a Spark cluster. On each node it manages the blocks held in memory and on disk through MemoryManager, MemoryStore, DiskBlockManager, and DiskStore, and exchanges blocks with other nodes, making it a component of considerable scale. To keep this article from growing unmanageable, it focuses on the two most fundamental aspects: BlockManager's initialization and the block read path. The write path and other logic (such as BlockTransferService) will be covered in separate posts.

Initialization of BlockManager

Constructor and Member Fields

Code #30.1 - Constructor and member fields of the o.a.s.storage.BlockManager class

private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

  private[spark] val externalShuffleServiceEnabled =
    conf.getBoolean("spark.shuffle.service.enabled", false)

  val diskBlockManager = {
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }

  private[storage] val blockInfoManager = new BlockInfoManager

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

  private[spark] val memoryStore =
    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
  memoryManager.setMemoryStore(memoryStore)

  private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
  private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory

  private val externalShuffleServicePort = {
    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
    if (tmpPort == 0) {
      conf.get("spark.shuffle.service.port").toInt
    } else {
      tmpPort
    }
  }

  var blockManagerId: BlockManagerId = _
  private[spark] var shuffleServerId: BlockManagerId = _

  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager,
      securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
  } else {
    blockTransferService
  }

  private val maxFailuresBeforeLocationRefresh =
    conf.getInt("spark.block.failures.beforeLocationRefresh", 5)

  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

  private var asyncReregisterTask: Future[Unit] = null
  private val asyncReregisterLock = new Object

  @volatile private var cachedPeers: Seq[BlockManagerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L

  private var blockReplicationPolicy: BlockReplicationPolicy = _

  private[storage] val remoteBlockTempFileManager =
    new BlockManager.RemoteBlockDownloadFileManager(this)
  private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
  // ......
}

BlockManager takes a large number of constructor parameters. Types already covered in earlier posts need no further comment, but three components have not yet been examined in detail: MapOutputTracker, which tracks the output of map tasks (i.e., the input of reduce tasks) during job execution and belongs to the scheduling module; ShuffleManager, which manages the shuffle strategy and has been analyzed in detail in articles outside this series; and BlockTransferService, which, as the name suggests, transfers blocks between nodes over the network and will come up shortly in upcoming posts.

BlockManager implements two traits, BlockDataManager and BlockEvictionHandler, indicating that a BlockManager can manage block data and evict blocks from memory, respectively. So far, BlockManager is the only implementation of either trait.

Now let's look at the member fields of the BlockManager class. Components readers already know well (MemoryStore, DiskStore, etc.) need no repetition, so only the major new faces deserve a few words: blockInfoManager maintains each block's metadata (BlockInfo) along with the read/write locks that protect it; futureExecutionContext is a cached daemon thread pool backing BlockManager's asynchronous work (such as replication and re-registration); shuffleClient is the client for reading shuffle blocks, an ExternalShuffleClient when the external shuffle service is enabled and otherwise the blockTransferService itself; slaveEndpoint is the RPC endpoint through which the driver sends this BlockManager commands such as removing blocks; and remoteBlockTempFileManager, paired with the maxRemoteBlockToMem threshold, lets oversized remote blocks be fetched to temporary files on disk instead of into memory.

The initialize() Method

BlockManager is created in SparkEnv, but it is not usable until its initialize() method has been called (SparkContext does this on the driver, and Executor on the executors). The code is as follows.

Code #30.2 - The o.a.s.storage.BlockManager.initialize() method

  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)

    blockReplicationPolicy = {
      val priorityClass = conf.get(
        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
      val clazz = Utils.classForName(priorityClass)
      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
      logInfo(s"Using $priorityClass for block replication policy")
      ret
    }

    val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
    val idFromMaster = master.registerBlockManager(
      id,
      maxOnHeapMemory,
      maxOffHeapMemory,
      slaveEndpoint)

    blockManagerId = if (idFromMaster != null) idFromMaster else id
    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }

    logInfo(s"Initialized BlockManager: $blockManagerId")
  }

The initialization flow of BlockManager is as follows:

  1. Initialize the BlockTransferService and the ShuffleClient.
  2. Determine the block replication policy from the configuration item spark.storage.replication.policy and instantiate it via reflection. The default is RandomBlockReplicationPolicy, which places replicas of a block on randomly chosen nodes (see the configuration sketch after this list).
  3. Generate a BlockManagerId from the executor ID and register it, together with the slave RPC endpoint, via BlockManagerMaster.registerBlockManager(). On successful registration, BlockManagerMaster returns the authoritative ID, which is adopted if non-null.
  4. Generate the ID of the shuffle server. If the current node is an executor and the external shuffle service is enabled, call registerWithExternalShuffleServer() to register with the external shuffle service (code omitted).
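
As a side note on step 2, the replication policy is pluggable through configuration. A minimal sketch of setting it explicitly (the class named below is just the built-in default, written out in full):

import org.apache.spark.SparkConf

// Minimal sketch: any class implementing BlockReplicationPolicy can be
// named here; RandomBlockReplicationPolicy is simply the built-in default.
val conf = new SparkConf()
  .setAppName("replication-policy-demo")
  .set("spark.storage.replication.policy",
    "org.apache.spark.storage.RandomBlockReplicationPolicy")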

After all of the above, readers may still not have a concrete feel for it (honestly, neither do I). So next we look at the block read path, which is one of BlockManager's main duties, and much less abstract.

The Entry Point for Block Reads and Writes

BlockManager provides a number of methods for reading and writing blocks; one entry point that unifies the two is the getOrElseUpdate() method. Since blocks can be materialized from RDDs, the call to it is easy to find in the RDD class (specifically, in the RDD.getOrCompute() method), so for convenience this article starts the analysis there. First, the source code.

Code #30.3 - The o.a.s.storage.BlockManager.getOrElseUpdate() method

  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
    }
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        val blockResult = getLocalValues(blockId).getOrElse {
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
        Right(iter)
    }
  }

This method first tries to read the block by its ID via get() (locally first, then remotely). If nothing is found, it invokes the supplied makeIterator function to produce the data as an iterator and writes it with doPutIterator(). A None result from doPutIterator() means the write succeeded, so the freshly stored block is read back with getLocalValues() and returned as a Left(BlockResult); a Some(iter) result means the block could not be stored (for instance, it was too large to fit in memory), and the iterator is handed straight back to the caller as a Right.
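
To connect this entry point with user code: persisting an RDD is what sends actions through RDD.getOrCompute() and hence into getOrElseUpdate(). A minimal runnable sketch (the blocks involved are named by the rdd_<rddId>_<partitionId> convention):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Minimal sketch: after persist(), every action consults the BlockManager
// for blocks named rdd_<rddId>_<partitionId> before recomputing partitions.
val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
val rdd = spark.sparkContext.parallelize(1 to 1000).persist(StorageLevel.MEMORY_AND_DISK)
rdd.count() // first action: partitions are computed and stored as blocks
rdd.count() // second action: blocks are served straight from the store
spark.stop()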

The Block Read Path

Below is the get() method called in code #30.3.

Code #30.4 - The o.a.s.storage.BlockManager.get() method

  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val local = getLocalValues(blockId)
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
      return local
    }
    val remote = getRemoteValues[T](blockId)
    if (remote.isDefined) {
      logInfo(s"Found block $blockId remotely")
      return remote
    }
    None
  }

This method first calls getLocalValues() to read the data locally (note: from the local executor); if nothing is found, it goes on to call getRemoteValues() to fetch the data from a remote node. Let's look at each in turn.
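
As an aside, this local-first, remote-second fallback is the whole essence of get(), and can be restated in one line of idiomatic Scala. A trivial sketch, with getLocal/getRemote as hypothetical stand-ins for getLocalValues()/getRemoteValues():

// Hypothetical stand-ins for the two lookups in get().
def getLocal(id: String): Option[String] = None          // pretend: not cached locally
def getRemote(id: String): Option[String] = Some("data") // pretend: a peer has it
def get(id: String): Option[String] = getLocal(id).orElse(getRemote(id))

println(get("rdd_0_0")) // Some(data)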

Reading Data Locally

Code #30.5 - The o.a.s.storage.BlockManager.getLocalValues() method

  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
    logDebug(s"Getting local block $blockId")
    blockInfoManager.lockForReading(blockId) match {
      case None =>
        logDebug(s"Block $blockId was not found")
        None
      case Some(info) =>
        val level = info.level
        logDebug(s"Level for block $blockId is $level")
        val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
        if (level.useMemory && memoryStore.contains(blockId)) {
          val iter: Iterator[Any] = if (level.deserialized) {
            memoryStore.getValues(blockId).get
          } else {
            serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
          }

          val ci = CompletionIterator[Any, Iterator[Any]](iter, {
            releaseLock(blockId, taskAttemptId)
          })
          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
        } else if (level.useDisk && diskStore.contains(blockId)) {
          val diskData = diskStore.getBytes(blockId)
          val iterToReturn: Iterator[Any] = {
            if (level.deserialized) {
              val diskValues = serializerManager.dataDeserializeStream(
                blockId,
                diskData.toInputStream())(info.classTag)
              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
            } else {
              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
                .map { _.toInputStream(dispose = false) }
                .getOrElse { diskData.toInputStream() }
              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
            }
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
            releaseLockAndDispose(blockId, diskData, taskAttemptId)
          })
          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
        } else {
          handleLocalReadFailure(blockId)
        }
    }
  }

It's a bit long, but the logic is clear. In brief, the method's execution flow:

  1. Call BlockInfoManager.lockForReading() to take a read lock on the block and try to return its metadata as a BlockInfo.
  2. If there is no BlockInfo, the block does not exist locally. Otherwise, check its StorageLevel, considering memory first and disk second.
  3. If the block's StorageLevel includes memory and the data is in the MemoryStore, call MemoryStore.getValues() or getBytes() depending on whether the data is stored deserialized or serialized, ultimately obtaining an iterator over the block data.
  4. If the block's StorageLevel includes disk and the data is in the DiskStore, first obtain the block's bytes via DiskStore.getBytes(), then handle them according to whether they must be deserialized. Along the way, maybeCacheDiskValuesInMemory()/maybeCacheDiskBytesInMemory() try to cache the data just read from disk into memory to speed up later reads.
  5. Release the block's read lock via releaseLock() or releaseLockAndDispose() (deferred until the returned iterator is exhausted, as the sketch after this list shows).
  6. Wrap the block data iterator, the read method, and the block's size in bytes into a BlockResult and return it. If both the memory and disk reads fail, call handleLocalReadFailure() to handle the local read failure.
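
The lock release in step 5 is deliberately lazy: the data iterator is wrapped in a CompletionIterator so that the lock is released only once the caller has consumed every element. Below is a simplified re-implementation to illustrate the pattern (Spark's real one lives in o.a.s.util.CompletionIterator):

// Simplified sketch of the CompletionIterator pattern: run a cleanup
// action (e.g. releasing a read lock) exactly once, when the wrapped
// iterator is exhausted.
class CompletionIterator[A](sub: Iterator[A])(completion: => Unit) extends Iterator[A] {
  private var completed = false
  override def hasNext: Boolean = {
    val r = sub.hasNext
    if (!r && !completed) {
      completed = true
      completion // fires exactly once, on exhaustion
    }
    r
  }
  override def next(): A = sub.next()
}

val it = new CompletionIterator(Iterator(1, 2, 3))(println("read lock released"))
it.foreach(println) // prints 1, 2, 3, then "read lock released"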

Hopefully that was clear enough. Now on to the method for reading block data from a remote node.

Reading Data Remotely

Code #30.6 - The o.a.s.storage.BlockManager.getRemoteValues() method

  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val ct = implicitly[ClassTag[T]]
    getRemoteBytes(blockId).map { data =>
      val values =
        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
      new BlockResult(values, DataReadMethod.Network, data.size)
    }
  }

This method is short because the main logic lives in getRemoteBytes(). That makes sense: block data on a remote node must be serialized before it can be transferred, and is deserialized back into objects only after it arrives, so what is actually fetched is a byte stream. Below is the source of getRemoteBytes().
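
To make the serialize-transfer-deserialize idea concrete, here is a minimal sketch using plain JDK serialization instead of Spark's SerializerManager; the principle is the same in that only bytes ever travel over the wire:

import java.io._

// Minimal sketch (JDK serialization, not Spark's SerializerManager): the
// remote side writes objects to bytes; the local side reads them back.
def roundTrip[T <: Serializable](value: T): T = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
  in.readObject().asInstanceOf[T]
}

println(roundTrip(List(1, 2, 3))) // List(1, 2, 3)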

Code #30.7 - The o.a.s.storage.BlockManager.getRemoteBytes() method

  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    logDebug(s"Getting remote block $blockId")
    require(blockId != null, "BlockId is null")
    var runningFailureCount = 0
    var totalFailureCount = 0

    val locationsAndStatus = master.getLocationsAndStatus(blockId)
    val blockSize = locationsAndStatus.map { b =>
      b.status.diskSize.max(b.status.memSize)
    }.getOrElse(0L)
    val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)

    val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
      remoteBlockTempFileManager
    } else {
      null
    }

    val locations = sortLocations(blockLocations)
    val maxFetchFailures = locations.size
    var locationIterator = locations.iterator
    while (locationIterator.hasNext) {
      val loc = locationIterator.next()
      logDebug(s"Getting remote block $blockId from $loc")
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
      } catch {
        case NonFatal(e) =>
          runningFailureCount += 1
          totalFailureCount += 1

          if (totalFailureCount >= maxFetchFailures) {
            logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
              s"Most recent failure cause:", e)
            return None
          }

          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $runningFailureCount)", e)

          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
            locationIterator = sortLocations(master.getLocations(blockId)).iterator
            logDebug(s"Refreshed locations from the driver " +
              s"after ${runningFailureCount} fetch failures.")
            runningFailureCount = 0
          }

          null
      }

      if (data != null) {
        return Some(new ChunkedByteBuffer(data))
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }

The execution flow of this method is as follows:

  1. Call BlockManagerMaster.getLocationsAndStatus() to obtain the locations of all remote BlockManagers holding the block, along with its status. The status yields the block's size, which decides whether the fetch lands in memory or, for blocks larger than spark.maxRemoteBlockSizeFetchToMem, in a temporary file via remoteBlockTempFileManager.
  2. Call sortLocations() to sort those locations by the topology information in BlockManagerId: BlockManagers on the same host come first, then those on nodes in the same rack (provided rack awareness is available), and finally those on nodes in other racks.
  3. For each remote BlockManager, call BlockTransferService.fetchBlockSync() to fetch the block data synchronously, returned as a ChunkedByteBuffer.
  4. If one remote BlockManager fails to yield the block, move on to the next. When the number of consecutive failures reaches the threshold set by spark.block.failures.beforeLocationRefresh (default 5), proactively refresh the remote BlockManager locations from the driver in case they have gone stale (see the sketch after this list).
  5. If every remote BlockManager has been tried without success, the read is deemed failed.
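
The retry bookkeeping in code #30.7 is worth isolating. Below is a simplified, self-contained sketch of the same strategy with the Spark internals stubbed out; every name here is illustrative, not Spark API:

import scala.util.control.NonFatal

// Simplified sketch of getRemoteBytes()'s retry strategy: walk the sorted
// locations, give up after as many total failures as there were initial
// locations, and refresh the location list from the driver after every
// maxFailuresBeforeRefresh consecutive failures.
def fetchWithRetry[T](
    initialLocations: Seq[String],
    refreshLocations: () => Seq[String],
    fetch: String => T,
    maxFailuresBeforeRefresh: Int = 5): Option[T] = {
  val maxFetchFailures = initialLocations.size
  var locationIterator = initialLocations.iterator
  var runningFailureCount = 0
  var totalFailureCount = 0
  while (locationIterator.hasNext) {
    val loc = locationIterator.next()
    try {
      return Some(fetch(loc))
    } catch {
      case NonFatal(_) =>
        runningFailureCount += 1
        totalFailureCount += 1
        if (totalFailureCount >= maxFetchFailures) return None
        if (runningFailureCount >= maxFailuresBeforeRefresh) {
          locationIterator = refreshLocations().iterator
          runningFailureCount = 0
        }
    }
  }
  None
}

// e.g. fetchWithRetry(Seq("exec-1", "exec-2"), () => Seq("exec-3"), loc => s"bytes from $loc")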

Summary

This article walked through BlockManager's initialization in detail, together with the flows for reading block data locally and remotely. The next two articles will fill in the details of block writes and of BlockTransferService, at which point we can assemble the full picture of BlockManager's read and write paths.

Good night.
