Spring cloud

SpringCloud 之 Zuul 源代码详细笔记

2018-04-27  本文已影响29人  alexqdjay

1. Zuul 介绍

ZuulSpring Cloud 微服务体系中担任很重要的角色--服务网关,是基于JVM的路由器和负载均衡器。

Zuul 的基本使用以及 Filter 的介绍就不在这说明了,本文主要介绍 Zuul 的原理。

2. Zuul 处理流程

处理流程如下:


Request => ZuulHandlerMapping => ZuulController => ZuulServlet

主要的接收逻辑都在 ZuulServlet 中,执行 Filter 的逻辑,根据 Filter 的类型依次执行,如下代码:

try {
    preRoute();
} catch (ZuulException e) {
    error(e);
    postRoute();
    return;
}
try {
    route();
} catch (ZuulException e) {
    error(e);
    postRoute();
    return;
}
try {
    postRoute();
} catch (ZuulException e) {
    error(e);
    return;
}

接收的代码已经清楚了,其实 Zuul 组件的功能就到这边了,剩下对请求进行路由其实主要使用了Ribbon 组件进行的,因此下面与其说是介绍 Zuul 到不如说是 Ribbon 的介绍。

路由的逻辑处理主要是 route()Route Filter 进行的。

3. Route Filter

Zuul 中 Route FilterSimpleHostRoutingFilterRibbonRoutingFilter, 有人说还有 SendForwardFilter(本地的先不关注)。

3.1 SimpleHostRoutingFilter

当你配置路由时,直接配置 Url 而不是 serviceId ,那么就是使用的 SimpleHostRoutingFilter,相反就是用的 RibbonRoutingFilter

主要逻辑:

public Object run() {
    // 省略没用逻辑 ...
    
    String uri = this.helper.buildZuulRequestURI(request);
    this.helper.addIgnoredHeaders();

    try {
       // forward 主要逻辑
        CloseableHttpResponse response = forward(this.httpClient, verb, uri, request, headers, params, requestEntity);
        setResponse(response);
    }
    catch (Exception ex) {
        throw new ZuulRuntimeException(ex);
    }
}

// 从返回就能看出来调用 httpClient 完成的http请求
private CloseableHttpResponse forward () {
// ...
    // forwardRequest
    CloseableHttpResponse zuulResponse = forwardRequest(httpclient, httpHost, httpRequest);
    return zuulResponse;

// ...
}

// 通过 httpClient 发请求
private CloseableHttpResponse forwardRequest(CloseableHttpClient httpclient,
            HttpHost httpHost, HttpRequest httpRequest) throws IOException {
    return httpclient.execute(httpHost, httpRequest);
}

总结:构建 Request 然后通过 httpClient 进行请求。

3.2 RibbonRoutingFilter

public Object run() {
    // ...
    // 构建请求上下文,其实就是保护一下参数,如serviceId, retryable, url, 原request等
    RibbonCommandContext commandContext = buildCommandContext(context);
    // forward
    ClientHttpResponse response = forward(commandContext);
    setResponse(response);
    return response;
}

protected ClientHttpResponse forward(RibbonCommandContext context) {
    // ...
    // 这里的重点转到 command 上了,主要逻辑都是 command 中执行
    RibbonCommand command = this.ribbonCommandFactory.create(context);
    ClientHttpResponse response = command.execute();
    return response;
}

4. Command

4.1 继承关系

HystrixCommand
RibbonCommand
    <- AbstractRibbonCommand 
        <- HttpClientRibbonCommand / RestClientRibbonCommand / OkHttpRibbonCommand

主要的逻辑都是 AbstractRibbonCommand 中,子类是不同选型 HttpClient, OkHttpHttpURLConnection

4.2 AbstractRibbonCommand

由于继承的 HystrixCommand 所以需要实现 run() 方法,上面调用execute() 是来自 HystrixCommand 主体逻辑需要 run() 中实现。

protected ClientHttpResponse run() throws Exception {
    final RequestContext context = RequestContext.getCurrentContext();

    // 根据不同实现创建不同的Request
    RQ request = createRequest();
    // 执行负载均衡逻辑,其中 client 是 ribbonCommandFactory.create 中置入的
    RS response = this.client.executeWithLoadBalancer(request, config);

    context.set("ribbonResponse", response);

    if (this.isResponseTimedOut()) {
        if (response != null) {
            response.close();
        }
    }

    return new RibbonHttpResponse(response);
}

4.3 RibbonCommandFactory

CommandFactory 观察下 client 是什么类,做什么事情。

ribbonCommandFactory 的继承关系:

RibbonCommandFactory   
    <- AbstractRibbonCommandFactory   
        <- HttpClientRibbonCommandFactory
           OkHttpRibbonCommandFactory
           RestClientRibbonCommandFactory

以默认的 HttpClientRibbonCommandFactory 为例,代码如下:

public HttpClientRibbonCommand create(final RibbonCommandContext context) {
    // 获取降级处理
    ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
    // serviceId 是根据请求的url来比对配置的路由得到的
    final String serviceId = context.getServiceId();
    // clientFactory 根据 ServiceId 获取相关的组件,包括 IRule, IClientConfig, ILoadBalancer 等组件
    final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
            serviceId, RibbonLoadBalancingHttpClient.class);
    client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
    // 不同类型的 Factory 会获取不同的 client,生成不同的 Command
    return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
            clientFactory.getClientConfig(serviceId));
}

由此可见 client 就是 RibbonLoadBalancingHttpClient ,当然其他实现也会对应不一样的 client

4.4 Client

client 的继承比较复杂,从主要的继承看:

LoadBalancerContext 
    <- AbstractLoadBalancerAwareClient 
        <- AbstractLoadBalancingClient 
            <- RibbonLoadBalancingHttpClient
               OkHttpLoadBalancingClient

4.2 中 this.client.executeWithLoadBalancer 执行的是:

// AbstractLoadBalancerAwareClient.executeWithLoadBalancer()
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    // 重试策略处理类,要判断是不是重试可以重写这个
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
    // 专门用于失败切换其他服务端进行重试的 Command
    LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
            .withLoadBalancerContext(this)
            .withRetryHandler(handler)
            .withLoadBalancerURI(request.getUri())
            .build();

    try {
        // 见下面分析
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        // 真实动作,执行 execute
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
}

这里有两个地方需要注重看的:

  1. LoadBalancerCommand 实现
  2. execute() 逻辑

先看 execute()LoadBalancerCommand 单独介绍

4.4.1 execute()

以默认的 RibbonLoadBalancingHttpClient 为例。(RibbonLoadBalancingHttpClient 将会被改名为ApacheHttpLoadBalancingClient,因为它的兄弟叫OkHttpLoadBalancingClient这样看上去比较像比较对称)。

public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,final IClientConfig configOverride) {
    // ...
    
    final HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
    // delegate == httpClient, 是通过 createDelegate 创建的,那么 okHttp 的相应的地方就是 okHttp 对应的 client
    // 所以,这里就是发送普通的 http 请求
    final HttpResponse httpResponse = this.delegate.execute(httpUriRequest);
    return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}

4.5 LoadBalancerCommand

主要逻辑都是在 submit 中, 使用了 rxJava 的特性进行重试, 下面删除了很多细节代码,剩下主干重试逻辑。

public Observable<T> submit(final ServerOperation<T> operation) {
    // ...
    
    // 外层的 observable 为了不同目标的重试
    // selectServer() 是进行负载均衡,返回的是一个 observable,可以重试,重试时再重新挑选一个目标server
    Observable<T> o = selectServer().concatMap(server -> {
        // 这里又开启一个 observable 主要是为了同机重试
        Observable<T> o = Observable
          .just(server)
          .concatMap(server -> {
              return operation.call(server).doOnEach(new Observer<T>() {
                 @Override
                 public void onCompleted() {
                    // server 状态的统计,譬如消除联系异常,抵消activeRequest等
                 }
                 
                 @Override
                 public void onError() {
                    // server 状态的统计,错误统计等
                 }
                 
                 @Override
                 public void onNext() {
                    // 获取 entity, 返回内容
                 }
              });
        })
        // 如果设置了同机重试,进行重试
        if (maxRetrysSame > 0) 
            // retryPolicy 判断是否重试,具体分析看下面
            o = o.retry(retryPolicy(maxRetrysSame, true));
        return o;
    })
    
    // 设置了异机重试,进行重试
    if (maxRetrysNext > 0) 
        o = o.retry(retryPolicy(maxRetrysNext, false));
    
    return o.onErrorResumeNext(exp -> {
        return Observable.error(e);
    });
}

主要的重试逻辑如上,但是细节需要分析的:

  1. retryPolicy()
  2. selectServer()
  3. 目标 Server 状态记录

4.5.1 retryPolicy

private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
    return new Func2<Integer, Throwable, Boolean>() {
        @Override
        public Boolean call(Integer tryCount, Throwable e) {
            if (e instanceof AbortExecutionException) {
                return false;
            }
              // 重构总次数还是会放弃重试的
            if (tryCount > maxRetrys) {
                return false;
            }
            
            if (e.getCause() != null && e instanceof RuntimeException) {
                e = e.getCause();
            }
            
            // 判断 exception 是否进行重试,可以自定义 handler 进行定制化
            return retryHandler.isRetriableException(e, same);
        }
    };
}

4.5.2 selectServer

private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                // 通过 loadBalancerContext.getServerFromLoadBalancer 来进行负载均衡
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

loadBalancerContext.getServerFromLoadBalancer 进行负载均衡选择下一个请求目标,整个方法比较大,不列出了,把调用关系列出后分析主要的逻辑类。

loadBalancerContext.getServerFromLoadBalancer () 
    > lb.chooseServer() 

实际在作用的是 ILoadBalancer.chooseServer 方法。

4.5.3 ILoadBalancer

ILoadBalancer 继承关系:

ILoadBalancer
    <- AbstractLoadBalancer
        <- BaseLoadBalancer
            <- DynamicServerListLoadBalancer
                <- ZoneAwareLoadBalancer

ILoadBalancer 接口:

public interface ILoadBalancer {
    void addServers(List<Server> newServers);
    Server chooseServer(Object key); // 主要逻辑
    void markServerDown(Server server);
    @Deprecated
    List<Server> getServerList(boolean availableOnly);
    List<Server> getReachableServers();
    List<Server> getAllServers();
}

实现负载均衡的逻辑的类 BaseLoadBalancer, DynamicServerListLoadBalancer 加入动态 ServerList 的功能,负载均衡逻辑并没有补充。

BaseLoadBalancer.chooseServer 主要逻辑代码:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
              // Rule 执行挑选逻辑
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

4.6 IRule

IRule
<- AbstractLoadBalancerRule
    <- ClientConfigEnabledRoundRobinRule // abstract
        <- BestAvailableRule // 最小连接优先轮询
           PredicateBasedRule // abstract
                <- AvailabilityFilteringRule 
                <- ZoneAvoidanceRule
    <- RoundRobinRule
        <- WeightedResponseTimeRule
    <- RandomRule
    <- RetryRule

4.6.1 PredicateBasedRule

基于逻辑断言进行判断是否选择的 Rule, 具体 Predicate 继承如下:

Predicate
    <- AbstractServerPredicate
        <- AvailabilityPredicate // 可用性判断
           ZoneAvoidancePredicate // 区域选择
           CompositePredicate    // 复合判断自身没有逻辑,组合其他 Predicate
          

AvailabilityPredicate

public boolean apply(@Nullable PredicateKey input) {
    LoadBalancerStats stats = getLBStats();
    if (stats == null) {
        return true;
    }
    // 判断是否跳过
    return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
    
    
private boolean shouldSkipServer(ServerStats stats) {
     // 如果处于不可用 或者 当前请求大于最大限制 时跳过该目标        
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
            || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
        return true;
    }
    return false;
}

ZoneAvoidancePredicate

public boolean apply(@Nullable PredicateKey input) {
    // ...
    // 选择出可用区域,具体逻辑在 ZoneAvoidanceRule 中解析
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
   if (availableZones != null) {
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
}

CompositePredicate 组合逻辑断言

CompositePredicate

// 使用多个 Predicate 组成判断的 And 逻辑链
// 类似 if xx && yy & oo 
Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);

// 获取可用列表时使用到回退逻辑
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
    Iterator<AbstractServerPredicate> i = fallbacks.iterator();
    // 当筛选下来的server个数不符合配置中的最小个数时,会进行回退重选,一直回退到符合要求或者没有回退逻辑
    while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
            && i.hasNext()) {
        AbstractServerPredicate predicate = i.next();
        result = predicate.getEligibleServers(servers, loadBalancerKey);
    }
    return result;
}

// AbstractServerPredicate 上面 super.getEligibleServers 
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server: servers) {
            // 每个 server 经过逻辑断言进行判断进行筛选
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;            
    }
}

三大 Predicate 已经介绍完毕,回到主题。

PredicateBasedRule 主要逻辑:

public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    // 基于逻辑断言进行轮询 Predicate 由子类决定
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}
// AbstractServerPredicate
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
     // 过滤可用结果, getEligibleServers 上面已经解析
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    // 标准轮询
    return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size()));
}

4.6.2 AvailabilityFilteringRule

AvailabilityFilteringRule 目标可用性轮询

public Server choose(Object key) {
    int count = 0;
    Server server = roundRobinRule.choose(key);
    while (count++ <= 10) {
        // 逻辑判断
        if (predicate.apply(new PredicateKey(server))) {
            return server;
        }
        // 轮询
        server = roundRobinRule.choose(key);
    }
    return super.choose(key);
}

// 其中 predicate
// CompositePredicate 组合逻辑,这里只有 AvailabilityPredicate 可用性判断
predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();

4.6.3 ZoneAvoidanceRule

ZoneAvoidanceRule 没有重写 choose 方法,所以还是继承了 PredicateBasedRule,所以过滤逻辑其实就是 compositePredicate. getEligibleServers,而经过上面的解析,getEligibleServers 其实就是所有 server 进行逻辑判断,把通过的返回。

// Predicate 组合了 zonePredicate 和 availabilityPredicate
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);

可见主要是 zonePredicateavailabilityPredicate 的逻辑判断。

zonePredicate 上面分析主要调用 ZoneAvoidanceRule.getAvailableZones

// getAvailableZones 主要逻辑
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
    String zone = zoneEntry.getKey();
    ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
    int instanceCount = zoneSnapshot.getInstanceCount();
    // 没有实例 即排除
    if (instanceCount == 0) {
        availableZones.remove(zone);
        limitedZoneAvailability = true;
    } else {
        double loadPerServer = zoneSnapshot.getLoadPerServer();
        // 不可用率超过阀值 或者 区域本来就不可用,即排除
        if (((double) zoneSnapshot.getCircuitTrippedCount())
                / instanceCount >= triggeringBlackoutPercentage
                || loadPerServer < 0) {
            availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {
            // 过滤出 负载最高的几个区域 
            if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                // they are the same considering double calculation
                // round error
                worstZones.add(zone);
            } else if (loadPerServer > maxLoadPerServer) {
                maxLoadPerServer = loadPerServer;
                worstZones.clear();
                worstZones.add(zone);
            }
        }
    }
}

// 没有排除 并且 最高负载没有超过限制,返回
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
    // zone override is not needed here
    return availableZones;
}
// 否则 随机排除一个负载高的区域
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
    availableZones.remove(zoneToAvoid);
}
return availableZones;

这里有个问题:为啥当存在排除时即便没有超过限制负载也要排除一个区域?

4.6.4 RoundRobinRule

比较简单,如下:

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

          // 累加取模,标准轮询
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);
        
        // 非线程安全list,可能会导致size有了对应索引处元素没有同步过来
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        // 可用即返回,不然下一轮
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    // 超过10次没有获取到可用的server
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

4.6.5 WeightedResponseTimeRule

// 这里会启动一个维持 使用响应时间计算比重系数 的任务 DynamicServerWeightTask
// 主要公式
// totalResponseTime 为所有server 平均响应时间的和,由下公式知,响应越快 weight 越大
// weight = totalResponseTime - ss.getResponseTimeAvg();
// weightSoFar += weight;
// finalWeights.add(weightSoFar); 
// 0 - maxTotalWeight 的概率假设是平均的,那么 weight 越大区间就越大被选中的概率就越大
// 如 Aw(10) Bw(30) Cw(40) Dw(20)
// weightSoFar: 10, 40, 80, 100
// 那么 0-10, 10-40, 40-80, 80-100 可以加 40-80区间最大,概率就越大
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
    if (d >= randomWeight) {
        serverIndex = n;
        break;
    } else {
        n++;
    }
}

4.7 DynamicServerListLoadBalancer

继承自 BaseLoadBalancerBaseLoadBalancer 不同的是它持有 ServerList 对象来进行动态的获取 Server 列表。

4.7.1 ServerList

ServerList
    <- DynamicServerList
    <- DiscoveryEnabledNIWSServerList
public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();
    public List<T> getUpdatedListOfServers();   
}
  1. DynamicServerList: 定时地从一个 RouteStore 中获取
  2. DiscoveryEnabledNIWSServerList: 从 Eureka 中获取

4.8 Server 状态

怎么轮询怎么选择过滤都已经分析了,但是过滤和选择中使用到 Server Status 是怎么统计的,接下去看。

ServerStats 类记录了 server 的所有状态。

4.8.1 判断是否跳过

下面是判断是否跳过 server 上面已经分析,其中 stats.isCircuitBreakerTripped 是判断的关键

// AvailabilityPredicate
shouldSkipServer(ServerStats stats) {
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
        return true;
    }
    return false;
}
public boolean isCircuitBreakerTripped(long currentTime) {
    // 获取故障的到期时间点
    long circuitBreakerTimeout = getCircuitBreakerTimeout();
    if (circuitBreakerTimeout <= 0) {
        return false;
    }
    // 大于当前时间表示还在出于故障
    return circuitBreakerTimeout > currentTime;
}

private long getCircuitBreakerTimeout() {
    long blackOutPeriod = getCircuitBreakerBlackoutPeriod();
    if (blackOutPeriod <= 0) {
        return 0;
    }
    // 上次失败的时间点 + 需要断路的时间长度
    return lastConnectionFailedTimestamp + blackOutPeriod;
}

private long getCircuitBreakerBlackoutPeriod() {
    int failureCount = successiveConnectionFailureCount.get();
    int threshold = connectionFailureThreshold.get();
    // 小于阀值(默认3)即不断路
    if (failureCount < threshold) {
        return 0;
    }
    // diff 超过阀值的次数,最大16
    int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);
    // blackOutSeconds 最大 2^16 * 基数时间
    int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
    // 再次进行限制,断路总时间不超过 maxCircuitTrippedTimeout.get
    if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
        blackOutSeconds = maxCircuitTrippedTimeout.get();
    }
    return blackOutSeconds * 1000L;
}

判断是否在断路不可用状态就这样,下面看一些状态是怎么进去的。

4.8.2 记录状态

LoadBalancerCommand 中有状态的记录

// 这里开始
loadBalancerContext.noteOpenConnection(stats);


@Override
public void onCompleted() {
     // 记录准确状态
     recordStats(tracer, stats, entity, null);
}

@Override
public void onError(Throwable e) {
     // 记录错误状态
    recordStats(tracer, stats, null, e);
    logger.debug("Got error {} when executed on server {}", e, server);
    if (listenerInvoker != null) {
        listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
    }
}

@Override
public void onNext(T entity) {
    this.entity = entity;
    if (listenerInvoker != null) {
        listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
    }
}      
                                            
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
    tracer.stop();
    // 这里介绍
    loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}

// loadBalancerContext
public void noteOpenConnection(ServerStats serverStats) {
    if (serverStats == null) {
        return;
    }
    try {
        serverStats.incrementActiveRequestsCount();
    } catch (Exception ex) {
        logger.error("Error noting stats for client {}", clientName, ex);
    }            
}

// serverStats
// 各种记录
public void incrementActiveRequestsCount() {        
    activeRequestsCount.incrementAndGet();
    requestCountInWindow.increment();
    long currentTime = System.currentTimeMillis();
    lastActiveRequestsCountChangeTimestamp = currentTime;
    lastAccessedTimestamp = currentTime;
    if (firstConnectionTimestamp == 0) {
        firstConnectionTimestamp = currentTime;
    }
}

// loadBalancerContext
public void noteRequestCompletion(ServerStats stats, Object response, Throwable e, long responseTime, RetryHandler errorHandler) {
    if (stats == null) {
        return;
    }
    try {
        recordStats(stats, responseTime);
        RetryHandler callErrorHandler = errorHandler == null ? getRetryHandler() : errorHandler;
        if (callErrorHandler != null && response != null) {
            // 没有错误时,清除连续错误标识
            stats.clearSuccessiveConnectionFailureCount();
        } else if (callErrorHandler != null && e != null) {
            // 判断是否需要断路的exception
            if (callErrorHandler.isCircuitTrippingException(e)) {
                // 有错误时开始连续错误计数
                stats.incrementSuccessiveConnectionFailureCount();  
                // 增加错误数                  
                stats.addToFailureCount();
            } else {
                // 非断路错误时清除连续标识
                stats.clearSuccessiveConnectionFailureCount();
            }
        }
    } catch (Exception ex) {
        logger.error("Error noting stats for client {}", clientName, ex);
    }            
}

// 退场专用
private void recordStats(ServerStats stats, long responseTime) {
    if (stats == null) {
        return;
    }
    // 活动请求数减一
    stats.decrementActiveRequestsCount();
    // 增加请求统计
    stats.incrementNumRequests();
    // 记录响应时间,有些负载策略需要响应时间
    stats.noteResponseTime(responseTime);
}

5. 回顾

5.1 调用路径

// 调用路径
1.HandleMapping -> 2.ZuulController -> 3.ZuulServlet.service() -> 4.RibbonRoutingFilter ->
5.HystrixCommand.execute() -> 6.AbstractRibbonCommand.run() ->
7.RibbonLoadBalancingHttpClient.executeWithLoadBalance() ->
8.LoadBalancerCommand.submit() -> 9.RibbonLoadBalancingHttpClient.execute() ->
10.HttpClient.execute()

1-4 都比较容易,5是为了有熔断效果所以用 Hystrix 进行包装,实际的逻辑都是对应的 Command 完成,7是不同的 Command 持有一个对应的 Client,执行 executeWithLoadBalance() 为了达到负载均衡和重试的效果,这个效果就交给 8.LoadBalancerCommand 完成,但是 LoadBalancerCommand 也只负责重试和负载均衡,具体执行的远程 http 请求还是由 9 来完成,而每个 BalancingClient 都是持有个真实的 client, 如: HttpClient, OKHttp,由这些 client 执行。

5.2 分支逻辑

5.2.1 负载均衡

分析了怎么进行 selectServer 的过程,以及常用的 ILoadBalancer 类型,对应的 IRule 即真实挑选和负载轮询逻辑实现。

5.2.2 状态记录

负载轮询的挑选逻辑中使用到 Server 的状态,所以分析了状态的记录以及怎么判断是否在断路状态的主要逻辑。

5.3 总结

Zuul 的主要代码并不是很大,即请求进来然后进行 Filter 处理,路由到上游服务器都是 Ribbon 的逻辑。

上一篇 下一篇

猜你喜欢

热点阅读