(八)统计信息Stats

2021-11-29  本文已影响0人  guessguess

从前面可以知道,ribbon会通过统计数据,找到合适的分区,再找到分区对应负载均衡器,通过负载均衡器中的服务列表,对服务进行匹配,最后找到对应的服务。

那么下面先来说说统计数据

public interface DistributionMBean {
    void clear();清理分布数据,重置到初始状态
    long getNumValues();数据的个数
    double getMean();平均值
    double getVariance();方差
    double getStdDev();标准差
    double getMinimum();最小值
    double getMaximum();最大值
}
public interface DataCollector {
    void noteValue(double val);
}

这个类的接口比较简单,主要是用于数据采集。

public class Distribution implements DistributionMBean, DataCollector {

    private long numValues;值的数量
    private double sumValues;值的总和
    private double sumSquareValues;值的平方总和
    private double minValue;值的最小值
    private double maxValue;值的最大值

    初始化
    public Distribution() {
        numValues = 0L;
        sumValues = 0.0;
        sumSquareValues = 0.0;
        minValue = 0.0;
        maxValue = 0.0;
    }

    进行值的存储
    public void noteValue(double val) {
        numValues++;
        sumValues += val;
        sumSquareValues += val * val;
        if (numValues == 1) {
            minValue = val;
            maxValue = val;
        } else if (val < minValue) {
            minValue = val;
        } else if (val > maxValue) {
            maxValue = val;
        }
    }
    清除数据
    public void clear() {
        numValues = 0L;
        sumValues = 0.0;
        sumSquareValues = 0.0;
        minValue = 0.0;
        maxValue = 0.0;
    }
    获取值的数量
    public long getNumValues() {
        return numValues;
    }
    获取平均值
    public double getMean() {
        if (numValues < 1) {
            return 0.0;
        } else {
            return sumValues / numValues;
        }
    }
    方差
    public double getVariance() {
        if (numValues < 2) {
            return 0.0;
        } else if (sumValues == 0.0) {
            return 0.0;
        } else {
            double mean = getMean();
            return (sumSquareValues / numValues) - mean * mean;
        }
    }
    标准差
    public double getStdDev() {
        return Math.sqrt(getVariance());
    }
    获取最大值
    public double getMinimum() {
        return minValue;
    }
    获取最小值
    public double getMaximum() {
        return maxValue;
    }
    
    合并另外一个分布数据
    public void add(Distribution anotherDistribution) {
        if (anotherDistribution != null) {
            numValues += anotherDistribution.numValues;
            sumValues += anotherDistribution.sumValues;
            sumSquareValues += anotherDistribution.sumSquareValues;
            minValue = (minValue < anotherDistribution.minValue) ? minValue
                    : anotherDistribution.minValue;
            maxValue = (maxValue > anotherDistribution.maxValue) ? maxValue
                    : anotherDistribution.maxValue;
        }
    }
}
public class DataBuffer extends Distribution {
    private final Lock lock;锁
    private final double[] buf;缓冲区
    private long startMillis;开始时间
    private long endMillis;结束时间
    private int size;数据量的个数
    private int insertPos;插入的位置
   
    构造方法,初始化,容量默认为1000
    public DataBuffer(int capacity) {
        lock = new ReentrantLock();
        buf = new double[capacity];
        startMillis = 0;
        size = 0;
        insertPos = 0;
    }

    public Lock getLock() {
        return lock;
    }

    public int getCapacity() {
        return buf.length;
    }

    public long getSampleIntervalMillis() {
        return (endMillis - startMillis);
    }

    public int getSampleSize() {
        return size;
    }

    清除缓冲区
    @Override
    public void clear() {
        同时将统计数据清除
        super.clear();
        startMillis = 0;
        size = 0;
        insertPos = 0;
    }
   
    开始收集
    public void startCollection() {
        清除数据,初始化
        clear();
        startMillis = System.currentTimeMillis();
    }
    
    结束收集
    public void endCollection() {
        endMillis = System.currentTimeMillis();
        Arrays.sort(buf, 0, size);
    }
    
    记录数据
    @Override
    public void noteValue(double val) {
        父类中有一些指标,需要记录,如最大值,最小值,总平方,值的数量,总值等等
        super.noteValue(val);
        位置后移,保存到数组中
        buf[insertPos++] = val;
        如果超过缓存区长度,从头开始覆盖,size变成缓存区长度,从头开始覆盖
        if (insertPos >= buf.length) {
            insertPos = 0;
            size = buf.length;
        } else if (insertPos > size) {
            size = insertPos;
        }
    }
    
    取百分之a,到百分之b范围内的缓冲数据
    public double[] getPercentiles(double[] percents, double[] percentiles) {
        for (int i = 0; i < percents.length; i++) {
            percentiles[i] = computePercentile(percents[i]);
        }
        return percentiles;
    }

    private double computePercentile(double percent) {
        if (size <= 0) {
            return 0.0;
        } else if (percent <= 0.0) {
            return buf[0];
        } else if (percent >= 100.0) {        
            return buf[size - 1];
        }
        double index = (percent / 100.0) * size; 
        int iLow = (int) Math.floor(index);
        int iHigh = (int) Math.ceil(index);
        assert 0 <= iLow && iLow <= index && index <= iHigh && iHigh <= size;
        assert (iHigh - iLow) <= 1;
        if (iHigh >= size) {
            return buf[size - 1];
        } else if (iLow == iHigh) {
            return buf[iLow];
        } else {
            return buf[iLow] + (index - iLow) * (buf[iHigh] - buf[iLow]);
        }
    }
}

public abstract class DataAccumulator implements DataCollector {
    private DataBuffer current;当前采集的数据
    private DataBuffer previous;之前采集的数据
    private final Object swapLock = new Object();交换锁
    
    初始化构造方法
    public DataAccumulator(int bufferSize) {
        this.current = new DataBuffer(bufferSize);
        this.previous = new DataBuffer(bufferSize);
    }

    采集数据,保存到当前的缓冲区中
    public void noteValue(double val) {
        synchronized (swapLock) {
            Lock l = current.getLock();
            l.lock();
            try {
                current.noteValue(val);
            } finally {
                l.unlock();
            }
        }
    }
    
    推送数据
    public void publish() {
        DataBuffer tmp = null;
        Lock l = null;
        synchronized (swapLock) {
            将当前的数据,保存到previous
            tmp = current;
            current = previous;
            previous = tmp;
            l = current.getLock();
            l.lock();
            try {
                清除current的数据,开始收集---这里感觉是比较聪明的做法,不用来回new。
                current.startCollection();
            } finally {
                l.unlock();
            }
            l = tmp.getLock();
            l.lock();
        }
        try {
            结束收集
            tmp.endCollection();
            推送当前统计数据
            publish(tmp);
        } finally {
            l.unlock();
        }
    }
    暴露给子类去实现的一个推送数据接口,DataDistribution进行了覆写,
    protected abstract void publish(DataBuffer buf);
}

public class DataDistribution extends DataAccumulator implements DataDistributionMBean {
    private long numValues = 0L;总数据样本数(有效+无效,超出缓存区的就是无效)
    private double mean = 0.0;平均值
    private double variance = 0.0;方差
    private double stddev = 0.0;标准差
    private double min = 0.0;最小值
    private double max = 0.0;最大值
    private long ts = 0L;
    private long interval = 0L;间隔
    private int size = 0;有效样品数
    private final double[] percents;
    private final double[] percentiles;

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EI_EXPOSE_REP2")
    public DataDistribution(int bufferSize, double[] percents) {
        初始化父类,DataAccumulator的缓存区(this.current,this.previous)
        super(bufferSize);
        assert percentsOK(percents);
        this.percents = percents;
        this.percentiles = new double[percents.length];
    }

    private static boolean percentsOK(double[] percents) {
        if (percents == null) {
            return false;
        }
        for (int i = 0; i < percents.length; i++) {
            if (percents[i] < 0.0 || percents[i] > 100.0) { // SUPPRESS CHECKSTYLE MagicNumber
                return false;
            }
        }
        return true;
    }

    protected void publish(DataBuffer buf) {
        将统计数据赋值到成员变量中
        ts = System.currentTimeMillis();
        numValues = buf.getNumValues();
        mean = buf.getMean();
        variance = buf.getVariance();
        stddev = buf.getStdDev();
        min = buf.getMinimum();
        max = buf.getMaximum();
        interval = buf.getSampleIntervalMillis();
        size = buf.getSampleSize();
        buf.getPercentiles(percents, percentiles);
    }

    /** {@inheritDoc} */
    public void clear() {
        numValues = 0L;
        mean = 0.0;
        variance = 0.0;
        stddev = 0.0;
        min = 0.0;
        max = 0.0;
        ts = 0L;
        interval = 0L;
        size = 0;
        for (int i = 0; i < percentiles.length; i++) {
            percentiles[i] = 0.0;
        }
    }

    public long getNumValues() {
        return numValues;
    }

    public double getMean() {
        return mean;
    }

    public double getVariance() {
        return variance;
    }

    /** {@inheritDoc} */
    public double getStdDev() {
        return stddev;
    }

    public double getMinimum() {
        return min;
    }

    public double getMaximum() {
        return max;
    }

    public String getTimestamp() {
        return new Date(getTimestampMillis()).toString();
    }

    public long getTimestampMillis() {
        return ts;
    }

    public long getSampleIntervalMillis() {
        return interval;
    }

    public int getSampleSize() {
        return size;
    }
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EI_EXPOSE_REP")
    public double[] getPercents() {
        return percents;
    }
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EI_EXPOSE_REP")
    public double[] getPercentiles() {
        return percentiles;
    }
} 
public class DataPublisher {
    private static final String THREAD_NAME = "DataPublisher";
    private static final boolean DAEMON_THREADS = true;守护线程
    private static ScheduledExecutorService sharedExecutor = null;周期性线程池
    private final DataAccumulator accumulator;数据采集器
    private final long delayMillis;延时时间
    private Future<?> future = null;回调
    
    构造方法,初始化
    public DataPublisher(DataAccumulator accumulator,
                         long delayMillis) {
        this.accumulator = accumulator;
        this.delayMillis = delayMillis;
    }
   
    判断是否在运行,通过查看回调是否为空
    public synchronized boolean isRunning() {
        return (future != null);
    }

    启动方法
    public synchronized void start() {
        if (future == null) {
            Runnable task = new Runnable() {
                                public void run() {
                                    try {
                                        accumulator.publish();
                                    } catch (Exception e) {
                                        handleException(e);
                                    }
                                }
                            };
            通过周期性线程池去定时更新采集的数据
            future = getExecutor().scheduleWithFixedDelay(task,
                                                          delayMillis, delayMillis,
                                                          TimeUnit.MILLISECONDS);
        }
    }

    
    protected synchronized ScheduledExecutorService getExecutor() {
        if (sharedExecutor == null) {
            sharedExecutor = Executors.newScheduledThreadPool(1, new PublishThreadFactory());
        }
        return sharedExecutor;
    }

    private static final class PublishThreadFactory implements ThreadFactory {
        PublishThreadFactory() { }
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, THREAD_NAME);
            t.setDaemon(DAEMON_THREADS);
            return t;
        }
    }

    public synchronized void stop() {
        if (future != null) {
            future.cancel(false);
            future = null;
        }
    }

    protected void handleException(Exception e) {
    }

} 

public class ServerStats {
    private static final int DEFAULT_PUBLISH_INTERVAL =  60 * 1000; // = 1 minute  更新统计信息的间隔
    private static final int DEFAULT_BUFFER_SIZE = 60 * 1000; // = 1000 requests/sec for 1 minute  每一秒存储的数据为长度=1000的数组

    获取百分比数组-正态分布的有效百分比
    private static final double[] PERCENTS = makePercentValues();
    private static double[] makePercentValues() {
        Percent[] percents = Percent.values();
        double[] p = new double[percents.length];
        for (int i = 0; i < percents.length; i++) {
            p[i] = percents[i].getValue();
        }
        return p;
    }
    private static enum Percent {
        TEN(10), TWENTY_FIVE(25), FIFTY(50), SEVENTY_FIVE(75), NINETY(90),
        NINETY_FIVE(95), NINETY_EIGHT(98), NINETY_NINE(99), NINETY_NINE_POINT_FIVE(99.5);
        private double val;
        Percent(double val) {
            this.val = val;
        }
        public double getValue() {
            return val;
        }
    }

    总请求数
    AtomicLong totalRequests = new AtomicLong();
    请求总数自增
    public void incrementNumRequests(){
        totalRequests.incrementAndGet();
    }

    dataDist用于采集响应数据,并且运算
    private DataDistribution dataDist = new DataDistribution(1, PERCENTS); 
    responseTimeDist只是简单保存响应数据 
    private final Distribution responseTimeDist = new Distribution();

    记录响应数据
    public void noteResponseTime(double msecs){
        dataDist.noteValue(msecs);
        responseTimeDist.noteValue(msecs);
    }

    总的活跃请求数
    AtomicInteger activeRequestsCount = new AtomicInteger(0);
    上次活跃的时间
    private volatile long lastActiveRequestsCountChangeTimestamp;
    private MeasuredRate requestCountInWindow = new MeasuredRate(300000L);
    最近有活跃请求的时间
    private volatile long lastActiveRequestsCountChangeTimestamp;
    最后访问的时间戳
    private volatile long lastAccessedTimestamp;
    第一次连接的时间戳
    private volatile long firstConnectionTimestamp = 0;
    活跃请求数的增加
    public void incrementActiveRequestsCount() {        
        activeRequestsCount.incrementAndGet();
        用于统计单位时间内的计数
        requestCountInWindow.increment();
        long currentTime = System.currentTimeMillis();
        lastActiveRequestsCountChangeTimestamp = currentTime;
        lastAccessedTimestamp = currentTime;
        if (firstConnectionTimestamp == 0) {
            firstConnectionTimestamp = currentTime;
        }
    }
    活跃请求数的减少
    public void decrementActiveRequestsCount() {
        if (activeRequestsCount.decrementAndGet() < 0) {
            activeRequestsCount.set(0);
        }
        lastActiveRequestsCountChangeTimestamp = System.currentTimeMillis();
    }
    
    统计300秒内的计数
    private MeasuredRate requestCountInWindow = new MeasuredRate(300000L);
    统计1秒内的失败请求数量
    long failureCountSlidingWindowInterval = 1000; 
    private MeasuredRate serverFailureCounts = new MeasuredRate(failureCountSlidingWindowInterval);
    public void addToFailureCount(){
        serverFailureCounts.increment();
    }
    public long getFailureCount(){
        return serverFailureCounts.getCurrentCount();
    }

    Server server;
    初始化,开始数据采集
    public void initialize(Server server) {
        serverFailureCounts = new MeasuredRate(failureCountSlidingWindowInterval);
        统计300秒内的计数
        requestCountInWindow = new MeasuredRate(300000L);
        if (publisher == null) {
            dataDist = new DataDistribution(getBufferSize(), PERCENTS);
            publisher = new DataPublisher(dataDist, getPublishIntervalMillis());
            开始数据采集
            publisher.start();
        }
        封装对应的服务信息
        this.server = server;
    }
}
public class MeasuredRate {
    private final AtomicLong _lastBucket = new AtomicLong(0);
    private final AtomicLong _currentBucket = new AtomicLong(0);
    private final long _sampleInterval;
    private volatile long _threshold;
    
    public MeasuredRate(long sampleInterval){
       _sampleInterval = sampleInterval;
       _threshold = System.currentTimeMillis() + sampleInterval;
    }
    
    public long getCount() {
        checkAndResetWindow();
        return _lastBucket.get();
    }
   
    public long getCurrentCount() {
        checkAndResetWindow();
        return _currentBucket.get();
    }
    
    public void increment() {
        checkAndResetWindow();
        _currentBucket.incrementAndGet();
    }

    private void checkAndResetWindow() {
        long now = System.currentTimeMillis();
        if(_threshold < now) {
            _lastBucket.set(_currentBucket.get());
            _currentBucket.set(0);
            _threshold = now + _sampleInterval;
        }
    }
    
    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("count:" + getCount());  
        sb.append("currentCount:" + getCurrentCount()); 
        return sb.toString();
    }
}
public class ServerStatsTest {
    public static void main(String args[]) {
        ServerStats ss = new ServerStats();
        //这里设置bufferSize的大小为1000的数组,记录1000个数据
        ss.setBufferSize(1000);
        //更新间隔为1秒
        ss.setPublishInterval(1000);
        //这里设置服务
        ss.initialize(new Server("stonse", 80));
        
        Random r = new Random(1459834);
        模拟100个请求,最后输出平均时长以及其他统计数据
        for (int i=0; i < 99; i++){
            double rl = r.nextDouble() * 25.2;
            ss.noteResponseTime(rl);
            ss.incrementNumRequests();
            try {
                Thread.sleep(100);
                System.out.println("ServerStats:avg:" + ss.getResponseTimeAvg());
                System.out.println("ServerStats:90 percentile:" + ss.getResponseTime90thPercentile());
                System.out.println("ServerStats:90 percentile:" + ss.getResponseTimePercentileNumValues());
            } catch (InterruptedException e) {
                
            }
           
        }
        System.out.println("done ---");
        ss.close();
        System.out.println("ServerStats:" + ss);
    }
}

最后输出结果如下

ServerStats:avg:13.275308970139484
ServerStats:90 percentile:24.29368204858261
ServerStats:90 percentile:10
ServerStats:[Server:stonse:80;  Zone:UNKNOWN;   Total Requests:99;  Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:13.275308970139484;   90 percentile resp time:24.29368204858261;  95 percentile resp time:24.29368204858261;  min resp time:0.021071074712381233; max resp time:24.60484318031518;    stddev resp time:7.121850590230442]

这个类其实就是简单的用于统计的。
可以算出平均响应时长,以及总活跃的请求数,等等相关指标,用于衡量服务的健康状态。

public class LoadBalancerStats implements IClientConfigAware {
    分区与统计信息的映射
    volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>();
    分区对应的服务列表
    volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();

    public LoadBalancerStats(){
        zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>();  
        upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();        
    }

    缓存-这是guaua提供的一个缓存,具有数据定时失效的功能
    private final LoadingCache<Server, ServerStats> serverStatsCache = 
        CacheBuilder.newBuilder()
            .expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES)
            .removalListener(new RemovalListener<Server, ServerStats>() {
                @Override
                public void onRemoval(RemovalNotification<Server, ServerStats> notification) {
                    notification.getValue().close();
                }
            })
            .build(
                new CacheLoader<Server, ServerStats>() {
                    public ServerStats load(Server server) {
                        return createServerStats(server);
                    }
                });
    
    创建服务的统计信息
    protected ServerStats createServerStats(Server server) {
        ServerStats ss = new ServerStats(this);
        ss.setBufferSize(1000);
        ss.setPublishInterval(1000);                    
        ss.initialize(server);
        return ss;        
    }

    新增某个服务的统计信息
    public void addServer(Server server) {
        try {
            serverStatsCache.get(server);
        } catch (ExecutionException e) {
            ServerStats stats = createServerStats(server);
            serverStatsCache.asMap().putIfAbsent(server, stats);
        }
    }
    
    获取某个服务的统计信息
    protected ServerStats getServerStats(Server server) {
        try {
            return serverStatsCache.get(server);
        } catch (ExecutionException e) {
            ServerStats stats = createServerStats(server);
            serverStatsCache.asMap().putIfAbsent(server, stats);
            return serverStatsCache.asMap().get(server);
        }
    } 
    
    某个服务对应的统计信息-增加总的活跃请求次数
    public void incrementActiveRequestsCount(Server server) {
        ServerStats ss = getServerStats(server); 
        ss.incrementActiveRequestsCount();
    }

    某个服务对应的统计信息-减少总的活跃请求次数
    public void decrementActiveRequestsCount(Server server) {
        ServerStats ss = getServerStats(server); 
        ss.decrementActiveRequestsCount();
    }
}
public class ZoneSnapshot {
    final int instanceCount;
    final double loadPerServer;
    final int circuitTrippedCount;
    final int activeRequestsCount;
    
    public ZoneSnapshot() {
        this(0, 0, 0, 0d);
    }
    
    public ZoneSnapshot(int instanceCount, int circuitTrippedCount, int activeRequestsCount, double loadPerServer) {
        this.instanceCount = instanceCount;
        this.loadPerServer = loadPerServer;
        this.circuitTrippedCount = circuitTrippedCount;
        this.activeRequestsCount = activeRequestsCount;
    }
    
    public final int getInstanceCount() {
        return instanceCount;
    }
    
    public final double getLoadPerServer() {
        return loadPerServer;
    }
    
    public final int getCircuitTrippedCount() {
        return circuitTrippedCount;
    }
    
    public final int getActiveRequestsCount() {
        return activeRequestsCount;
    }

    @Override
    public String toString() {
        return "ZoneSnapshot [instanceCount=" + instanceCount
                + ", loadPerServer=" + loadPerServer + ", circuitTrippedCount="
                + circuitTrippedCount + ", activeRequestsCount="
                + activeRequestsCount + "]";
    }
}

下面再来看看ZoneStats如何生成分区统计信息

public class ZoneStats<T extends Server> {
    private final LoadBalancerStats loadBalancerStats;统计信息(这个类中包含了所有服务的统计信息,以及分区对应服务列表)
    private final String zone;分区名字
    private static final String PREFIX = "ZoneStats_";前缀
    private final Counter counter;
    
    final String monitorId;
    
    public ZoneStats(String name, String zone, LoadBalancerStats loadBalancerStats) {
        this.zone = zone;
        this.loadBalancerStats = loadBalancerStats;
        monitorId = name + ":" + zone;  
        counter = Monitors.newCounter(PREFIX + name + "_" + zone + "_Counter");
        Monitors.registerObject(monitorId, this);
    }

    public final String getZone() {
        return zone;
    }
    
    有相关获取某个指标的方法,但是其实实现都是一样的。
    @Monitor(name=PREFIX + "ActiveRequestsCount", type = DataSourceType.INFORMATIONAL)    
    public int getActiveRequestsCount() {
        return loadBalancerStats.getActiveRequestsCount(zone);
    }

}

以获取分区的总活跃请求数为例子。
public class LoadBalancerStats implements IClientConfigAware {
    分区与服务列表的映射
    volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();

    public int getActiveRequestsCount(String zone) {
        return getZoneSnapshot(zone).getActiveRequestsCount();
    }

    获取分区的快照
    public ZoneSnapshot getZoneSnapshot(String zone) {
        if (zone == null) {
            return new ZoneSnapshot();
        }
        zone = zone.toLowerCase();
        获取分区对应的服务列表
        List<? extends Server> currentList = upServerListZoneMap.get(zone);
        return getZoneSnapshot(currentList);        
    }

    public ZoneSnapshot getZoneSnapshot(List<? extends Server> servers) {
        if (servers == null || servers.size() == 0) {
            return new ZoneSnapshot();
        }
        int instanceCount = servers.size();
        int activeConnectionsCount = 0;
        int activeConnectionsCountOnAvailableServer = 0;
        int circuitBreakerTrippedCount = 0;
        double loadPerServer = 0;
        long currentTime = System.currentTimeMillis();
        将每个服务的统计信息进行累加即可。
        for (Server server: servers) {
            ServerStats stat = getSingleServerStat(server);   
            if (stat.isCircuitBreakerTripped(currentTime)) {
                circuitBreakerTrippedCount++;
            } else {
                activeConnectionsCountOnAvailableServer += stat.getActiveRequestsCount(currentTime);
            }
            activeConnectionsCount += stat.getActiveRequestsCount(currentTime);
        }
        if (circuitBreakerTrippedCount == instanceCount) {
            if (instanceCount > 0) {
                // should be NaN, but may not be displayable on Epic
                loadPerServer = -1;
            }
        } else {
            loadPerServer = ((double) activeConnectionsCountOnAvailableServer) / (instanceCount - circuitBreakerTrippedCount);
        }
        生成快照
        return new ZoneSnapshot(instanceCount, circuitBreakerTrippedCount, activeConnectionsCount, loadPerServer);
    }
}
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
    @Override
    public Server chooseServer(Object key) {
        如果分区只有一个,那么简单的利用rule进行服务的筛选即可
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            获取所有服务的负载均衡信息
            LoadBalancerStats lbStats = getLoadBalancerStats();
            创建分区的统计信息
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            。。。省略部分代码
            通过rule来挑选出合适的分区
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            如果分区存在多个,则随机选取一个
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    获取该分区对应的负载均衡器
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    通过负载均衡器来选择服务,最后也是通过rule来选择服务
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读