Ribbon原理解析

2019-04-22  本文已影响0人  aiwen2017

一. 核心接口

  1. ILoadBalancer
    Ribbon通过ILoadBalancer接口对外提供统一的选择服务器(Server)的功能,此接口会根据不同的负载均衡策略(IRule)选择合适的Server返回给使用者。其核心方法如下:

    public interface ILoadBalancer {
    
        public void addServers(List<Server> newServers);
    
        public Server chooseServer(Object key);
        
        public void markServerDown(Server server);
        
        public List<Server> getReachableServers();
    
        public List<Server> getAllServers();
    }
    

    此接口默认实现类为ZoneAwareLoadBalancer,相关类关系图如下:


    image
  2. IRule
    IRule是负载均衡策略的抽象,ILoadBalancer通过调用IRule的choose()方法返回Server,其核心方法如下:

    public interface IRule{
        
        public Server choose(Object key);
        
        public void setLoadBalancer(ILoadBalancer lb);
        
        public ILoadBalancer getLoadBalancer();    
    }
    

    实现类有:

    • BestAviableRule
      跳过熔断的Server,在剩下的Server中选择并发请求最低的Server
    • ClientConfigEnabledRoundRobinRule、RoundRobinRule
      轮询
    • RandomRule
      随机选择
    • RetryRule
      可重试的策略,可以对其他策略进行重试,默认轮询重试
    • WeightedResponseTimeRule
      根据响应时间加权,响应时间越短权重越大
    • AvailabilityFilteringRule
      剔除因为连续链接、读失败或链接超过最大限制导致熔断的Server,在剩下读Server中进行轮询。

    相关类图如下:


    image
  3. IPing
    IPing用来检测Server是否可用,ILoadBalancer的实现类维护一个Timer每隔10s检测一次Server的可用状态,其核心方法有:

    public interface IPing {
    
        public boolean isAlive(Server server);
    }
    

    其实现类有:


    image
  4. IClientConfig
    IClientConfig主要定义了用于初始化各种客户端和负载均衡器的配置信息,器实现类为DefaultClientConfigImpl。

二. 负载均衡的逻辑实现

1. Server的选择

ILoadBalancer接口的主要实现类为BaseLoadBalancer和ZoneAwareLoadBalancer,ZoneAwareLoadBalancer为BaseLoadBalancer的子类并且其也重写了chooseServer方法,ZoneAwareLoadBalancer从其名称可以看出这个实现类是和Spring Cloud的分区有关的,当分区的数量为1(默认配置)时它直接调用父类BaseLoadBalancer的chooseServer()方法,源码如下:

@Override
public Server chooseServer(Object key) {
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        // 调用父类BaseLoadBalancer的chooseServer()方法
        return super.chooseServer(key);
    }
    
    // 略
}

类BaseLoadBalancer的chooseServer()方法直接调用IRule接口的choose()方法,源码如下:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

这里IRule的实现类为ZoneAvoidanceRule,choose()方法的实现在其父类PredicateBasedRule中,如下:

@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

从上面源码可以看出,其先调用ILoadBalancer的getAllServers()方法获取所有Server列表,getAllServers()方法的实现在BaseLoadBalancer类中,此类维护了一个List<Server>类型的属性allServerList,所有Server都缓存至此集合中。获取Server列表后调用chooseRoundRobinAfterFiltering()方法返回Server对象。chooseRoundRobinAfterFiltering()方法会根据loadBalancerKey筛选出候选的Server,然后通过轮询的负载均衡策略选出Server,相关源码如下:

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

可以看到其轮询选择Server的策略为获取次数加1然后对Server数量取余得到。

2. Server的状态检测

BaseLoadBalancer类的集合allServerList缓存了所有Server信息,但是这些Server的状态有可能发生变化,比如Server不可用了,Ribbon就需要及时感知到,那么Ribbon是如何感知Server可用不可用的呢?
BaseLoadBalancer的构造函数中初始化了一个任务调度器Timer,这个调度器每隔10s执行一次PingTask任务,相关源码如下:

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {
    
    this.name = name;
    this.ping = ping;
    this.pingStrategy = pingStrategy;
    setRule(rule);
    setupPingTask();
    lbStats = stats;
    init();
}
    
void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}

深入Pinger和SerialPingStrategy的源码可知,最终通过NIWSDiscoveryPing这一IPing实现类判断Server是否可用,NIWSDiscoveryPing的isAlive()方法通过判断与Server关联的InstanceInfo的status是否为UP来判断Server是否可用,其isAlive()方法源码如下:

public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server!=null && server instanceof DiscoveryEnabledServer){
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo!=null){                    
            InstanceStatus status = instanceInfo.getStatus();
            if (status!=null){
                // 其状态是否为UP
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}

三、Ribbon的使用姿势

1. RestTemplate + @LoadBalanced

2. Feign接口

相对于RestTemplate+@Loadbalance的方式,我们在使用Spring Cloud的时候使用更多的是Feign接口,因为Feign接口使用起来会更加简单,下面就是一个使用Feign接口调用服务的例子:

// 定义Feign接口
@FeignClient(value = "Eureka-Producer", fallbackFactory = HelloClientFallbackFactory.class)
public interface HelloClient {

    @GetMapping("/hello")
    String hello();
}

// 订单熔断快速失败回调
@Component
public class HelloClientFallbackFactory implements FallbackFactory<HelloClient>, HelloClient {

    @Override
    public HelloClient create(Throwable throwable) {
        return this;
    }

    @Override
    public String hello() {
        return "熔断";
    }
}

// 使用
@RestController
public class HelloController {

    @Resource
    private HelloClient helloClient;

    @GetMapping("/hello")
    public String hello() {
        return helloClient.hello();
    }
}

与RestTemplate的通过RibbonLoadBalancerClient获取Server并执行请求类似,Feign接口通过LoadBalancerFeignClient获取Server并执行请求。DefaultFeignLoadBalancedConfiguration会注入LoadBalancerFeignClient Bean,源码如下:

@Configuration
class DefaultFeignLoadBalancedConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
                              SpringClientFactory clientFactory) {
        return new LoadBalancerFeignClient(new Client.Default(null, null),
                cachingFactory, clientFactory);
    }
}

那么Feign接口是如何通过LoadBalancerFeignClient实现负载均衡调用的呢?在《Feign源码解析》一文中介绍到Feign接口的代理实现类由FeignClientFactoryBean负责生成,FeignClientFactoryBean实现了FactoryBean,所以其getObject()方法会返回Feign接口的代理实现,getObject()方法会从Spring上下文中获取到LoadBalancerFeignClient,源码如下:

@Override
public Object getObject() throws Exception {
    return getTarget();
}

<T> T getTarget() {
    FeignContext context = applicationContext.getBean(FeignContext.class);
    Feign.Builder builder = feign(context);

    if (!StringUtils.hasText(this.url)) {
        if (!this.name.startsWith("http")) {
            url = "http://" + this.name;
        }
        else {
            url = this.name;
        }
        url += cleanPath();
        return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
                this.name, url));
    }
    if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
        this.url = "http://" + this.url;
    }
    String url = this.url + cleanPath();
    // 从Spring上下文中获取LoadBalancerFeignClient
    Client client = getOptional(context, Client.class);
    if (client != null) {
        if (client instanceof LoadBalancerFeignClient) {
            // not load balancing because we have a url,
            // but ribbon is on the classpath, so unwrap
            client = ((LoadBalancerFeignClient)client).getDelegate();
        }
        builder.client(client);
    }
    Targeter targeter = get(context, Targeter.class);
    return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
            this.type, this.name, url));
}

LoadBalancerFeignClient对外提供服务的接口是execute()方法,那么此方法是何时被Feign接口调用的呢?从《Feign源码解析》一文中可知SynchronousMethodHandler作为MethodHandler的实现在调用Feign接口时进行拦截并执行其invoke()方法,invoke()方法则调用了LoadBalancerFeignClient的execute()方法发起网络请求,相关源码如下:

@Override
public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template);
      } catch (RetryableException e) {
        // 略
        continue;
      }
    }
}

Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);
    
    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }
    
    Response response;
    long start = System.nanoTime();
    try {
      // 调用LoadBalancerFeignClient的execute()方法获取响应。
      response = client.execute(request, options);
    } catch (IOException e) {
      if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
      }
      throw errorExecuting(request, e);
    }
    
    // 略
}

那么LoadBalancerFeignClient的execute()方法又是如何利用Ribbon做负载均衡的呢?其通过调用CachingSpringLoadBalancerFactory的create()方法获取FeignLoadBalancer对象,FeignLoadBalancer对象持有一个ILoadBalancer的对象实例,此ILoadBalancer对象实例是CachingSpringLoadBalancerFactory通过调用SpringClientFactory的getLoadBalancer()方法从Spring上下文中获取的,源码如下:

public FeignLoadBalancer create(String clientName) {
    FeignLoadBalancer client = this.cache.get(clientName);
    if(client != null) {
        return client;
    }
    IClientConfig config = this.factory.getClientConfig(clientName);
    ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
    ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
    client = loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
        loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
    this.cache.put(clientName, client);
    return client;
}

创建完FeignLoadBalancer后紧接着接着调用了FeignLoadBalancer的executeWithLoadBalancer()方法,如下:

@Override
public Response execute(Request request, Request.Options options) throws IOException {
    URI asUri = URI.create(request.url());
    String clientName = asUri.getHost();
    URI uriWithoutHost = cleanUrl(request.url(), clientName);
    FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
            this.delegate, request, uriWithoutHost);

    IClientConfig requestConfig = getClientConfig(options, clientName);
    // 执行FeignLoadBalancer的executeWithLoadBalancer()方法。
    return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
            requestConfig).toResponse();
    // 略
}
// 创建FeignLoadBalancer对象并返回
private FeignLoadBalancer lbClient(String clientName) {
    return this.lbClientFactory.create(clientName);
}

executeWithLoadBalancer()方法的具体实现在类FeignLoadBalancer的父类AbstractLoadBalancerAwareClient中,如下:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        // 略
    }
}

executeWithLoadBalancer()方法创建了LoadBalancerCommand对象并且向提交(submit()方法)了一个ServerOperation对象,跟踪LoadBalancerCommand的submit()方法发现其调用了selectServer()方法获取Server,而selectServer()方法则委托给了FeignLoadBalancer的父类LoadBalancerContext的getServerFromLoadBalancer()方法获取Server,如下:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
    
}

public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
    // 略
    
    // 这里当server为null时调用selectServer()获取Server。
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);
                    
                    // Called for each attempt and retry
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    // 略
                }
            });
        // 略
}

private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                // 调用LoadBalancerContext的getServerFromLoadBalancer()获取Server
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

FeignLoadBalancer和LoadBalancerCommand互相依赖、彼此调用,最终FeignLoadBalancer的父类LoadBalancerContext的getServerFromLoadBalancer()方法返回了Server,此方法通过调用其持有的ILoadBalancer对象的chooseServer()方法获取Server,源码如下:

public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
    int port = -1;
    if (original != null) {
        host = original.getHost();
    }
    if (original != null) {
        Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
        port = schemeAndPort.second();
    }
    // 获取ILoadBalancer
    ILoadBalancer lb = getLoadBalancer();
    // 调用ILoadBalancer的chooseServer()方法获取Server。
    Server svc = lb.chooseServer(loadBalancerKey);
    if (svc == null){
        throw new ClientException(ClientException.ErrorType.GENERAL,
                "Load balancer does not have available server for client: "
                        + clientName);
    }
    host = svc.getHost();
    if (host == null){
        throw new ClientException(ClientException.ErrorType.GENERAL,
                "Invalid Server for :" + svc);
    }
    logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
    return svc;
}

至此终于看到了通过ILoadBalancer获取Server的代码了,相关类图如下:

image

四、总结

Ribbon通过ILoadBalancer接口提供负载均衡服务,其实现原理为:

当使用RestTemplate+@LoadBalanced的方式进行服务调用时,LoadBalancerInterceptor和RibbonLoadBalancerClient作为桥梁结合Ribbon提供负载均衡服务。

当使用Feign接口调用服务时,LoadBalancerFeignClient和FeignLoadBalancer作为调用Ribbon的入口为Feign接口提供负载均衡服务。

不管使用那种姿势,最终都会通过Ribbon的ILoadBalancer接口实现负载均衡。

最后放两个彩蛋

  1. Ribbon相关Configuration以及注入的Bean:

    • RibbonAutoConfiguration

      • 注入了 LoadBalancerClient的实现类RibbonLoadBalancerClient。
      • 注入了SpringClientFactory。
    • LoadBalancerAutoConfiguration

      • 注入了LoadBalancerInterceptor。
      • 给RestTemplate设置LoadBalancerInterceptor。
    • RibbonClientConfiguration

      • 注入了ILoadBalancer的实现类ZoneAwareLoadBalancer。
      • 注入了IRule的实现类ZoneAvoidanceRule。
      • 注入了IClientConfig的实现类DefaultClientConfigImpl。
    • EurekaRibbonClientConfiguration

      • 注入了IPing的实现类NIWSDiscoveryPing。
      • 注入了ServerList的实现类DiscoveryEnabledNIWSServerList。
  2. Ribbon相关类关系图:

image
上一篇 下一篇

猜你喜欢

热点阅读