8.5 Shuffle 过程之 IndexShuffleBloc

2018-12-25  本文已影响0人  GongMeng

1. 概述

我们前面可以看到在执行SortShuffleManager的过程中, 会初始化IndexShuffleBlockResolver 作为各种ShuffleWriter必须的初始化组件.
IndexShuffleBlockResolver就像名字描述的一样, 主要用于shuffle blocks, 从逻辑block到物理文件之间的映射关系. 它会确保每个Map过程最终生成的block(也就是blockManager维护的那些block) 会被按照key sort后放在同一个文件里, 然后另外生成一份index file告诉使用者文件的每一段都是哪个key对应的block

2. 主要的方法

2.1 writeIndexFileAndCommit

如下图所示, 写一个index文件, 里面是标记数据文件中每个block起始点的offset, 这些数据后边会被getBlockData 调用来寻找每个block的开始和结束位置
这个方法还需要保证, data和index文件的写是atomic的, 保证两个文件的一致性

  def writeIndexFileAndCommit(
      shuffleId: Int,
      mapId: Int,
      lengths: Array[Long],
      dataTmp: File): Unit = {

    // 为了保证原子性, 使用了传统的先写到一个临时文件, check所有都一致, 然后重命名的策略. 如果中间发生失败就抛弃临时文件即可
    val indexFile = getIndexFile(shuffleId, mapId)
    val indexTmp = Utils.tempFileWith(indexFile)
    try {
      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
      Utils.tryWithSafeFinally {
        // 每个block的大小转换成offset
        var offset = 0L
        out.writeLong(offset)
        for (length <- lengths) {
          offset += length
          out.writeLong(offset)
        }
      } {
        out.close()
      }

      val dataFile = getDataFile(shuffleId, mapId)
      // 下面这段同步代码保证了每个executor同时只会执行一个IndexShuffleBlockResolver, 后续的检测和重命名过程是源自的
      synchronized {
        val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
        if (existingLengths != null) {
          // 如果相关的index已经存在, 就可以直接退出了, 这是因为这个mapTask可能已经运行过了. 
          // 当然也可能因为其它原因失败, 但总之这次写是不成功的, 直接删除tmp文件完事
          System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
          if (dataTmp != null && dataTmp.exists()) {
            dataTmp.delete()
          }
          indexTmp.delete()
        } else {
          // 写成功的话, 删除旧的data文件和index文件, 用心的覆盖
          if (indexFile.exists()) {
            indexFile.delete()
          }
          if (dataFile.exists()) {
            dataFile.delete()
          }
          if (!indexTmp.renameTo(indexFile)) {
            throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
          }
          if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
            throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
          }
        }
      }
    } finally {
      if (indexTmp.exists() && !indexTmp.delete()) {
        logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
      }
    }
  }

2.2 getBlockData

用于根据index文件从data文件中把需要的block提取出来, 好等待后一步进入shuffle过程

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    // 因为设计上是每个shuffle过程生成一个file, 这里根据shuffleId获取到它的index文件
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
    // 读取文件的过程
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      ByteStreams.skipFully(in, blockId.reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      new FileSegmentManagedBuffer(
        transportConf,
        getDataFile(blockId.shuffleId, blockId.mapId),
        offset,
        nextOffset - offset)
    } finally {
      in.close()
    }
  }
上一篇 下一篇

猜你喜欢

热点阅读