(七)对服务列表存储以及更新(ILoadBalancer)

2021-11-21  本文已影响0人  guessguess

在前面已经知道,其实服务列表是可以通过各种方式加载,比如静态的---配置类,或者是写死的集合,动态的---通过注册中心发现服务,而实现这些功能的类是ServerList。至于控制调度的则是ServerListUpdater。这些服务列表,最后会被转化成Server进行存储。
那么存储到哪里?以及如何被管理
那么下面说到的类,ILoadBalancer就是用来管理Server的。

public interface ILoadBalancer {
    添加服务列表
    public void addServers(List<Server> newServers);
    通过key选择服务
    public Server chooseServer(Object key);
    将某个服务标记为宕机
    public void markServerDown(Server server);
    是否只选择存活的服务
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);
    获取存活的服务列表
    public List<Server> getReachableServers();
    获取所有服务列表
    public List<Server> getAllServers();
}

从这个接口实现来看,其实是对Server的管理。涵盖了新增,查询,以及修改。至于为什么没有删除,因为ServerListUpdater会定时去更新整个服务列表。

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    public enum ServerGroup{
        ALL,
        STATUS_UP,
        STATUS_NOT_UP        
    }
    public Server chooseServer() {
        return chooseServer(null);
    }
    public abstract List<Server> getServerList(ServerGroup serverGroup);
    public abstract LoadBalancerStats getLoadBalancerStats();    
}
public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
    这里为什么用线程安全的类,以及可见性,因为本身ServerListUpdater的执行是通过线程池来操作。
    可见性-ServerListUpdater会对allServerList 进行重新赋值。
    synchronizedList-保证对allServerList 集合中的操作都是线程安全的。
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    
    记录存活的服务列表
    protected volatile List<Server> upServerList = Collections
            .synchronizedList(new ArrayList<Server>());

    关于allServerList 的读写锁。因为是对集合内部的元素属性做调整,所以需要加锁。
    本身线程安全的集合只是针对集合本身,与元素的修改无关,所以需要加锁。
    protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
    Ping用于检测服务是否存活
    protected IPing ping = null;

添加服务列表
-----------------------------------------------------------------------------------------------------------------------------
    @Override
    public void addServers(List<Server> newServers) {
        if (newServers != null && newServers.size() > 0) {
            try {
                ArrayList<Server> newList = new ArrayList<Server>();
                newList.addAll(allServerList);
                newList.addAll(newServers);
                setServersList(newList);
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
            }
        }
    }


    public void setServersList(List lsrv) {
        Lock writeLock = allServerLock.writeLock();        
        ArrayList<Server> newServers = new ArrayList<Server>();
        1.上锁
        writeLock.lock();
        try {
            2.将lsrv遍历,判断类型是否正确,最终转换为Server的集合
            ArrayList<Server> allServers = new ArrayList<Server>();
            for (Object server : lsrv) {
                if (server == null) {
                    continue;
                }
                if (server instanceof String) {
                    server = new Server((String) server);
                }
                if (server instanceof Server) {
                    allServers.add((Server) server);
                } else {
                    throw new IllegalArgumentException(
                            "Type String or Server expected, instead found:"
                                    + server.getClass());
                }
            }
            boolean listChanged = false;
            3.判断是否与原先的服务列表不同,从而决定是否进行修改
            list的equals方法除了比较长度,还会比较元素的内容
            if (!allServerList.equals(allServers)) {
                listChanged = true;
                4.在服务列表发生变化的情况下,通过监听器去执行相关操作。
                   后续再细讲。---
                if (changeListeners != null && changeListeners.size() > 0) {
                   List<Server> oldList = ImmutableList.copyOf(allServerList);
                   List<Server> newList = ImmutableList.copyOf(allServers);                   
                   for (ServerListChangeListener l: changeListeners) {
                       try {
                           l.serverListChanged(oldList, newList);
                       } catch (Exception e) {
                           打印日志。。。省略
                       }
                   }
                }
            }
            5.检测服务列表的是否准备好服务,其实就是访问服务的真实Ip地址。就看能不能ping的通。
              如果可以访问server.setReadyToServe(true);
            if (isEnablePrimingConnections()) {
                for (Server server : allServers) {
                    if (!allServerList.contains(server)) {
                        server.setReadyToServe(false);
                        newServers.add((Server) server);
                    }
                }
                if (primeConnections != null) {
                    primeConnections.primeConnectionsAsync(newServers, this);
                }
            }
            6. 替换原先的allServerList 
            allServerList = allServers;
            7.检测服务是否存活,如果跳过,默认都存活,否则通过Ping这个类,来检测服务是否存活。
               如果存活就会setAlive(true)
               通过更新upServerList,记录存活的服务列表
            if (canSkipPing()) {
                for (Server s : allServerList) {
                    s.setAlive(true);
                }
                upServerList = allServerList;
            } else if (listChanged) {
                forceQuickPing();
            }
        } finally {
            writeLock.unlock();
        }
    }
    
    private boolean canSkipPing() {
        if (ping == null
                || ping.getClass().getName().equals(DummyPing.class.getName())) {
            // default ping, no need to set up timer
            return true;
        } else {
            return false;
        }
    }
-------------------------------------------------------------------------------------------------------------------------
选择服务
    从代码来看,选择服务是通过Rule来进行选择的。同时在选择服务的时候,计数器会计数。
    protected IRule rule = DEFAULT_RULE;
    private final static IRule DEFAULT_RULE = new RoundRobinRule();
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
-----------------------------------------------------------------------------------------------------------------------------
将服务标记为宕机

    private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();

    public void markServerDown(Server server) {
        if (server == null || !server.isAlive()) {
            return;
        }
        server.setAlive(false);
        通过监听器去做些什么~暂时没发现什么实现。
        notifyServerStatusChangeListener(singleton(server));
    }
}
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    更新服务列表的标记
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

    protected void updateAllServerList(List<T> ls) {
        由于在线程池中运行,避免资源浪费,以及资源竞争,用cas避免线程安全问题。
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); 
                }
                对服务列表进行管理---核心
                setServersList(ls);
               更新服务的存活状态
                super.forceQuickPing();
            } finally {
                修改表计,改为暂停
                serverListUpdateInProgress.set(false);
            }
        }
    }

}

上面可以看到,首先DynamicServerListLoadBalancer是动态的获取服务列表。其次,也支持对服务列表的过滤。那么在对于一些基础实现上是否存在不同。比如对于服务列表的管理。那么接下来继续看源码。

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    @Override
    public void setServersList(List lsrv) {
        这里是先调用了BaseLoadBalancer的setServersList方法。
        实现了服务列表基本的存储,以及服务状态更新。
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        用于存储将服务列表分区后的数据
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        遍历,对每个服务进行处理,设置分区信息
        for (Server server : serverList) {
            生成该服务的统计信息,利用了CacheLoader
            getLoadBalancerStats().getSingleServerStat(server);
            默认都是"defaultZone"
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        生成区域与服务列表的映射,存放在LoadBalancerStats中。
        setServerListForZones(serversInZones);
    }

    protected void setServerListForZones(
            Map<String, List<Server>> zoneServersMap) {
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }
}

从上面这段代码来看,其实就是简单做了一个分区,将服务列表分区之后,进行存储。
但是存储的内容如何被利用?在ZoneAwareLoadBalancer中就可以知道了。

public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
    用于存储分区与负载均衡器的映射
    private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
---------------------------------------------------------------------------------------------------------------------------------
对服务列表的处理
        1.服务列表分区后,存储到LoadBalancerStats
        2.给分区分配负载均衡器,同时给负载均衡器分配规则

    覆写父类DynamicServerListLoadBalancer的setServerListForZones方法。
    @Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        调回DynamicServerListLoadBalancer的setServerListForZones方法
        目的就是将zoneServersMap存储到LoadBalancerStats中,对分区信息进行存储。
        super.setServerListForZones(zoneServersMap);
        一开始balancers 只会为null。
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        给对应的分区分配负载均衡器
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            分配负载均衡器,同时对服务列表进行存储
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        如果新的分区列表中,不包含原先的分区,则将该分区对应的服务列表清空
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }

    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        如果该区域没有负载均衡器,则重新分配,类型为BaseLoadBalancer
        if (loadBalancer == null) {
            分配规则---这个规则其实就是筛选服务的规则
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        } 
        return loadBalancer;        
    }
-------------------------------------------------------------------------------------------------------------------------------
那么如何筛选?
       1.如果存活的分区小1的情况下,会直接使用规则筛选出服务。
       2.利用统计数据,筛选出合适的区域,最后在该区域的服务列表进行筛选。
    @Override
    public Server chooseServer(Object key) {
        分区为小于1的情况下,直接用规则进行筛选。
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    选出合适的区域,通过该区域对于的负载均衡器去选择服务。
                    该区域的服务列表,存储在该负载均衡器
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

}

流程图如下


服务列表的存储流程
上一篇下一篇

猜你喜欢

热点阅读