Hadoop

hadoop 中的DRF算法

2020-02-09  本文已影响0人  spraysss

DRF算法主要目的是按按照主资源进行公平分配

考虑一个有9个cpu和18GB的系统,有两个用户:用户A的每个任务都请求(1CPU,4GB)资源;用户B的每个任务都请求(3CPU,1GB)资源。如何为这种情况构建一个公平分配策略?

假设A任务分配X个实例,B任务分配Y个实例,有
X+3Y \leq 9
4X+Y \leq 18

A任务每个实例需要的cup占总资源的比例为1/9,内存占总资源的比例为4/18,4/18>1/9,所以A的主资源为内存,同理可得B的主资源为CPU,现在让A任务分配的主资源内存和B任务分配的主资源CPU 公平,则由:
\frac {4X} {18}=\frac {3Y} 9

通过三个方程,解得X=3,Y=2
即A类任务可以启动3个 ,占用资源为(3,12)
B类任务可以启动2个,占用资源为(6,2)

hadoop对DRF的应用

hadoop 2.7 中使用DominantResourceCalculator实现了cup、memory这两种主资源的公平调度
代码如下

  /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
  package org.apache.hadoop.yarn.util.resource;
  
  import org.apache.hadoop.classification.InterfaceAudience.Private;
  import org.apache.hadoop.classification.InterfaceStability.Unstable;
  import org.apache.hadoop.yarn.api.records.Resource;
  
  /**
   * A {@link ResourceCalculator} which uses the concept of  
   * <em>dominant resource</em> to compare multi-dimensional resources.
   *
   * Essentially the idea is that the in a multi-resource environment, 
   * the resource allocation should be determined by the dominant share 
   * of an entity (user or queue), which is the maximum share that the 
   * entity has been allocated of any resource. 
   * 
   * In a nutshell, it seeks to maximize the minimum dominant share across 
   * all entities. 
   * 
   * For example, if user A runs CPU-heavy tasks and user B runs
   * memory-heavy tasks, it attempts to equalize CPU share of user A 
   * with Memory-share of user B. 
   * 
   * In the single resource case, it reduces to max-min fairness for that resource.
   * 
   * See the Dominant Resource Fairness paper for more details:
   * www.cs.berkeley.edu/~matei/papers/2011/nsdi_drf.pdf
   */
  @Private
  @Unstable
  public class DominantResourceCalculator extends ResourceCalculator {
    
    @Override
    public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
      
      if (lhs.equals(rhs)) {
        return 0;
      }
      
      if (isInvalidDivisor(clusterResource)) {
        //除数为0,cup和memory一大一小
        if ((lhs.getMemory() < rhs.getMemory() && lhs.getVirtualCores() > rhs
            .getVirtualCores())
            || (lhs.getMemory() > rhs.getMemory() && lhs.getVirtualCores() < rhs
                .getVirtualCores())) {
          return 0;
        } else if (lhs.getMemory() > rhs.getMemory()
            || lhs.getVirtualCores() > rhs.getVirtualCores()) {
          return 1;
        } else if (lhs.getMemory() < rhs.getMemory()
            || lhs.getVirtualCores() < rhs.getVirtualCores()) {
          return -1;
        }
      }
      //比较主资源
      float l = getResourceAsValue(clusterResource, lhs, true);
      float r = getResourceAsValue(clusterResource, rhs, true);
      
      if (l < r) {
        return -1;
      } else if (l > r) {
        return 1;
      } else {
        //主资源相等,比较副资源占比
        l = getResourceAsValue(clusterResource, lhs, false);
        r = getResourceAsValue(clusterResource, rhs, false);
        if (l < r) {
          return -1;
        } else if (l > r) {
          return 1;
        }
      }
      
      return 0;
    }
  
    /**
     * Use 'dominant' for now since we only have 2 resources - gives us a slight
     * performance boost.
     * 
     * Once we add more resources, we'll need a more complicated (and slightly
     * less performant algorithm).
     */
    protected float getResourceAsValue(
        Resource clusterResource, Resource resource, boolean dominant) {
      // Just use 'dominant' resource
      return (dominant) ?
          Math.max(
              (float)resource.getMemory() / clusterResource.getMemory(), 
              (float)resource.getVirtualCores() / clusterResource.getVirtualCores()
              ) 
          :
            Math.min(
                (float)resource.getMemory() / clusterResource.getMemory(), 
                (float)resource.getVirtualCores() / clusterResource.getVirtualCores()
                ); 
    }
    
    @Override
    public int computeAvailableContainers(Resource available, Resource required) {
      return Math.min(
          available.getMemory() / required.getMemory(), 
          available.getVirtualCores() / required.getVirtualCores());
    }
  
    @Override
    public float divide(Resource clusterResource, 
        Resource numerator, Resource denominator) {
      return 
          getResourceAsValue(clusterResource, numerator, true) / 
          getResourceAsValue(clusterResource, denominator, true);
    }
    
    @Override
    public boolean isInvalidDivisor(Resource r) {
      if (r.getMemory() == 0.0f || r.getVirtualCores() == 0.0f) {
        return true;
      }
      return false;
    }
  
    @Override
    public float ratio(Resource a, Resource b) {
      return Math.max(
          (float)a.getMemory()/b.getMemory(), 
          (float)a.getVirtualCores()/b.getVirtualCores()
          );
    }
  
    @Override
    public Resource divideAndCeil(Resource numerator, int denominator) {
      return Resources.createResource(
          divideAndCeil(numerator.getMemory(), denominator),
          divideAndCeil(numerator.getVirtualCores(), denominator)
          );
    }
  
    @Override
    public Resource normalize(Resource r, Resource minimumResource,
                              Resource maximumResource, Resource stepFactor) {
      int normalizedMemory = Math.min(
        roundUp(
          Math.max(r.getMemory(), minimumResource.getMemory()),
          stepFactor.getMemory()),
        maximumResource.getMemory());
      int normalizedCores = Math.min(
        roundUp(
          Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
          stepFactor.getVirtualCores()),
        maximumResource.getVirtualCores());
      return Resources.createResource(normalizedMemory,
        normalizedCores);
    }
  
    @Override
    public Resource roundUp(Resource r, Resource stepFactor) {
      return Resources.createResource(
          roundUp(r.getMemory(), stepFactor.getMemory()), 
          roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
          );
    }
  
    @Override
    public Resource roundDown(Resource r, Resource stepFactor) {
      return Resources.createResource(
          roundDown(r.getMemory(), stepFactor.getMemory()),
          roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
          );
    }
  
    @Override
    public Resource multiplyAndNormalizeUp(Resource r, double by,
        Resource stepFactor) {
      return Resources.createResource(
          roundUp(
              (int)Math.ceil(r.getMemory() * by), stepFactor.getMemory()),
          roundUp(
              (int)Math.ceil(r.getVirtualCores() * by), 
              stepFactor.getVirtualCores())
          );
    }
  
    @Override
    public Resource multiplyAndNormalizeDown(Resource r, double by,
        Resource stepFactor) {
      return Resources.createResource(
          roundDown(
              (int)(r.getMemory() * by), 
              stepFactor.getMemory()
              ),
          roundDown(
              (int)(r.getVirtualCores() * by), 
              stepFactor.getVirtualCores()
              )
          );
    }
  
  }
上一篇 下一篇

猜你喜欢

热点阅读