Spring Cloud

Spring Cloud Ribbon 分析(三)之Ribbon

2021-01-14  本文已影响0人  Blog

Spring Cloud Ribbon 分析(二)之LoadBalancerAutoConfiguration我们重点分析了RestTemplate对象设置拦截器来使自身具有负载的作用,本节我们着重分析Ribbon客户端相关的配置!


RibbonClientConfiguration

接口 默认实现类 描述
IClientConfig DefaultClientConfigImpl 管理配置接口
IRule ZoneAvoidanceRule 均衡策略接口
IPing DummyPing 检查服务可用性接口
ServerList<Server> ConfigurationBasedServerList 获取服务列表接口
ILoadBalancer ZoneAwareLoadBalancer 负载均衡接口
ServerListUpdater PollingServerListUpdater 定时更新服务列表接口
ServerIntrospector DefaultServerIntrospector 安全端口接口

对于RibbonClientConfiguration配置类,比较重要的类就是表格里面对应的,下文会总结各自的用途


@RibbonClientName

@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
        ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Value("${ribbon.client.name}")
public @interface RibbonClientName {

}

public SpringClientFactory() {
    super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
}

@Value("\color{#000000}{ribbon.client.name}") 这个属性在SpringClientFactory构造函数进行注入,生成最终的Ribbon客户端名称,具体实现可以查看Spring Cloud Ribbon 分析(二)之LoadBalancerAutoConfiguration


DefaultClientConfigImpl

/**
* Ribbon客户端相关的参数配置,比如ReadTimeout、ConnectTimeout等等默认参数
*/
public class DefaultClientConfigImpl implements IClientConfig {
    ......
    @Override
    public void loadProperties(String restClientName){
        enableDynamicProperties = true;
        setClientName(restClientName);
        //设置默认属性值,比如ConnectTimeout设置为2秒,ReadTimeout设置为5秒
        loadDefaultValues();
        //获取当前服务配置的ribbon参数
        Configuration props = ConfigurationManager.getConfigInstance().subset(restClientName);
        for (Iterator<String> keys = props.getKeys(); keys.hasNext(); ){
            String key = keys.next();
            String prop = key;
            try {
                if (prop.startsWith(getNameSpace())){
                    prop = prop.substring(getNameSpace().length() + 1);
                }
                //设置属性
                setPropertyInternal(prop, getStringValue(props, key));
            } catch (Exception ex) {
                throw new RuntimeException(String.format("Property %s is invalid", prop));
            }
        }
    }
    ......
}

属性设置如果我们想只针对某个客户端进行配置,那么格式就是<clientName>.<nameSpace>.<propertyName>=<value>,比如我当前聚合业务A要调用领域服务B中的一个Feign接口,我们定义的FeignClient(value="clientName")为这样,那么我们这样配置就只会针对当前这个Feign客户端生效,如果想全局配置,那么我们就应该使用这样的格式<nameSpace>.<propertyName>=<value> 如:ribbon.ReadTimeout=1000


ZoneAwareLoadBalancer

/**
* 引入Zone区域概念,当选择服务器时可以避免整个区域的负载均衡器
*/
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
    ......
    @Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            //设置Zone区域对应的实例列表
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        //检查是否有任何区域不再有服务器,并将列表设置为空,以便与区域相关的度量不会包含过时数据
        //检查Zone区域内的实例列表,如果没有实例了则清空对应的Zone区域列表,作用是Zone区域统计信息不包含过时数据,
        //避免影响选择实例的算法
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }    

    ......

    @Override
    public Server chooseServer(Object key) {
        //Zone区域数小于等于1,不进行区域选择,直接使用父类选择服务
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            //每个Zone区域都创建快照
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            //平均负载阈值,0.2默认值太小,建议改大
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }
            //Zone区域的服务实例挂掉的阈值,基本是全部挂掉就移除当前Zone
            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            //可用区域
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            //可用区域数小于总的快照数,才参与随机选择一个区域,根据内部算法得知,可用数一定会小于总快照数,
            //至于为啥,感兴趣的可用看看getAvailableZones内部实现
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                //随机选择一个Zone区域
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    //使用对应Zone区域的负载均衡选择服务
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }
    ......
}

经过代码片段的分析,我们得知ZoneAwareLoadBalancer引入Zone区域的概念,比如我在华北区域有部署服务,我在华南区域也部署了相同的服务,那么默认情况我们在创建Server服务列表时候,默认Zone区域为UNKNOWN,这样就会把所有服务都划分到同一个UNKNOWN区域下,这样就会存在区域之间的延迟等性能问题,所以如果对区域有要求的服务,我们可以在创建Server服务列表时候显示指定所属的区域,这样就可以使用Ribbon的区域选择


ZoneAvoidanceRule

/**
* 根据Zone区域的可用性和Server的可用性选择服务器
*/
public class ZoneAvoidanceRule extends PredicateBasedRule {

    ......

    public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        //初始值为快照里面的区域均可用
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
            return availableZones;
        }
        //最差的区域
        Set<String> worstZones = new HashSet<String>();
        //所有Zone区域中负载最大的
        double maxLoadPerServer = 0;
        //是否是属于有限的区域可用标示,如果为false则代码全部区域可用
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            //没有实例,移除当前区域,设置区域为有限
            if (instanceCount == 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                //当前Zone的平均负载
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                //百分之99.999的机器挂掉则移除当前区域,设置有限区域标示
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    //若当前负载和最大负载之间小于0.000001d,那认为这个区域就是最差的,添加到最差的区域集合
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) { //当前负载大于最大负载
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }
        //最大负载小于设定的负载阈值并且全部区域可用,返回全部Zone
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        //在最差的区域集合中随机选择一个进行剔除,然后返回最终过滤过的可用Zone区域集合
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;
    }
    ......
}

在注释中,我们可用清晰的看见整个区域选择的过程,其中我们需要注意一点的是triggeringLoad这个阀值,这个值默认0.2相对来说太小了,具体可用根据自己的服务进行调整


PredicateBasedRule

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
    @Override
    public Server choose(Object key) {
        //绝大多数情况下key为Null
        //获取ZoneAwareLoadBalancer
        ILoadBalancer lb = getLoadBalancer();
        //获取断言器ZoneAvoidanceRule.compositePredicate,选择过滤之后的服务
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
}
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {

    ......
    //获取合格的服务列表
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
        } else {
            List<Server> results = Lists.newArrayList();
            for (Server server: servers) {
                //调用过程为this.apply->CompositePredicate.apply->AbstractServerPredicate.ofKeyPredicate
                //->AndPredicate.apply->ZoneAvoidancePredicate.apply & AvailabilityPredicate.apply
                //最终会通过AndPredicate并且断言器,ZoneAvoidancePredicate和AvailabilityPredicate两个断言器的过滤,都满足才添加到结果中
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;            
        }
    }
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        //这个incrementAndGetModulo其实就是从小到大返回一个数
        //比如eligible.size()等于5,那么返回值就类似0,1,2,3,4,0...
        //其实就是轮训获取eligible里面的服务
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
}

上面我们看到ZoneAvoidanceRule继承PredicateBasedRule,最终的都会调用到choose(Object key)方法选择出最终的一个Server返回,那这个地方的key代表上面意思呢,这个简单介绍下,这个key可用理解为一个密钥,比如进行canary测试等等,我们需要将名为loadBalancerKey的信息传递给Ribbon,Ribbon可以在IRule中使用这些信息来选择要进行特殊处理的服务器,比如我们在Zuul网关中的RequestContext中有设置loadBalancerKey,RibbonRoutingFilter从RequestContext中找到loadBalancerKey,它将把它传递到RibbonCommandContext中。ContextAwareRequest将包含来自RibbonCommandContext的这个值,并最终传递给LoadBalancerCommand,这样我们在choose方法中就可用获取对应的密钥key进行不同的处理


PollingServerListUpdater

public class PollingServerListUpdater implements ServerListUpdater {
    ......
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
    ......
}

通过定时任务来触发更新服务列表,我们看见应用启动之后1秒开始执行,然后间隔时间30秒刷新一次服务列表,那是什么进行这个步骤呢,我们可以发现整个调用链比较简单,初始化ZoneAwareLoadBalancer->DynamicServerListLoadBalancer.restOfInit->DynamicServerListLoadBalancer.enableAndInitLearnNewServersFeature->PollingServerListUpdater.start


ServerList<Server>

public class ConfigurationBasedServerList extends AbstractServerList<Server>  {
private IClientConfig clientConfig;
        
    @Override
    public List<Server> getInitialListOfServers() {
        return getUpdatedListOfServers();
    }

    @Override
    public List<Server> getUpdatedListOfServers() {
        String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
        return derive(listOfServers);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }
    
    protected List<Server> derive(String value) {
        List<Server> list = Lists.newArrayList();
        if (!Strings.isNullOrEmpty(value)) {
            for (String s: value.split(",")) {
                list.add(new Server(s.trim()));
            }
        }
        return list;
    }
}

对于默认的配置就相对简单,只需要在项目里面配置<clientName>.<nameSpace>.listOfServers=hostname:port,hostname1:port1 按照这样的格式配置对应客户端的服务地址即可,但是这样的方式很不适合当前k8s环境,因为k8s环境默认有自己的svc服务,所以我们在创建List<Server>服务列表时候,完全可以使用k8s的svc服务,比如: new Server(serviceId + "." + namespace + ".svc", port) 这样就可以不用把地址写死了


DummyPing

public class DummyPing extends AbstractLoadBalancerPing {

    public DummyPing() {
    }

    public boolean isAlive(Server server) {
        return true;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

默认实现永远返回服务可用,就是不进行健康检查,但是这样的做法不推荐,所以一般我们会使用Ribbon的PingUrl,对/health这个接口进行服务检查是否可用,具体开启Ping检查的调用链为ZoneAwareLoadBalancer->DynamicServerListLoadBalancer->BaseLoadBalancer.initWithConfig->setPing->setupPingTask->lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000) 每10秒Ping一次服务,具体实现逻辑感兴趣的同学可以根据调用链进行分析


本节针对Ribbon客户端的配置总结和分析就告一段落,下节将结合Feign客户端来分析Ribbon的负载,Spring Cloud Ribbon 分析(四)之Feign

上一篇 下一篇

猜你喜欢

热点阅读