原创-HDFS副本存储策略源码分析一
2019-06-06 本文已影响0人
无色的叶
HDFS默认副本存储策略
可通过参数:dfs.block.replicator.classname 配置实现类,默认实现类:
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault
- 1st replica 如果写请求方所在机器是其中一个datanode,则直接存放在本地,否则随机在集群中选择一个datanode.
- 2nd replica 第二个副本存放于不同第一个副本的所在的机架.
- 3rd replica 第三个副本存放于第二个副本所在的机架,但是属于不同的节点
- 4rd replica 第四个副本或更多副本存放策略是随机选择datanode节点进行存储
存储源码分析
主要是调用DatanodeStorageInfo[] chooseTarget方法进行datanode节点的选取,有个同名重载的方法,只是多一个参数favoredNodes,该参数是通过客户端写入时,可选参数,优先数据存储的dataNode节点,其它都是使用无favoredNodes参数的方法
/**
* choose <i>numOfReplicas</i> from all data nodes
*
* @param srcPath 数据块是哪个文件的
* @param numOfReplicas additional number of replicas wanted
* 副本数
* @param writer the writer's machine, could be a non-DatanodeDescriptor node
* 写入者所在的服务器,如果不是集群中的服务器,则为空
* @param excludedNodes datanodes that should not be considered as targets
* 这个列表中的结点应该排除在外,不能被选为目标结点
* @param blocksize size of the data to be written
* 数据要写入的大小
* @return 返回DatanodeDescriptor的实例数组,这些结点作为此数据块的目标结点,并且被为作一个pipeline被排序。
*/
@Override
public DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosenNodes,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
final BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags) {
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes, blocksize, storagePolicy, flags);
}
@Override
DatanodeStorageInfo[] chooseTarget(String src,
int numOfReplicas,
Node writer,
Set<Node> excludedNodes,
long blocksize,
List<DatanodeDescriptor> favoredNodes,
BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags) {
进一步调用同名方法chooseTarget
/**
* This is the implementation.
*/
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosenStorage,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
final BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> addBlockFlags) {
//副本数为0或者datanode为0,直接返回空数组
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
return DatanodeStorageInfo.EMPTY_ARRAY;
}
if (excludedNodes == null) {
excludedNodes = new HashSet<>();
}
//计算每个机架允许分配的最大副本数,限定了一个集群中所有结点的总共副本数量
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
for (DatanodeStorageInfo storage : chosenStorage) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
}
List<DatanodeStorageInfo> results = null;
Node localNode = null;
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
boolean avoidLocalNode = (addBlockFlags != null
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
&& writer != null
&& !excludedNodes.contains(writer));
// Attempt to exclude local node if the client suggests so. If no enough
// nodes can be obtained, it falls back to the default block placement
// policy.
//如果客户端建议排除本地节点,则尝试排除。如果没有足够的
//可以获得节点,它返回到默认块的位置
if (avoidLocalNode) {
results = new ArrayList<>(chosenStorage);
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
if (writer != null) {
excludedNodeCopy.add(writer);
}
localNode = chooseTarget(numOfReplicas, writer,
excludedNodeCopy, blocksize, maxNodesPerRack, results,
avoidStaleNodes, storagePolicy,
EnumSet.noneOf(StorageType.class), results.isEmpty());
if (results.size() < numOfReplicas) {
// not enough nodes; discard results and fall back
results = null;
}
}
if (results == null) {
results = new ArrayList<>(chosenStorage);
localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes,
storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty());
}
if (!returnChosenNodes) {
results.removeAll(chosenStorage);
}
LOG.info("----localNode---" + localNode.getName() + "===writer===" + writer.getName());
// sorting nodes to form a pipeline
//根据最短距离排序目标节点列表,形成pipeline
return getPipeline(
(writer != null && writer instanceof DatanodeDescriptor) ? writer
: localNode,
results.toArray(new DatanodeStorageInfo[results.size()]));
}
再调用Node chooseTarget方法,在选择的过程中可能会发生异常,因为有的时候我们没有配置机架感知,集群中都属于一个默认机架的default-rack,则会导致chooseRemoteRack的方法出错,因为没有满足条件的其余机架,这时需要一些重试策略
/**
* choose <i>numOfReplicas</i> from all data nodes
*
* @param numOfReplicas additional number of replicas wanted
* 副本数
* @param writer the writer's machine, could be a non-DatanodeDescriptor node
* 写入者所在的服务器,如果不是集群中的服务器,则为空
* @param excludedNodes datanodes that should not be considered as targets
* 这个列表中的结点应该排除在外,不能被选为目标结点
* @param blocksize size of the data to be written
* 数据要写入的大小
* @param maxNodesPerRack max nodes allowed per rack
* @param results the target nodes already chosen
* @param avoidStaleNodes avoid stale nodes in replica choosing
* @return local node of writer (not chosen node)
*/
private Node chooseTarget(int numOfReplicas,
Node writer,
final Set<Node> excludedNodes,
final long blocksize,
final int maxNodesPerRack,
final List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
final BlockStoragePolicy storagePolicy,
final EnumSet<StorageType> unavailableStorages,
final boolean newBlock) {
LOG.info("-------Node chooseTarget----------");
LOG.info("---numOfReplicas--" + numOfReplicas + "---Node writer--" + writer.getName() + "===results==" + results.size());
// 如果额外需要请求副本数为0,或者集群中没有可选节点
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
// 如果writer请求者在其中一个datanode上则返回此节点,否则直接返回null
return (writer instanceof DatanodeDescriptor) ? writer : null;
}
// 已经选择完成的节点数
final int numOfResults = results.size();
// 期望达到的副本总数
final int totalReplicasExpected = numOfReplicas + numOfResults;
// 如果writer为空或不在datanode上,则取出已选择好列表中的第一个位置所在节点,赋值给writer
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
writer = results.get(0).getDatanodeDescriptor();
}
// Keep a copy of original excludedNodes
final Set<Node> oldExcludedNodes = new HashSet<>(excludedNodes);
// choose storage types; use fallbacks for unavailable storages
// 根据存储策略获取副本需要满足的存储类型列表,如果有不可用的存储类型,会采用fallback的类型
final List<StorageType> requiredStorageTypes = storagePolicy
.chooseStorageTypes((short) totalReplicasExpected,
DatanodeStorageInfo.toStorageTypes(results),
unavailableStorages, newBlock);
// 将存储类型列表进行计数统计,并存于map中
final EnumMap<StorageType, Integer> storageTypes =
getRequiredStorageTypes(requiredStorageTypes);
if (LOG.isTraceEnabled()) {
LOG.trace("storageTypes=" + storageTypes);
}
try {
if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
throw new NotEnoughReplicasException(
"All required storage types are unavailable: "
+ " unavailableStorages=" + unavailableStorages
+ ", storagePolicy=" + storagePolicy);
}
writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
} catch (NotEnoughReplicasException e) {
final String message = "Failed to place enough replicas, still in need of "
+ (totalReplicasExpected - results.size()) + " to reach "
+ totalReplicasExpected
+ " (unavailableStorages=" + unavailableStorages
+ ", storagePolicy=" + storagePolicy
+ ", newBlock=" + newBlock + ")";
if (LOG.isTraceEnabled()) {
LOG.trace(message, e);
} else {
LOG.warn(message + " " + e.getMessage());
}
if (avoidStaleNodes) {
// Retry chooseTarget again, this time not avoiding stale nodes.
// excludedNodes contains the initial excludedNodes and nodes that were
// not chosen because they were stale, decommissioned, etc.
// We need to additionally exclude the nodes that were added to the
// result list in the successful calls to choose*() above.
for (DatanodeStorageInfo resultStorage : results) {
addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
}
// Set numOfReplicas, since it can get out of sync with the result list
// if the NotEnoughReplicasException was thrown in chooseRandom().
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
newBlock);
}
boolean retry = false;
// simply add all the remaining types into unavailableStorages and give
// another try. No best effort is guaranteed here.
for (StorageType type : storageTypes.keySet()) {
if (!unavailableStorages.contains(type)) {
unavailableStorages.add(type);
retry = true;
}
}
if (retry) {
for (DatanodeStorageInfo resultStorage : results) {
addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
oldExcludedNodes);
}
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
newBlock);
}
}
return writer;
}
真正选择dataNode逻辑实现在chooseTargetInOrder方法中
protected Node chooseTargetInOrder(int numOfReplicas,
Node writer,
final Set<Node> excludedNodes,
final long blocksize,
final int maxNodesPerRack,
final List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
final boolean newBlock,
EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException {
//已选择的目标节点数
final int numOfResults = results.size();
LOG.info("========Node chooseTargetInOrder====numOfResults==" + numOfResults);
//如果numOfResults == 0 则表示副本一个都还没开始选,首先从选本地节点开始
if (numOfResults == 0) {
DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
storageTypes, true);
LOG.info("---numOfResults == 0---");
if (storageInfo != null) {
LOG.info("---storageInfo---" + storageInfo.getDatanodeDescriptor().getHostName());
}
writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
: null;
// 如果此时目标需求完成的副本数为降为0,代表选择目标完成,返回第一个节点writer
if (--numOfReplicas == 0) {
return writer;
}
}
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 2) {
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
// 如果dn0,dn1所在同机房
if (clusterMap.isOnSameRack(dn0, dn1)) {
// 则选择1个不同于dn0,dn1所在机房的副本位置
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else if (newBlock) {
// 如果是新的block块,则选取1个于dn1所在同机房的节点位置
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else {
// 否则选取于writer同机房的位置
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
// 如果副本数已经超过3个,说明设置的block的时候,则剩余位置在集群中随机选择放置节点
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;
}
首先选择本地存储位置.如果没有满足条件的,再选择本地机架的节点,如果还是没有满足条件的,进一步降级选择不同机架的节点,最后随机选择集群中的节点,关系图如下
image.png都会调用chooseRandom方法
/**
* chooseLocalStorage调用chooseRandom时:
* <p>
* numOfReplicas:1,表示我们需要选取多少个Node;
* scope:网络拓扑结构的根节点(root),即"";表示从整个集群中随机选取;
* excludedNodes:空值或已经被选取的Nodes,表示选取出的Node不能出现在这些excludedNodes中;
* <p>
* chooseRemoteRack调用chooseRandom时:
* <p>
* numOfReplicas:1,表示我们需要选取多少个Node;
* scope:~rack,表示从整个集群中非rack机架中随机选取;
* excludedNodes:空值或已经被选取的Nodes,表示选取出的Node不能出现在这些excludedNodes中;
* <p>
* chooseLocalRack调用chooseRandom时:
* <p>
* numOfReplicas:1,表示我们需要选取多少个Node;
* scope:rack,表示从集群机架rack中随机选取;
* excludedNodes:空值或已经被选取的Nodes,表示选取出的Node不能出现在这些excludedNodes中;
* <p>
* Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
*
* @return the first chosen node, if there is any.
*/
protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
String scope,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException {
后续详细分析方法chooseTargetInOrder