Spring Cloud Ribbon 分析(三)之Ribbon
Spring Cloud Ribbon 分析(二)之LoadBalancerAutoConfiguration我们重点分析了RestTemplate对象设置拦截器来使自身具有负载的作用,本节我们着重分析Ribbon客户端相关的配置!
RibbonClientConfiguration
接口 | 默认实现类 | 描述 |
---|---|---|
IClientConfig | DefaultClientConfigImpl | 管理配置接口 |
IRule | ZoneAvoidanceRule | 均衡策略接口 |
IPing | DummyPing | 检查服务可用性接口 |
ServerList<Server> | ConfigurationBasedServerList | 获取服务列表接口 |
ILoadBalancer | ZoneAwareLoadBalancer | 负载均衡接口 |
ServerListUpdater | PollingServerListUpdater | 定时更新服务列表接口 |
ServerIntrospector | DefaultServerIntrospector | 安全端口接口 |
对于RibbonClientConfiguration配置类,比较重要的类就是表格里面对应的,下文会总结各自的用途
@RibbonClientName
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Value("${ribbon.client.name}")
public @interface RibbonClientName {
}
public SpringClientFactory() {
super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
}
@Value("") 这个属性在SpringClientFactory构造函数进行注入,生成最终的Ribbon客户端名称,具体实现可以查看Spring Cloud Ribbon 分析(二)之LoadBalancerAutoConfiguration
DefaultClientConfigImpl
/**
* Ribbon客户端相关的参数配置,比如ReadTimeout、ConnectTimeout等等默认参数
*/
public class DefaultClientConfigImpl implements IClientConfig {
......
@Override
public void loadProperties(String restClientName){
enableDynamicProperties = true;
setClientName(restClientName);
//设置默认属性值,比如ConnectTimeout设置为2秒,ReadTimeout设置为5秒
loadDefaultValues();
//获取当前服务配置的ribbon参数
Configuration props = ConfigurationManager.getConfigInstance().subset(restClientName);
for (Iterator<String> keys = props.getKeys(); keys.hasNext(); ){
String key = keys.next();
String prop = key;
try {
if (prop.startsWith(getNameSpace())){
prop = prop.substring(getNameSpace().length() + 1);
}
//设置属性
setPropertyInternal(prop, getStringValue(props, key));
} catch (Exception ex) {
throw new RuntimeException(String.format("Property %s is invalid", prop));
}
}
}
......
}
属性设置如果我们想只针对某个客户端进行配置,那么格式就是<clientName>.<nameSpace>.<propertyName>=<value>,比如我当前聚合业务A要调用领域服务B中的一个Feign接口,我们定义的FeignClient(value="clientName")为这样,那么我们这样配置就只会针对当前这个Feign客户端生效,如果想全局配置,那么我们就应该使用这样的格式<nameSpace>.<propertyName>=<value> 如:ribbon.ReadTimeout=1000
ZoneAwareLoadBalancer
/**
* 引入Zone区域概念,当选择服务器时可以避免整个区域的负载均衡器
*/
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
......
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (balancers == null) {
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
//设置Zone区域对应的实例列表
getLoadBalancer(zone).setServersList(entry.getValue());
}
//检查是否有任何区域不再有服务器,并将列表设置为空,以便与区域相关的度量不会包含过时数据
//检查Zone区域内的实例列表,如果没有实例了则清空对应的Zone区域列表,作用是Zone区域统计信息不包含过时数据,
//避免影响选择实例的算法
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
......
@Override
public Server chooseServer(Object key) {
//Zone区域数小于等于1,不进行区域选择,直接使用父类选择服务
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
//每个Zone区域都创建快照
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
//平均负载阈值,0.2默认值太小,建议改大
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
//Zone区域的服务实例挂掉的阈值,基本是全部挂掉就移除当前Zone
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
//可用区域
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
//可用区域数小于总的快照数,才参与随机选择一个区域,根据内部算法得知,可用数一定会小于总快照数,
//至于为啥,感兴趣的可用看看getAvailableZones内部实现
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
//随机选择一个Zone区域
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
//使用对应Zone区域的负载均衡选择服务
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
......
}
经过代码片段的分析,我们得知ZoneAwareLoadBalancer引入Zone区域的概念,比如我在华北区域有部署服务,我在华南区域也部署了相同的服务,那么默认情况我们在创建Server服务列表时候,默认Zone区域为UNKNOWN,这样就会把所有服务都划分到同一个UNKNOWN区域下,这样就会存在区域之间的延迟等性能问题,所以如果对区域有要求的服务,我们可以在创建Server服务列表时候显示指定所属的区域,这样就可以使用Ribbon的区域选择
ZoneAvoidanceRule
/**
* 根据Zone区域的可用性和Server的可用性选择服务器
*/
public class ZoneAvoidanceRule extends PredicateBasedRule {
......
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
//初始值为快照里面的区域均可用
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) {
return availableZones;
}
//最差的区域
Set<String> worstZones = new HashSet<String>();
//所有Zone区域中负载最大的
double maxLoadPerServer = 0;
//是否是属于有限的区域可用标示,如果为false则代码全部区域可用
boolean limitedZoneAvailability = false;
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
//没有实例,移除当前区域,设置区域为有限
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
//当前Zone的平均负载
double loadPerServer = zoneSnapshot.getLoadPerServer();
//百分之99.999的机器挂掉则移除当前区域,设置有限区域标示
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
//若当前负载和最大负载之间小于0.000001d,那认为这个区域就是最差的,添加到最差的区域集合
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) { //当前负载大于最大负载
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
//最大负载小于设定的负载阈值并且全部区域可用,返回全部Zone
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
//在最差的区域集合中随机选择一个进行剔除,然后返回最终过滤过的可用Zone区域集合
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
......
}
在注释中,我们可用清晰的看见整个区域选择的过程,其中我们需要注意一点的是triggeringLoad这个阀值,这个值默认0.2相对来说太小了,具体可用根据自己的服务进行调整
PredicateBasedRule
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
@Override
public Server choose(Object key) {
//绝大多数情况下key为Null
//获取ZoneAwareLoadBalancer
ILoadBalancer lb = getLoadBalancer();
//获取断言器ZoneAvoidanceRule.compositePredicate,选择过滤之后的服务
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
......
//获取合格的服务列表
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
//调用过程为this.apply->CompositePredicate.apply->AbstractServerPredicate.ofKeyPredicate
//->AndPredicate.apply->ZoneAvoidancePredicate.apply & AvailabilityPredicate.apply
//最终会通过AndPredicate并且断言器,ZoneAvoidancePredicate和AvailabilityPredicate两个断言器的过滤,都满足才添加到结果中
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
//这个incrementAndGetModulo其实就是从小到大返回一个数
//比如eligible.size()等于5,那么返回值就类似0,1,2,3,4,0...
//其实就是轮训获取eligible里面的服务
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
}
上面我们看到ZoneAvoidanceRule继承PredicateBasedRule,最终的都会调用到choose(Object key)方法选择出最终的一个Server返回,那这个地方的key代表上面意思呢,这个简单介绍下,这个key可用理解为一个密钥,比如进行canary测试等等,我们需要将名为loadBalancerKey的信息传递给Ribbon,Ribbon可以在IRule中使用这些信息来选择要进行特殊处理的服务器,比如我们在Zuul网关中的RequestContext中有设置loadBalancerKey,RibbonRoutingFilter从RequestContext中找到loadBalancerKey,它将把它传递到RibbonCommandContext中。ContextAwareRequest将包含来自RibbonCommandContext的这个值,并最终传递给LoadBalancerCommand,这样我们在choose方法中就可用获取对应的密钥key进行不同的处理
PollingServerListUpdater
public class PollingServerListUpdater implements ServerListUpdater {
......
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
......
}
通过定时任务来触发更新服务列表,我们看见应用启动之后1秒开始执行,然后间隔时间30秒刷新一次服务列表,那是什么进行这个步骤呢,我们可以发现整个调用链比较简单,初始化ZoneAwareLoadBalancer->DynamicServerListLoadBalancer.restOfInit->DynamicServerListLoadBalancer.enableAndInitLearnNewServersFeature->PollingServerListUpdater.start
ServerList<Server>
public class ConfigurationBasedServerList extends AbstractServerList<Server> {
private IClientConfig clientConfig;
@Override
public List<Server> getInitialListOfServers() {
return getUpdatedListOfServers();
}
@Override
public List<Server> getUpdatedListOfServers() {
String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
return derive(listOfServers);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
protected List<Server> derive(String value) {
List<Server> list = Lists.newArrayList();
if (!Strings.isNullOrEmpty(value)) {
for (String s: value.split(",")) {
list.add(new Server(s.trim()));
}
}
return list;
}
}
对于默认的配置就相对简单,只需要在项目里面配置<clientName>.<nameSpace>.listOfServers=hostname:port,hostname1:port1 按照这样的格式配置对应客户端的服务地址即可,但是这样的方式很不适合当前k8s环境,因为k8s环境默认有自己的svc服务,所以我们在创建List<Server>服务列表时候,完全可以使用k8s的svc服务,比如: new Server(serviceId + "." + namespace + ".svc", port) 这样就可以不用把地址写死了
DummyPing
public class DummyPing extends AbstractLoadBalancerPing {
public DummyPing() {
}
public boolean isAlive(Server server) {
return true;
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
默认实现永远返回服务可用,就是不进行健康检查,但是这样的做法不推荐,所以一般我们会使用Ribbon的PingUrl,对/health这个接口进行服务检查是否可用,具体开启Ping检查的调用链为ZoneAwareLoadBalancer->DynamicServerListLoadBalancer->BaseLoadBalancer.initWithConfig->setPing->setupPingTask->lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000) 每10秒Ping一次服务,具体实现逻辑感兴趣的同学可以根据调用链进行分析
本节针对Ribbon客户端的配置总结和分析就告一段落,下节将结合Feign客户端来分析Ribbon的负载,Spring Cloud Ribbon 分析(四)之Feign