HDFS balance策略详解
一. balancer命令详解
hdfs --config /hadoop-client/conf balancer
-threshold 10 \\集群平衡的条件,datanode间磁盘使用率相差阈值,区间选择:0~100
-policy datanode \\默认为datanode,datanode级别的平衡策略
-exclude -f /tmp/ip1.txt \\默认为空,指定该部分ip不参与balance, -f:指定输入为文件
-include -f /tmp/ip2.txt \\默认为空,只允许该部分ip参与balance,-f:指定输入为文件
-idleiterations 5 \\迭代次数,默认为 5
hdfs balance时datanode之间数据迁移的带宽设置(/hadoop-client/conf/hdfs-site.xml, 修改需重启hdfs):
<备注:6250000 / (1024 * 1024) = 6M/s>
hdfs dfsadmin -fs hdfs://${active-namenode-hostname}:8020 -setBalancerBandwidth 104857600
* The cluster is balanced;
* No block can be moved;
* No block has been moved for specified consecutive iterations (5 by default);
* An IOException occurs while communicating with the namenode;
* Another balancer is running.
二. 源码解析
private boolean shouldIgnore(DatanodeInfo dn) {
// ignore decommissioned nodes (忽略已经下架的datanode)
final boolean decommissioned = dn.isDecommissioned();
// ignore decommissioning nodes(忽略正在下架的datanode)
final boolean decommissioning = dn.isDecommissionInProgress();
// ignore nodes in exclude list (忽略参数:-exclude配置的datanode)
final boolean excluded = Util.isExcluded(excludedNodes, dn);
// ignore nodes not in the include list (if include list is not empty)
// (如果参数:-include不为空,忽略不在include列表里的datanode)
final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
if (decommissioned || decommissioning || excluded || notIncluded) {
if (LOG.isTraceEnabled()) {
LOG.trace("Excluding datanode " + dn
+ ": decommissioned=" + decommissioned
+ ", decommissioning=" + decommissioning
+ ", excluded=" + excluded
+ ", notIncluded=" + notIncluded);
return true;
return false;
集群平均使用率(计算公式):average = totalUsedSpaces * 100 / totalCapacities
totalUsedSpaces:各datanode已使用空间(dfsUsed,不包含non dfsUsed)相加;
void initAvgUtilization() {
for(StorageType t : StorageType.asList()) {
final long capacity = totalCapacities.get(t);
if (capacity > 0L) {
final double avg = totalUsedSpaces.get(t)*100.0/capacity;
avgUtilizations.set(t, avg);
单个datanode使用率:utilization = dfsUsed * 100.0 / capacity
dfsUsed:当前datanode dfs(dfsUsed,不包含non dfsUsed)已使用空间;
Double getUtilization(DatanodeStorageReport r, final StorageType t) {
long capacity = 0L;
long dfsUsed = 0L;
for(StorageReport s : r.getStorageReports()) {
if (s.getStorage().getStorageType() == t) {
capacity += s.getCapacity();
dfsUsed += s.getDfsUsed();
return capacity == 0L? null: dfsUsed*100.0/capacity;
单个datanode使用率与集群平均使用率差值:utilizationDiff = utilization - average
单个datanode utilizationDiff与阈值的差值: thresholdDiff = |utilizationDiff| - threshold
需要迁移或者可以迁入的空间:maxSize2Move = |utilizationDiff| * capacity
可以迁入的空间计算:Math.min(remaining, maxSizeToMove)
需要迁移的空间计算:Math.min(max, maxSizeToMove)
默认迭代次数为5,即运行一次balance脚本,单个datanode可以最大迁移的空间为:5*10G = 50G
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
for(StorageType t : StorageType.getMovableTypes()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
final double average = policy.getAvgUtilization(t);
if (utilization >= average && !isSource) {
LOG.info(dn + "[" + t + "] has utilization=" + utilization
+ " >= average=" + average
+ " but it is not specified as a source; skipping it.");
final double utilizationDiff = utilization - average;
final long capacity = getCapacity(r, t);
final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
final long maxSize2Move = computeMaxSize2Move(capacity,
getRemaining(r, t), utilizationDiff, maxSizeToMove);
final StorageGroup g;
if (utilizationDiff > 0) {
final Source s = dn.addSource(t, maxSize2Move, dispatcher);
if (thresholdDiff <= 0) { // within threshold
} else {
overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
g = s;
} else {
g = dn.addTarget(t, maxSize2Move);
if (thresholdDiff <= 0) { // within threshold
} else {
underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
overUtilized:utilizationDiff > 0 && thresholdDiff > 0 <使用率超过平均值,且差值大于阈值>
aboveAvgUtilized:utilizationDiff > 0 && thresholdDiff <= 0 <使用率超过平均值,且差值小于等于阈值>
belowAvgUtilized:utilizationDiff < 0 && thresholdDiff <= 0 <使用率低于平均值,且差值小于等于阈值>
underUtilized:utilizationDiff > 0 && thresholdDiff > 0 <使用率低于平均值,且差值大于等于阈值>
需要迁移数据的节点(Source类型): overUtilized, aboveAvgUtilized
能够迁入数据的节点(Target类型): underUtilized, belowAvgUtilized
数据迁移配对(原则:1. 优先为同机架,其次为其它机架; 2. 一对多配对):
第一步[Source -> Target]:each overUtilized datanode => one or more underUtilized datanodes
第二步[Source -> Target]:match each remaining overutilized datanode => one or more belowAvgUtilized datanodes
第三步[Target -> Source]:each remaining underutilized datanode (step 1未和overUtilized匹配过) => one or more aboveAvgUtilized datanodes
/** Decide all <source, target> pairs according to the matcher. */
private void chooseStorageGroups(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
chooseStorageGroups(overUtilized, underUtilized, matcher);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
/* match each remaining underutilized datanode (target) to
* above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
构建每一对<source, target>时,需要计算当前可以迁移或者迁入的空间大小。
private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
final Task task = new Task(target, size);
dispatcher.add(source, target);
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ source.getDisplayName() + " to " + target.getDisplayName());
1. 对于一些大型的HDFS集群(随时可能扩容或下架服务器),balance脚本需要作为后台常驻进程;
2. 根据官方建议,脚本需要部署在相对空闲的服务器上;
3. 停止脚本通过kill进程实现(建议不kill,后台运行完会自动停止,多次执行同时也只会有一个线程存在,其它自动失败);
* 通过参数(threshold)增加迭代次数,以增加datanode允许迁移的数据;
* 通过参数(exclude, include)设计合理的允许进行balance策略的服务器,比如将使用率最低(20%)和最高(20%)的进行balance策略;
* 通过参数(threshold )设计合理的阈值;