hdfs写之namenode端addBlock实现<五>

2019-10-10  本文已影响0人  古语1

一、流程图

二、NameNodeRpcServer的addBlock实现

1、客户端通过RPC调用namenode的addBlock方法

addBlock 调用getAdditionalBlock


  /**
   * RPC entry point a client calls to obtain one more block for the file it
   * is writing. The array arguments are converted to collections and the
   * request is delegated to {@code namesystem.getAdditionalBlock}; the
   * add-block metric is incremented only when a block comes back.
   *
   * @param src path of the file being written
   * @param clientName name of the requesting client
   * @param previous the previous block as reported by the client
   * @param excludedNodes datanodes that must not be chosen, may be null
   * @param fileId id of the file
   * @param favoredNodes datanodes preferred by the client, may be null
   * @return the block returned by the namesystem, possibly null
   * @throws IOException propagated from the checks or from the namesystem
   */
  @Override
  public LocatedBlock addBlock(String src, String clientName,
      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
      String[] favoredNodes)
      throws IOException {
    checkNNStartup();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
          + " fileId=" + fileId + " for " + clientName);
    }

    // A missing array stays a null collection rather than an empty one.
    final Set<Node> excluded = (excludedNodes == null)
        ? null
        : new HashSet<Node>(Arrays.asList(excludedNodes));

    final List<String> favored;
    if (favoredNodes == null) {
      favored = null;
    } else {
      favored = Arrays.asList(favoredNodes);
    }

    final LocatedBlock allocated = namesystem.getAdditionalBlock(src, fileId,
        clientName, previous, excluded, favored);
    if (allocated != null) {
      metrics.incrAddBlockOps();
    }
    return allocated;
  }

getAdditionalBlock方法先通过getNewBlockTargets获取要写入的datanode,再通过storeAllocatedBlock保存新分配的block信息。

 /**
   * Obtains an additional block for a file that is currently being written.
   *
   * The work is split in two: {@code getNewBlockTargets} selects the
   * datanode storages for the block, then {@code storeAllocatedBlock}
   * records the block for those targets. When no targets are returned the
   * request is treated as a retry and the block that was placed in the
   * retry holder is handed back instead.
   *
   * @param src path of the file being written
   * @param fileId id of the file
   * @param clientName name of the requesting client
   * @param previous the previous block as reported by the client
   * @param excludedNodes nodes that must not be chosen, may be null
   * @param favoredNodes nodes preferred by the client, may be null
   * @return the newly stored block, or the retry block on a retry
   * @throws IOException propagated from either step
   */
  LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
      ExtendedBlock previous, Set<Node> excludedNodes,
      List<String> favoredNodes) throws IOException {
    // One-slot holder that the target selection step fills in on a retry.
    final LocatedBlock[] retryHolder = new LocatedBlock[1];

    // Step 1: find the datanodes to write to.
    final DatanodeStorageInfo[] chosen = getNewBlockTargets(src, fileId,
        clientName, previous, excludedNodes, favoredNodes, retryHolder);

    if (chosen != null) {
      // Step 2: record the new block for the chosen targets.
      return storeAllocatedBlock(src, fileId, clientName, previous, chosen);
    }

    // No targets means this is a retry: just return the last block.
    assert retryHolder[0] != null : "Retry block is null";
    return retryHolder[0];
  }

通过getNewBlockTargets方法获取要写入的datanode

/**
   * Part I of getAdditionalBlock().
   * Analyze the state of the file under read lock to determine if the client
   * can add a new block, detect potential retries, lease mismatches,
   * and minimal replication of the penultimate block.
   * 
   * Generate target DataNode locations for the new block,
   * but do not create the new block yet.
   *
   * @param src path of the file being written; re-resolved under the lock
   * @param fileId id of the file
   * @param clientName name of the requesting client
   * @param previous the previous block as reported by the client
   * @param excludedNodes nodes passed through to target selection
   * @param favoredNodes favored nodes passed through to target selection
   * @param onRetryBlock one-slot holder handed to analyzeFileState(); when
   *        slot 0 ends up holding a block with locations the call is a retry
   * @return the targets chosen by the block manager, or null on a retry
   * @throws IOException NotReplicatedYetException when checkFileProgress()
   *         fails, or a plain IOException when the file already has
   *         maxBlocksPerFile blocks; other failures are propagated
   */
  DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
      String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
      List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
    // Values captured under the read lock and consumed after it is released.
    final long blockSize;
    final int replication;
    final byte storagePolicyID;
    Node clientNode = null;
    String clientMachine = null;

    NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {}  inodeId {}" +
        " for {}", src, fileId, clientName);

    // The operation category is checked before locking and again under the lock.
    checkOperation(OperationCategory.READ);
    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
    FSPermissionChecker pc = getPermissionChecker();
    readLock();
    try {
      checkOperation(OperationCategory.READ);
      src = dir.resolvePath(pc, src, pathComponents);
      FileState fileState = analyzeFileState(
          src, fileId, clientName, previous, onRetryBlock);
      final INodeFile pendingFile = fileState.inode;
      // Check if the penultimate block is minimally replicated
      if (!checkFileProgress(src, pendingFile, false)) {
        throw new NotReplicatedYetException("Not replicated yet: " + src);
      }
      // From here on use the path reported by the file state analysis.
      src = fileState.path;

      if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
        // This is a retry. No need to generate new locations.
        // Use the last block if it has locations.
        return null;
      }
      if (pendingFile.getBlocks().length >= maxBlocksPerFile) {
        throw new IOException("File has reached the limit on maximum number of"
            + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
            + "): " + pendingFile.getBlocks().length + " >= "
            + maxBlocksPerFile);
      }
      blockSize = pendingFile.getPreferredBlockSize();
      clientMachine = pendingFile.getFileUnderConstructionFeature()
          .getClientMachine();
      // May yield null; that case is handled once the lock is released.
      clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
          clientMachine);
      replication = pendingFile.getFileReplication();
      storagePolicyID = pendingFile.getStoragePolicyID();
    } finally {
      readUnlock();
    }

    // Fallback lookup when the client host did not map to a datanode above.
    if (clientNode == null) {
      clientNode = getClientNode(clientMachine);
    }

    // choose targets for the new block to be allocated.
    // Note that this call is made after the read lock has been released.
    return getBlockManager().chooseTarget4NewBlock( 
        src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
        storagePolicyID);
  }

storeAllocatedBlock方法中会调用commitOrCompleteLastBlock方法提交上一个block,并调用persistNewBlock将新block持久化到editlog;其中analyzeFileState方法用于分析本次block请求的状态,并且将已经分配好的block保存到onRetryBlock中。


  /**
   * Part II of getAdditionalBlock().
   * Should repeat the same analysis of the file state as in Part 1,
   * but under the write lock.
   * If the conditions still hold, then allocate a new block with
   * the new targets, add it to the INode and to the BlocksMap.
   *
   * @param src path of the file being written; replaced by the path
   *        reported by analyzeFileState() once under the lock
   * @param fileId id of the file
   * @param clientName name of the requesting client
   * @param previous the previous block as reported by the client
   * @param targets storages chosen for the block in Part I
   * @return the retry block when analyzeFileState() reports one with
   *         locations; the file's last block with the new targets when the
   *         retry block has no locations; otherwise the newly created block
   * @throws IOException propagated from the checks and helpers called here
   */
  LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName,
      ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
    Block newBlock = null;
    long offset;
    // The operation category is checked before locking and again under the lock.
    checkOperation(OperationCategory.WRITE);
    waitForLoadingFSImage();
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);
      // Run the full analysis again, since things could have changed
      // while chooseTarget() was executing.
      LocatedBlock[] onRetryBlock = new LocatedBlock[1];
      // Analyze what kind of block allocation request this is.
      FileState fileState = 
          analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
      final INodeFile pendingFile = fileState.inode;
      src = fileState.path;

      if (onRetryBlock[0] != null) {
        if (onRetryBlock[0].getLocations().length > 0) {
          // This is a retry. Just return the last block if having locations.
          return onRetryBlock[0];
        } else {
          // add new chosen targets to already allocated block and return
          BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
          ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
              .setExpectedLocations(targets);
          offset = pendingFile.computeFileSize();
          return makeLocatedBlock(lastBlockInFile, targets, offset);
        }
      }

      // commit the last block and complete it if it has minimum replicas
      commitOrCompleteLastBlock(pendingFile, fileState.iip,
                                ExtendedBlock.getLocalBlock(previous));

      // allocate new block, record block locations in INode.
      newBlock = createNewBlock();
      INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
      saveAllocatedBlock(src, inodesInPath, newBlock, targets);
      // Persist the new block information to the edit log.
      persistNewBlock(src, pendingFile);
      offset = pendingFile.computeFileSize();
    } finally {
      writeUnlock();
    }
    // The edit log is synced only after the write lock has been released.
    getEditLog().logSync();

    // Return located block
    return makeLocatedBlock(newBlock, targets, offset);
  }

三、BlockManager的chooseTarget4NewBlock实现

  /**
   * Selects the datanode storages that will receive a new block.
   *
   * The favored node names and the storage policy id are resolved first,
   * then the choice itself is delegated to the block placement policy.
   *
   * @param src path of the file the block belongs to
   * @param numOfReplicas number of replicas requested
   * @param client node of the writing client
   * @param excludedNodes nodes that must not be chosen, may be null
   * @param blocksize size of the block
   * @param favoredNodes names of the nodes preferred by the client
   * @param storagePolicyID id used to look up the storage policy
   * @return the chosen targets, never fewer than {@code minReplication}
   * @throws IOException
   *           if the number of targets is below the minimum replication.
   * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
   *      Set, long, List, BlockStoragePolicy)
   */
  public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
      final int numOfReplicas, final Node client,
      final Set<Node> excludedNodes,
      final long blocksize,
      final List<String> favoredNodes,
      final byte storagePolicyID) throws IOException {
    final List<DatanodeDescriptor> favoredDescriptors =
        getDatanodeDescriptors(favoredNodes);
    final BlockStoragePolicy policy =
        storagePolicySuite.getPolicy(storagePolicyID);

    // Let the placement policy pick the datanodes to write to.
    final DatanodeStorageInfo[] chosen = blockplacement.chooseTarget(src,
        numOfReplicas, client, excludedNodes, blocksize,
        favoredDescriptors, policy);

    if (chosen.length >= minReplication) {
      return chosen;
    }

    // Too few targets: report how many were found and the cluster state.
    throw new IOException("File " + src + " could only be replicated to "
        + chosen.length + " nodes instead of minReplication (="
        + minReplication + ").  There are "
        + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
        + " datanode(s) running and "
        + (excludedNodes == null ? "no" : excludedNodes.size())
        + " node(s) are excluded in this operation.");
  }

四、BlockPlacementPolicyDefault.chooseTarget实现

分配datanode策略


  /**
   * Chooses targets for a block, trying the caller's favored nodes first.
   *
   * Without favored nodes the regular placement overload is used directly.
   * Otherwise one storage local to each favored node is tried in turn, any
   * replicas still missing are filled in by the regular placement, and the
   * combined result is ordered with {@code getPipeline}. If a
   * {@code NotEnoughReplicasException} is raised along the way the favored
   * nodes hint is dropped and the regular placement is used instead.
   *
   * @param src path of the file the block belongs to
   * @param numOfReplicas number of replicas requested
   * @param writer node of the writer
   * @param excludedNodes nodes that must not be chosen, may be null
   * @param blocksize size of the block
   * @param favoredNodes nodes preferred by the client, may be null or empty
   * @param storagePolicy storage policy used to pick storage types
   * @return the chosen storages
   */
  @Override
  DatanodeStorageInfo[] chooseTarget(String src,
      int numOfReplicas,
      Node writer,
      Set<Node> excludedNodes,
      long blocksize,
      List<DatanodeDescriptor> favoredNodes,
      BlockStoragePolicy storagePolicy) {
    try {
      if (favoredNodes == null || favoredNodes.isEmpty()) {
        // No hint supplied: use the regular block placement.
        return chooseTarget(src, numOfReplicas, writer,
            new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
            excludedNodes, blocksize, storagePolicy);
      }

      // Working exclusion set: the caller's exclusions plus every node
      // picked for a favored node below.
      final Set<Node> excludedPlusFavored;
      if (excludedNodes == null) {
        excludedPlusFavored = new HashSet<Node>();
      } else {
        excludedPlusFavored = new HashSet<Node>(excludedNodes);
      }
      final EnumMap<StorageType, Integer> storageTypes =
          getRequiredStorageTypes(
              storagePolicy.chooseStorageTypes((short) numOfReplicas));

      final List<DatanodeStorageInfo> picked =
          new ArrayList<DatanodeStorageInfo>();
      final boolean avoidStaleNodes =
          stats != null && stats.isAvoidingStaleDataNodesForWrite();

      final int[] limits = getMaxNodesPerRack(0, numOfReplicas);
      numOfReplicas = limits[0];
      final int maxNodesPerRack = limits[1];

      // Try the favored nodes in order until enough replicas are placed.
      for (DatanodeDescriptor favoredNode : favoredNodes) {
        if (picked.size() >= numOfReplicas) {
          break;
        }
        // Choose a single node which is local to favoredNode;
        // 'picked' is updated inside chooseLocalStorage.
        final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
            excludedPlusFavored, blocksize, maxNodesPerRack,
            picked, avoidStaleNodes, storageTypes, false);
        if (target != null) {
          excludedPlusFavored.add(target.getDatanodeDescriptor());
        } else {
          LOG.warn("Could not find a target for file " + src
              + " with favored node " + favoredNode);
        }
      }

      if (picked.size() < numOfReplicas) {
        // Not enough favored nodes, choose other nodes.
        numOfReplicas -= picked.size();
        final DatanodeStorageInfo[] extras =
            chooseTarget(src, numOfReplicas, writer, picked,
                false, excludedPlusFavored, blocksize, storagePolicy);
        for (DatanodeStorageInfo extra : extras) {
          picked.add(extra);
        }
      }
      return getPipeline(writer,
          picked.toArray(new DatanodeStorageInfo[picked.size()]));
    } catch (NotEnoughReplicasException nr) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Failed to choose with favored nodes (=" + favoredNodes
            + "), disregard favored nodes hint and retry.", nr);
      }
      // Fall back to regular block placement disregarding favored nodes hint
      return chooseTarget(src, numOfReplicas, writer,
          new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
          excludedNodes, blocksize, storagePolicy);
    }
  }

具体实现选择策略:

  1. 如果还没有已分配的datanode,则调用chooseLocalStorage分配本地节点
  2. 如果已分配的节点数不超过1个,则调用chooseRemoteRack在远程机架上选择一台datanode
  3. 如果已分配的节点数不超过2个,且前两个节点在同一个机架上,则调用chooseRemoteRack在另一个机架上分配
  4. 如果前两个节点不在同一个机架上:是新的block,则调用chooseLocalRack在第二个节点所在机架分配;否则调用chooseLocalRack在writer所在机架分配
  5. 其余还需要分配的副本(已选节点超过2个之后),则调用chooseRandom随机选择

选择的时候,都会调用isGoodTarget方法判断该datanode是否满足选择条件,判断依据包括存储空间、网络负载情况、是否下线、xceiver线程数是否满足。


  /**
   * choose <i>numOfReplicas</i> from all data nodes
   *
   * Selection order, as implemented below: local storage for the writer when
   * nothing has been chosen yet, then a node on a remote rack, then a third
   * node whose rack depends on where the first two ended up, and finally
   * chooseRandom() for whatever is still needed. On
   * NotEnoughReplicasException the method retries itself, first without
   * avoiding stale nodes and then with the remaining storage types marked
   * as unavailable.
   *
   * @param numOfReplicas additional number of replicas wanted
   * @param writer the writer's machine, could be a non-DatanodeDescriptor node
   * @param excludedNodes datanodes that should not be considered as targets
   * @param blocksize size of the data to be written
   * @param maxNodesPerRack max nodes allowed per rack
   * @param results the target nodes already chosen
   * @param avoidStaleNodes avoid stale nodes in replica choosing
   * @param storagePolicy policy asked for the storage types to use
   * @param unavailableStorages storage types to treat as unavailable; this
   *        set is added to before the storage-type retry
   * @param newBlock flag forwarded to the storage policy and used to pick
   *        the rack of the third replica (presumably true when targets are
   *        chosen for a freshly allocated block -- confirm against callers)
   * @return local node of writer (not chosen node)
   */
  private Node chooseTarget(int numOfReplicas,
                            Node writer,
                            final Set<Node> excludedNodes,
                            final long blocksize,
                            final int maxNodesPerRack,
                            final List<DatanodeStorageInfo> results,
                            final boolean avoidStaleNodes,
                            final BlockStoragePolicy storagePolicy,
                            final EnumSet<StorageType> unavailableStorages,
                            final boolean newBlock) {
    // Nothing to choose, or no datanodes in the topology at all.
    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
      return (writer instanceof DatanodeDescriptor) ? writer : null;
    }
    final int numOfResults = results.size();
    final int totalReplicasExpected = numOfReplicas + numOfResults;

    // For an existing block with no usable writer, use the first chosen node.
    // NOTE(review): results.get(0) assumes results is non-empty here -- confirm.
    if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
      writer = results.get(0).getDatanodeDescriptor();
    }

    // Keep a copy of original excludedNodes
    final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);

    // choose storage types; use fallbacks for unavailable storages
    final List<StorageType> requiredStorageTypes = storagePolicy
        .chooseStorageTypes((short) totalReplicasExpected,
            DatanodeStorageInfo.toStorageTypes(results),
            unavailableStorages, newBlock);
    final EnumMap<StorageType, Integer> storageTypes =
        getRequiredStorageTypes(requiredStorageTypes);
    if (LOG.isTraceEnabled()) {
      LOG.trace("storageTypes=" + storageTypes);
    }

    try {
      // numOfReplicas is reset to the number of storage types still required.
      if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
        throw new NotEnoughReplicasException(
            "All required storage types are unavailable: "
            + " unavailableStorages=" + unavailableStorages
            + ", storagePolicy=" + storagePolicy);
      }

      // Nothing chosen yet: pick local storage for the writer first.
      // NOTE(review): the result is dereferenced without a null check --
      // confirm chooseLocalStorage cannot return null when its last argument is true.
      if (numOfResults == 0) {
        writer = chooseLocalStorage(writer, excludedNodes, blocksize,
            maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
                .getDatanodeDescriptor();
        if (--numOfReplicas == 0) {
          return writer;
        }
      }
      final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
      if (numOfResults <= 1) {
        // Choose one datanode on a rack remote from the first node.
        chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
        if (--numOfReplicas == 0) {
          return writer;
        }
      }
      if (numOfResults <= 2) {
        final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
        if (clusterMap.isOnSameRack(dn0, dn1)) {
          // The first two nodes share a rack: choose one on another rack.

          chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
              results, avoidStaleNodes, storageTypes);
        } else if (newBlock){
          // New block: choose on the same rack as the second node.
          chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
              results, avoidStaleNodes, storageTypes);
        } else {
          // Otherwise: choose on the writer's local rack.
          chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
              results, avoidStaleNodes, storageTypes);
        }
        if (--numOfReplicas == 0) {
          return writer;
        }
      }
      // Any replicas still needed are picked with chooseRandom from the root.
      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
    } catch (NotEnoughReplicasException e) {
      final String message = "Failed to place enough replicas, still in need of "
          + (totalReplicasExpected - results.size()) + " to reach "
          + totalReplicasExpected
          + " (unavailableStorages=" + unavailableStorages
          + ", storagePolicy=" + storagePolicy
          + ", newBlock=" + newBlock + ")";

      // The stack trace is only logged at trace level; otherwise just the message.
      if (LOG.isTraceEnabled()) {
        LOG.trace(message, e);
      } else {
        LOG.warn(message + " " + e.getMessage());
      }

      if (avoidStaleNodes) {
        // Retry chooseTarget again, this time not avoiding stale nodes.

        // excludedNodes contains the initial excludedNodes and nodes that were
        // not chosen because they were stale, decommissioned, etc.
        // We need to additionally exclude the nodes that were added to the 
        // result list in the successful calls to choose*() above.
        for (DatanodeStorageInfo resultStorage : results) {
          addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
        }
        // Set numOfReplicas, since it can get out of sync with the result list
        // if the NotEnoughReplicasException was thrown in chooseRandom().
        numOfReplicas = totalReplicasExpected - results.size();
        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
            maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
            newBlock);
      }

      boolean retry = false;
      // simply add all the remaining types into unavailableStorages and give
      // another try. No best effort is guaranteed here.
      for (StorageType type : storageTypes.keySet()) {
        if (!unavailableStorages.contains(type)) {
          unavailableStorages.add(type);
          retry = true;
        }
      }
      // Retry only when at least one new type was marked unavailable above.
      if (retry) {
        for (DatanodeStorageInfo resultStorage : results) {
          addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
              oldExcludedNodes);
        }
        numOfReplicas = totalReplicasExpected - results.size();
        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
            maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
            newBlock);
      }
    }
    return writer;
  }

上一篇下一篇

猜你喜欢

热点阅读