(十)Predicate

2021-12-08  本文已影响0人  guessguess

最后再来说一下Predicate这个接口。
负载均衡使用这个接口来过滤服务。

/**
 * A boolean-valued test over values of type {@code T}.
 *
 * @param <T> the type of input the predicate is applied to
 */
public interface Predicate<T> {
  /**
   * Evaluates this predicate against the given input.
   *
   * @param input the value to test; annotated {@code @Nullable}, so it may be {@code null}
   * @return the boolean result of the test
   */
  boolean apply(@Nullable T input);

  /**
   * Re-declares {@link Object#equals(Object)} as part of this interface.
   *
   * @param object the object to compare with; may be {@code null}
   * @return whether {@code object} is considered equal to this predicate
   */
  @Override
  boolean equals(@Nullable Object object);
}

从接口的结构上来看比较简单,只有一个返回boolean类型结果的apply方法,以及一个equals方法。
下面看看相关实现类


相关实现类
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
        } else {
            对服务列表进行过滤,通过子类的apply方法
            List<Server> results = Lists.newArrayList();
            for (Server server: servers) {
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;            
        }
    }
}
public class CompositePredicate extends AbstractServerPredicate {
    //delegate  优先的过滤规则
    private AbstractServerPredicate delegate;
    //服务列表不满足要求的时候,用次要的过滤规则重新匹配
    private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
        
    private int minimalFilteredServers = 1;
    
    private float minimalFilteredPercentage = 0;    
    
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        使用优先的过滤规则进行过滤
        return delegate.apply(input);
    }

    
    public static class Builder {
        private CompositePredicate toBuild;
        
        Builder(AbstractServerPredicate primaryPredicate) {
            toBuild = new CompositePredicate();    
            toBuild.delegate = primaryPredicate;                    
        }

        Builder(AbstractServerPredicate ...primaryPredicates) {
            toBuild = new CompositePredicate();
            Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
            toBuild.delegate =  AbstractServerPredicate.ofKeyPredicate(chain);                
        }

        public Builder addFallbackPredicate(AbstractServerPredicate fallback) {
            toBuild.fallbacks.add(fallback);
            return this;
        }
                
        public Builder setFallbackThresholdAsMinimalFilteredNumberOfServers(int number) {
            toBuild.minimalFilteredServers = number;
            return this;
        }
        
        public Builder setFallbackThresholdAsMinimalFilteredPercentage(float percent) {
            toBuild.minimalFilteredPercentage = percent;
            return this;
        }
        
        public CompositePredicate build() {
            return toBuild;
        }
    }
    
    public static Builder withPredicates(AbstractServerPredicate ...primaryPredicates) {
        return new Builder(primaryPredicates);
    }

    public static Builder withPredicate(AbstractServerPredicate primaryPredicate) {
        return new Builder(primaryPredicate);
    }

    @Override
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
        Iterator<AbstractServerPredicate> i = fallbacks.iterator();
        服务列表未满足要求的时候,按次要的过滤规则进行重新过滤
        while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
                && i.hasNext()) {
            AbstractServerPredicate predicate = i.next();
            result = predicate.getEligibleServers(servers, loadBalancerKey);
        }
        return result;
    }
}

public class ZoneAvoidancePredicate extends  AbstractServerPredicate {
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) {
            return true;
        }
        String serverZone = input.getServer().getZone();
        没有分区直接返回
        if (serverZone == null) {
            return true;
        }
        LoadBalancerStats lbStats = getLBStats();
        if (lbStats == null) {
            没有统计信息直接返回
            return true;
        }
        if (lbStats.getAvailableZones().size() <= 1) {
            可靠的分区数<=1,直接返回
            return true;
        }
        通过ZoneAvoidanceRule获取分区的统计快照
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        if (!zoneSnapshot.keySet().contains(serverZone)) {
            return true;
        }
        拿到可靠的分区集合,通过分区的统计数据来分析
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        如果可靠的分区中包括该分区则返回true,否则返回false
        if (availableZones != null) {
            return availableZones.contains(input.getServer().getZone());
        } else {
            return false;
        }
    }    

}
public class AvailabilityPredicate extends  AbstractServerPredicate {
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }
    
    
    private boolean shouldSkipServer(ServerStats stats) {
        如果服务熔断,或者活跃请求数超过限制了,则跳过        
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
}
上一篇 下一篇

猜你喜欢

热点阅读