
HDFS Read: Block Read Analysis (Part 1)

2019-08-15 · 古语1

I. The HDFS read flow
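
In outline: the client calls FileSystem.open; DistributedFileSystem and DFSClient turn that into a getBlockLocations RPC to the NameNode; the NameNode looks up the file's blocks, sorts each block's replica DataNodes by network distance to the client, and returns a LocatedBlocks object; the client then reads block data from the nearest DataNode. The rest of this post traces that path through the source.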

II. How the HDFS client opens the file stream

1. Opening the file stream: FileSystem.open
  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    // Overridden by DistributedFileSystem.open (see the next step)
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }
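
For orientation, here is a minimal client-side sketch that drives this entire call chain. The fs.defaultFS URI and the file path are placeholders for your own cluster:

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import java.nio.charset.StandardCharsets;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      conf.set("fs.defaultFS", "hdfs://localhost:9000"); // placeholder
      try (FileSystem fs = FileSystem.get(conf);
           FSDataInputStream in = fs.open(new Path("/tmp/demo.txt"))) {
        // fs.open() is the entry point traced above; the returned stream
        // wraps a DFSInputStream that already holds the block locations.
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
          System.out.println(line);
        }
      }
    }
  }
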
2. DistributedFileSystem.open (DistributedFileSystem extends FileSystem)
  @Override
  public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        // Delegates to DFSClient.open
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

3. DFSClient.open
  /**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   */
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException, UnresolvedLinkException {
    // Make sure the client itself has not been closed
    checkOpen();
    // Get block info from the NameNode
    TraceScope scope = getPathTraceScope("newDFSInputStream", src);
    try {
      // Construct the DFSInputStream; its constructor is examined next
      return new DFSInputStream(this, src, verifyChecksum);
    } finally {
      scope.close();
    }
  }
4. The DFSInputStream constructor
 DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
      /**
       * Read caching strategy, controlled by the readDropBehind and readahead
       * settings. A read is normally a disk operation that loads a page of data
       * (512 bytes or more) into memory and ships it to the client.
       * readDropBehind ("drop behind") discards cached data as soon as it has
       * been read, which saves memory when many clients read files concurrently
       * but causes more frequent disk I/O. With it disabled, data stays cached
       * after a read, so repeated reads of the same file are faster at the cost
       * of memory.
       * readahead (prefetch) makes the DataNode read extra bytes ahead in a
       * single disk operation, which lowers I/O latency for sequential reads.
       * Both features require native libraries on the DataNode; without them
       * they have no effect.
       */
      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
    }
    // Fetch the file's block locations from the NameNode
    openInfo();
  }
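
Both behaviors are configurable. A sketch of the knobs, assuming the standard dfs.client.cache.* keys and the per-stream CanSetDropBehind/CanSetReadahead interfaces (the path is a placeholder):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class ReadCachingKnobs {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Client-wide defaults:
      conf.setBoolean("dfs.client.cache.drop.behind.reads", true); // drop cache after read
      conf.setLong("dfs.client.cache.readahead", 4L * 1024 * 1024); // 4 MB readahead
      try (FileSystem fs = FileSystem.get(conf);
           FSDataInputStream in = fs.open(new Path("/tmp/demo.txt"))) {
        // Per-stream overrides:
        in.setDropBehind(true);
        in.setReadahead(4L * 1024 * 1024);
      }
    }
  }
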
5. DFSInputStream.openInfo()
 void openInfo() throws IOException, UnresolvedLinkException {
    synchronized(infoLock) {
      // Fetch the located blocks and the length of the last (possibly under-construction) block
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      // Defaults to 3 (dfs.client.retry.times.get-last-block-length)
      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
      while (retriesForLastBlockLength > 0) {
        // Getting last block length as -1 is a special case. When cluster
        // restarts, DNs may not report immediately. At this time partial block
        // locations will not be available with NN for getting the length. Lets
        // retry for 3 times to get the length.
        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retriesForLastBlockLength + " times");
          // Defaults to 4000 ms (dfs.client.retry.interval-ms.get-last-block-length)
          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        } else {
          break;
        }
        retriesForLastBlockLength--;
      }
      if (retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
      }
    }
  }
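
With the defaults, the client retries up to three times at 4-second intervals, so a freshly restarted cluster has roughly 12 seconds for its DataNodes to report the last block before the read fails with the IOException above.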
6. DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength()
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    // Ask the NameNode for the file's block locations
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            // if the length is zero, then no data has been written to
            // datanode. So no need to wait for the locations.
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len; 
      }
    }
    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
    return lastBlockBeingWrittenLength;
  }
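
The same metadata is exposed to applications through the public FileSystem API. A small sketch that prints each block and the DataNodes holding its replicas (reusing an fs handle as in the first example; the path is a placeholder):

  import org.apache.hadoop.fs.BlockLocation;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class ShowBlockLocations {
    static void show(FileSystem fs) throws Exception {
      FileStatus status = fs.getFileStatus(new Path("/tmp/demo.txt"));
      BlockLocation[] locations =
          fs.getFileBlockLocations(status, 0, status.getLen());
      for (BlockLocation loc : locations) {
        System.out.printf("offset=%d length=%d hosts=%s%n",
            loc.getOffset(), loc.getLength(),
            String.join(",", loc.getHosts()));
      }
    }
  }
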
7. Fetching the block list: DFSClient.getLocatedBlocks()
  public LocatedBlocks getLocatedBlocks(String src, long start)
      throws IOException {
    // Delegates to the overload below, which calls DFSClient.callGetBlockLocations
    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
  }
  /*
   * This is just a wrapper around callGetBlockLocations, but non-static so that
   * we can stub it out for tests.
   */
  @VisibleForTesting
  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
      throws IOException {
    TraceScope scope = getPathTraceScope("getBlockLocations", src);
    try {
      // Remote RPC call to the NameNode server
      return callGetBlockLocations(namenode, src, start, length);
    } finally {
      scope.close();
    }
  }
8. The remote RPC call: DFSClient.callGetBlockLocations()
/**
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) 
      throws IOException {
    try {
      // Issue the request to the NameNode through ClientProtocol (here ClientNamenodeProtocolTranslatorPB)
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    }
  }
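
unwrapRemoteException re-throws the server-side exception as its original class when it matches one of the listed types, so a missing path surfaces to the caller as an ordinary FileNotFoundException rather than a generic RemoteException; any other server exception propagates unchanged.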

9. RPC to the NameNode: ClientNamenodeProtocolTranslatorPB.getBlockLocations()
@Override
  public LocatedBlocks getBlockLocations(String src, long offset, long length)
      throws AccessControlException, FileNotFoundException,
      UnresolvedLinkException, IOException {
     
    GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
        .newBuilder()
        .setSrc(src)
        .setOffset(offset)
        .setLength(length)
        .build();
    try {
      // The RPC plumbing itself is not analyzed here.
      // On the server side this lands in NameNodeRpcServer.getBlockLocations.
      // Example rpcProxy: localhost/127.0.0.1:51397, ProtobufRpcEngine, ClientNamenodeProtocolPB
      GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null, req);
      return resp.hasLocations() ? PBHelper.convert(resp.getLocations()) : null;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

III. How the NameNode server looks up block locations

1. The NameNode entry point: NameNodeRpcServer.getBlockLocations()
  public LocatedBlocks getBlockLocations(String src, long offset,  long length) 
      throws IOException {
    checkNNStartup();
    metrics.incrGetBlockLocations();
    // Delegates to FSNamesystem.getBlockLocations()
    return namesystem.getBlockLocations(getClientMachine(),  src, offset, length);
  }
2. Returning sorted blocks: FSNamesystem.getBlockLocations()
/**
   * Get block locations within the specified range.
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
      long offset, long length) throws IOException {
    checkOperation(OperationCategory.READ);
    GetBlockLocationsResult res = null;
    FSPermissionChecker pc = getPermissionChecker();
    readLock();
    try {
      checkOperation(OperationCategory.READ);
      // Get the blocks in the requested [offset, offset + length) range
      res = getBlockLocations(pc, srcArg, offset, length, true, true);
    } catch (AccessControlException e) {
      logAuditEvent(false, "open", srcArg);
      throw e;
    } finally {
      readUnlock();
    }

    logAuditEvent(true, "open", srcArg);

    if (res.updateAccessTime()) {
      byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
          srcArg);
      String src = srcArg;
      writeLock();
      final long now = now();
      try {
        checkOperation(OperationCategory.WRITE);
        /**
         * Resolve the path again and update the atime only when the file
         * exists.
         *
         * XXX: Races can still occur even after resolving the path again.
         * For example:
         *
         * <ul>
         *   <li>Get the block location for "/a/b"</li>
         *   <li>Rename "/a/b" to "/c/b"</li>
         *   <li>The second resolution still points to "/a/b", which is
         *   wrong.</li>
         * </ul>
         *
         * The behavior is incorrect but consistent with the one before
         * HDFS-7463. A better fix is to change the edit log of SetTime to
         * use inode id instead of a path.
         */
        src = dir.resolvePath(pc, srcArg, pathComponents);
        final INodesInPath iip = dir.getINodesInPath(src, true);
        INode inode = iip.getLastINode();
        boolean updateAccessTime = inode != null &&
            now > inode.getAccessTime() + getAccessTimePrecision();
        if (!isInSafeMode() && updateAccessTime) {
          boolean changed = FSDirAttrOp.setTimes(dir,
              inode, -1, now, false, iip.getLatestSnapshotId());
          if (changed) {
            getEditLog().logTimes(src, -1, now);
          }
        }
      } catch (Throwable e) {
        LOG.warn("Failed to update the access time of " + src, e);
      } finally {
        writeUnlock();
      }
    }

    LocatedBlocks blocks = res.blocks;
    // Sort each block's replica DataNodes by network-topology distance to the client
    if (blocks != null) {
      blockManager.getDatanodeManager().sortLocatedBlocks(
          clientMachine, blocks.getLocatedBlocks());
      // lastBlock is not part of getLocatedBlocks(), might need to sort it too
      LocatedBlock lastBlock = blocks.getLastLocatedBlock();
      if (lastBlock != null) {
        ArrayList<LocatedBlock> lastBlockList = Lists.newArrayList(lastBlock);
        blockManager.getDatanodeManager().sortLocatedBlocks(
            clientMachine, lastBlockList);
      }
    }
    // Return the sorted blocks to the client
    return blocks;
  }
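
sortLocatedBlocks relies on NetworkTopology to order replicas: local node first, then same rack, then off-rack. A minimal sketch of that ordering with illustrative weights (the real implementation derives distances from the configured rack-topology tree; the hosts below are made up):

  import java.util.Arrays;
  import java.util.Comparator;

  public class TopologySortSketch {
    // Illustrative weights: 0 = same node, 2 = same rack, 4 = off-rack.
    static int distance(String client, String replica) {
      if (client.equals(replica)) return 0;      // locations are "rack/host"
      String clientRack = client.split("/")[0];
      String replicaRack = replica.split("/")[0];
      return clientRack.equals(replicaRack) ? 2 : 4;
    }

    public static void main(String[] args) {
      String client = "rack1/host-a";
      String[] replicas = {"rack2/host-c", "rack1/host-b", "rack1/host-a"};
      Arrays.sort(replicas, Comparator.comparingInt(r -> distance(client, r)));
      // Prints [rack1/host-a, rack1/host-b, rack2/host-c]:
      // local node, then same rack, then off-rack.
      System.out.println(Arrays.toString(replicas));
    }
  }
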

3. Range and safe-mode checks: the internal FSNamesystem.getBlockLocations() overload
/**
   * Get block locations within the specified range.
   * @see ClientProtocol#getBlockLocations(String, long, long)
   * @throws IOException
   */
  GetBlockLocationsResult getBlockLocations(
      FSPermissionChecker pc, String src, long offset, long length,
      boolean needBlockToken, boolean checkSafeMode) throws IOException {
    if (offset < 0) {
      throw new HadoopIllegalArgumentException(
          "Negative offset is not supported. File: " + src);
    }
    if (length < 0) {
      throw new HadoopIllegalArgumentException(
          "Negative length is not supported. File: " + src);
    }
    final GetBlockLocationsResult ret = getBlockLocationsInt(
        pc, src, offset, length, needBlockToken);

    if (checkSafeMode && isInSafeMode()) {
      for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
        // if safemode & no block locations yet then throw safemodeException
        if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
          SafeModeException se = new SafeModeException(
              "Zero blocklocations for " + src, safeMode);
          if (haEnabled && haContext != null &&
              haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
            throw new RetriableException(se);
          } else {
            throw se;
          }
        }
      }
    }
    return ret;
  }
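
The RetriableException branch matters for HA deployments: immediately after a failover the newly active NameNode may not yet have received block reports from every DataNode, so the client is told to retry rather than fail outright; on a non-HA cluster the SafeModeException is thrown directly.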
4. Finally: FSNamesystem.getBlockLocationsInt()
private GetBlockLocationsResult getBlockLocationsInt(
      FSPermissionChecker pc, final String srcArg, long offset, long length,
      boolean needBlockToken)
      throws IOException {
    String src = srcArg;
    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
    src = dir.resolvePath(pc, srcArg, pathComponents);
    final INodesInPath iip = dir.getINodesInPath(src, true);
    // Resolve the path to an INodeFile
    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
    if (isPermissionEnabled) {
      dir.checkPathAccess(pc, iip, FsAction.READ);
      checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
    }

    // Compute the file length (for a snapshot, the size at snapshot time;
    // otherwise excluding the last under-construction block)
    final long fileSize = iip.isSnapshot()
        ? inode.computeFileSize(iip.getPathSnapshotId())
        : inode.computeFileSizeNotIncludingLastUcBlock();
    boolean isUc = inode.isUnderConstruction();
    if (iip.isSnapshot()) {
      // if src indicates a snapshot file, we need to make sure the returned
      // blocks do not exceed the size of the snapshot file.
      length = Math.min(length, fileSize - offset);
      isUc = false;
    }

    final FileEncryptionInfo feInfo =
        FSDirectory.isReservedRawName(srcArg) ? null
            : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
    // Build the LocatedBlocks via the BlockManager
    final LocatedBlocks blocks = blockManager.createLocatedBlocks(
        inode.getBlocks(iip.getPathSnapshotId()), fileSize,
        isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
    // Set caching information for the located blocks.
    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
      cacheManager.setCachedLocations(lb);
    }
    final long now = now();
    boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
        && !iip.isSnapshot()
        && now > inode.getAccessTime() + getAccessTimePrecision();
    return new GetBlockLocationsResult(updateAccessTime, blocks);
  }

IV. BlockManager creates the LocatedBlocks

1. The entry method: BlockManager.createLocatedBlocks()
/** Create a LocatedBlocks. */
  public LocatedBlocks createLocatedBlocks(final BlockInfoContiguous[] blocks,
      final long fileSizeExcludeBlocksUnderConstruction,
      final boolean isFileUnderConstruction, final long offset,
      final long length, final boolean needBlockToken,
      final boolean inSnapshot, FileEncryptionInfo feInfo)
      throws IOException {
    assert namesystem.hasReadLock();
    if (blocks == null) {
      return null;
    } else if (blocks.length == 0) {
      return new LocatedBlocks(0, isFileUnderConstruction,
          Collections.<LocatedBlock>emptyList(), null, false, feInfo);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
      }
      final AccessMode mode = needBlockToken? AccessMode.READ: null;
      // Build the list of LocatedBlocks covering the requested range;
      // createLocatedBlockList also calls createLocatedBlock internally
      final List<LocatedBlock> locatedblocks = createLocatedBlockList(
          blocks, offset, length, Integer.MAX_VALUE, mode);

      final LocatedBlock lastlb;
      final boolean isComplete;
      if (!inSnapshot) {
        final BlockInfoContiguous last = blocks[blocks.length - 1];
        final long lastPos = last.isComplete()?
            fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
            : fileSizeExcludeBlocksUnderConstruction;
        lastlb = createLocatedBlock(last, lastPos, mode);
        isComplete = last.isComplete();
      } else {
        lastlb = createLocatedBlock(blocks,
            fileSizeExcludeBlocksUnderConstruction, mode);
        isComplete = true;
      }
      return new LocatedBlocks(
          fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
          locatedblocks, lastlb, isComplete, feInfo);
    }
  }
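
createLocatedBlockList walks the block list to find the blocks overlapping [offset, offset + length). For fixed-size, complete blocks the lookup reduces to simple division; a worked sketch assuming the common 128 MB block size:

  public class BlockRangeSketch {
    public static void main(String[] args) {
      final long blockSize = 128L * 1024 * 1024; // 128 MB
      long offset = 300L * 1024 * 1024;          // start reading at 300 MB
      long length = 200L * 1024 * 1024;          // read 200 MB

      int firstBlock = (int) (offset / blockSize);                // 2
      int lastBlock  = (int) ((offset + length - 1) / blockSize); // 3
      // The request spans blocks 2..3; the NameNode returns one LocatedBlock
      // (block id plus a sorted DataNode list) for each block in that range.
      System.out.printf("blocks %d..%d%n", firstBlock, lastBlock);
    }
  }
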
2. Building a single block's info: BlockManager.createLocatedBlock()
  /** @return a LocatedBlock for the given block */
  private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos
      ) throws IOException {
    if (blk instanceof BlockInfoContiguousUnderConstruction) {
      if (blk.isComplete()) {
        throw new IOException(
            "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
            + ", blk=" + blk);
      }
      final BlockInfoContiguousUnderConstruction uc =
          (BlockInfoContiguousUnderConstruction) blk;
      final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
      final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
      return new LocatedBlock(eb, storages, pos, false);
    }


    // get block locations
    // Count the DataNodes whose replica of blk is marked corrupt (unreadable)
    final int numCorruptNodes = countNodes(blk).corruptReplicas();
    // Count the corrupt replicas recorded in corruptReplicasMap, which maps each
    // corrupt block to its DataNodes and the recorded reason for the corruption;
    // normally this equals numCorruptNodes
    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
    if (numCorruptNodes != numCorruptReplicas) {
      LOG.warn("Inconsistent number of corrupt replicas for "
          + blk + " blockMap has " + numCorruptNodes
          + " but corrupt replicas map has " + numCorruptReplicas);
    }
    // Count the DataNodes that store a replica of this block
    final int numNodes = blocksMap.numNodes(blk);
    // If every replica is corrupt, mark the whole block as corrupt
    final boolean isCorrupt = numCorruptNodes == numNodes;
    // If the block is corrupt, return all nodes; otherwise return only the
    // nodes holding healthy replicas
    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
    final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
    int j = 0;
    if (numMachines > 0) {
      for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
        final DatanodeDescriptor d = storage.getDatanodeDescriptor();
        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
        if (isCorrupt || (!replicaCorrupt))
          machines[j++] = storage;
      }
    }
    assert j == machines.length :
      "isCorrupt: " + isCorrupt + 
      " numMachines: " + numMachines +
      " numNodes: " + numNodes +
      " numCorrupt: " + numCorruptNodes +
      " numCorruptRepls: " + numCorruptReplicas;
    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
    // Wrap the block and its DataNode list in a LocatedBlock and return it
    return new LocatedBlock(eb, machines, pos, isCorrupt);
  }
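
Note the edge case when every replica is corrupt: instead of returning an empty list, the NameNode returns all locations with isCorrupt set, presumably so the client can still attempt the read and report checksum failures, rather than failing with no candidates at all.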
