Ribbon源码分析

2018-09-05  本文已影响23人  沉沦2014

先看看@LoadBalanced注解:

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

这就是一个普通的标记注解,作用就是修饰RestTemplate让其拥有负载均衡的能力,查看org.springframework.cloud.client.loadbalancer包下面的class,我们很容易发现

LoadBalancerAutoConfiguration这个类;

@Configuration//这是一个配置类
@ConditionalOnClass(RestTemplate.class)//这个配置文件加载必要条件是存在RestTemplate类和LoadBalancerClient
//Bean
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

   //这个很重要,这里的restTemplates是所有的被@LoadBalanced注解的集合,这就是标记注解的作用(Autowired是可以集合注入的)
   @LoadBalanced
   @Autowired(required = false)
   private List<RestTemplate> restTemplates = Collections.emptyList();

   @Bean
   public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
         final List<RestTemplateCustomizer> customizers) {
      return new SmartInitializingSingleton() {
         @Override
         public void afterSingletonsInstantiated() {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
               for (RestTemplateCustomizer customizer : customizers) {
                  customizer.customize(restTemplate);
               }
            }
         }
      };
   }

   @Autowired(required = false)
   private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

   @Bean
   @ConditionalOnMissingBean
   public LoadBalancerRequestFactory loadBalancerRequestFactory(
         LoadBalancerClient loadBalancerClient) {
      return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
   }

   //生成一个LoadBalancerInterceptor的Bean
   @Configuration
   @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
   static class LoadBalancerInterceptorConfig {
      @Bean
      public LoadBalancerInterceptor ribbonInterceptor(
            LoadBalancerClient loadBalancerClient,
            LoadBalancerRequestFactory requestFactory) {
         return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
      }

     //给注解了@LoadBalanced的RestTemplate加上拦截器
      @Bean
      @ConditionalOnMissingBean
      public RestTemplateCustomizer restTemplateCustomizer(
            final LoadBalancerInterceptor loadBalancerInterceptor) {
         return new RestTemplateCustomizer() {
            @Override
            public void customize(RestTemplate restTemplate) {
               List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                     restTemplate.getInterceptors());
               list.add(loadBalancerInterceptor);
               restTemplate.setInterceptors(list);
            }
         };
      }
   }
}

总结:现在我们应该大致知道@loadBalanced的作用了,就是起到一个标记RestTemplate的作用,当服务启动时,标记了的RestTemplate对象里面就会被自动加入LoadBalancerInterceptor拦截器,这样当RestTemplate像外面发起http请求时,会被LoadBalancerInterceptor的intercept函数拦截,而intercept里面又调用了LoadBalancerClient接口实现类execute方法,我们接着往下看;

LoadBalancerInterceptor的intercept方法:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
 //这是以服务名为地址的原始请求:例:http://HELLO-SERVICE/hello
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
   return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}

这里的LoadBalancerClient的实现是RibbonLoadBalancerClient:

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }
    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if(serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer)serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }
       //ZoneAwareLoadBalancer的Rule规则去选择Server(这里的规则是统一个zone的优先选择,这里返回的Server
       //就是通过serviceId和Rule解析返回的带有IP:port形式的信息的Server)
       protected Server getServer(ILoadBalancer loadBalancer) {
           return loadBalancer == null ? null : loadBalancer.chooseServer("default");
       }

可以看到,在execute函数的实现中,第一步做的就是通过getServer根据传入的服务名serviceId去获得具体的服务实例:

protected Server getServer(ILoadBalancer loadBalancer) {
    if (loadBalancer == null) {
        return null;
    }
    return loadBalancer.chooseServer("default");
}

通过getServer函数的实现源码,我们可以看到这里获取具体服务实例的时候并没有使用LoadBalancerClient接口中的choose函数,而是使用了ribbon自身的ILoadBalancer接口中定义的chooseServer函数。

我们先来认识一下ILoadBalancer接口:

public interface ILoadBalancer {

    public void addServers(List<Server> newServers);

    public Server chooseServer(Object key);

    public void markServerDown(Server server);

    public List<Server> getReachableServers();

    public List<Server> getAllServers();
}

可以看到,在该接口中定义了一个软负载均衡器需要的一系列抽象操作(未例举过期函数):

在该接口定义中涉及到的Server对象定义的是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括:host、port以及一些部署信息等。

image.png

而对于该接口的实现,我们可以整理出如上图所示的结构。我们可以看到BaseLoadBalancer类实现了基础的负载均衡,而DynamicServerListLoadBalancerZoneAwareLoadBalancer在负载均衡的策略上做了一些功能的扩展。

那么在整合Ribbon的时候Spring Cloud默认采用了哪个具体实现呢?我们通过RibbonClientConfiguration配置类,可以知道在整合时默认采用了ZoneAwareLoadBalancer来实现负载均衡器。

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping) {
    ZoneAwareLoadBalancer<Server> balancer = LoadBalancerBuilder.newBuilder()
            .withClientConfig(config).withRule(rule).withPing(ping)
            .withServerListFilter(serverListFilter).withDynamicServerList(serverList)
            .buildDynamicServerListLoadBalancer();
    return balancer;
}

下面,我们再回到RibbonLoadBalancerClientexecute函数逻辑,在通过ZoneAwareLoadBalancerchooseServer函数获取了负载均衡策略分配到的服务实例对象Server之后,将其内容包装成RibbonServer对象(该对象除了存储了服务实例的信息之外,还增加了服务名serviceId、是否需要使用HTTPS等其他信息),然后使用该对象再回调LoadBalancerInterceptor请求拦截器中LoadBalancerRequestapply(final ServiceInstance instance)函数,向一个实际的具体服务实例发起请求,从而实现一开始以服务名为host的URI请求,到实际访问host:post形式的具体地址的转换。

apply(final ServiceInstance instance)函数中传入的ServiceInstance接口是对服务实例的抽象定义。在该接口中暴露了服务治理系统中每个服务实例需要提供的一些基本信息,比如:serviceId、host、port等,具体定义如下:

public interface ServiceInstance {

    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();
}

而上面提到的具体包装Server服务实例的RibbonServer对象就是ServiceInstance接口的实现,可以看到它除了包含了Server对象之外,还存储了服务名、是否使用https标识以及一个Map类型的元数据集合。

protected static class RibbonServer implements ServiceInstance {

    private final String serviceId;
    private final Server server;
    private final boolean secure;
    private Map<String, String> metadata;

    protected RibbonServer(String serviceId, Server server) {
        this(serviceId, server, false, Collections.<String, String> emptyMap());
    }

    protected RibbonServer(String serviceId, Server server, boolean secure,
            Map<String, String> metadata) {
        this.serviceId = serviceId;
        this.server = server;
        this.secure = secure;
        this.metadata = metadata;
    }

    // 省略实现ServiceInstance的一些获取Server信息的get函数
    ...
}

那么apply(final ServiceInstance instance)函数,在接收到了具体ServiceInstance实例后,是如何通过LoadBalancerClient接口中的reconstructURI操作来组织具体请求地址的呢?

@Override
public ClientHttpResponse apply(final ServiceInstance instance)
            throws Exception {
    HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance);
    return execution.execute(serviceRequest, body);
}

apply的实现中,我们可以看到它具体执行的时候,还传入了ServiceRequestWrapper对象,该对象继承了HttpRequestWrapper并重写了getURI函数,重写后的getURI会通过调用LoadBalancerClient接口的reconstructURI函数来重新构建一个URI来进行访问。

private class ServiceRequestWrapper extends HttpRequestWrapper {

    private final ServiceInstance instance;

    ...

    @Override
    public URI getURI() {
        URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI(
                this.instance, getRequest().getURI());
        return uri;
    }
}

LoadBalancerInterceptor拦截器中,ClientHttpRequestExecution的实例具体执行execution.execute(serviceRequest, body)时,会调用InterceptingClientHttpRequestInterceptingRequestExecution类的execute函数,具体实现如下:

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
        delegate.getHeaders().putAll(request.getHeaders());
        if (body.length > 0) {
            StreamUtils.copy(body, delegate.getBody());
        }
        return delegate.execute();
    }
}

可以看到在创建请求的时候requestFactory.createRequest(request.getURI(), request.getMethod());,这里request.getURI()会调用之前介绍的ServiceRequestWrapper对象中重写的getURI函数。此时,它就会使用RibbonLoadBalancerClient中实现的reconstructURI来组织具体请求的服务实例地址。

public URI reconstructURI(ServiceInstance instance, URI original) {
    Assert.notNull(instance, "instance can not be null");
    String serviceId = instance.getServiceId();
    RibbonLoadBalancerContext context = this.clientFactory
            .getLoadBalancerContext(serviceId);
    Server server = new Server(instance.getHost(), instance.getPort());
    boolean secure = isSecure(server, serviceId);
    URI uri = original;
    if (secure) {
        uri = UriComponentsBuilder.fromUri(uri).scheme("https").build().toUri();
    }
    return context.reconstructURIWithServer(server, uri);
}

reconstructURI函数中,我们可以看到,它通过ServiceInstance实例对象的serviceId,从SpringClientFactory类的clientFactory对象中获取对应serviceId的负载均衡器的上下文RibbonLoadBalancerContext对象。然后根据ServiceInstance中的信息来构建具体服务实例信息的Server对象,并使用RibbonLoadBalancerContext对象的reconstructURIWithServer函数来构建服务实例的URI。

为了帮助理解,简单介绍一下上面提到的SpringClientFactoryRibbonLoadBalancerContext

reconstructURIWithServer的实现中我们可以看到,它同reconstructURI的定义类似。只是reconstructURI的第一个保存具体服务实例的参数使用了Spring Cloud定义的ServiceInstance,而reconstructURIWithServer中使用了Netflix中定义的Server,所以在RibbonLoadBalancerClient实现reconstructURI时候,做了一次转换,使用ServiceInstance的host和port信息来构建了一个Server对象来给reconstructURIWithServer使用。从reconstructURIWithServer的实现逻辑中,我们可以看到,它从Server对象中获取host和port信息,然后根据以服务名为host的URI对象original中获取其他请求信息,将两者内容进行拼接整合,形成最终要访问的服务实例的具体地址。

public class LoadBalancerContext implements IClientConfigAware {

    ...

    public URI reconstructURIWithServer(Server server, URI original) {
        String host = server.getHost();
        int port = server .getPort();
        if (host.equals(original.getHost())
                && port == original.getPort()) {
            return original;
        }
        String scheme = original.getScheme();
        if (scheme == null) {
            scheme = deriveSchemeAndPortFromPartialUri(original).first();
        }

        try {
            StringBuilder sb = new StringBuilder();
            sb.append(scheme).append("://");
            if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                sb.append(original.getRawUserInfo()).append("@");
            }
            sb.append(host);
            if (port >= 0) {
                sb.append(":").append(port);
            }
            sb.append(original.getRawPath());
            if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                sb.append("?").append(original.getRawQuery());
            }
            if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                sb.append("#").append(original.getRawFragment());
            }
            URI newURI = new URI(sb.toString());
            return newURI;
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    ...
}

另外,从RibbonLoadBalancerClientexecute的函数逻辑中,我们还能看到在回调拦截器中,执行具体的请求之后,ribbon还通过RibbonStatsRecorder对象对服务的请求还进行了跟踪记录,这里不再展开说明,有兴趣的读者可以继续研究。

分析到这里,我们已经可以大致理清Spring Cloud中使用Ribbon实现客户端负载均衡的基本脉络。了解了它是如何通过LoadBalancerInterceptor拦截器对RestTemplate的请求进行拦截,并利用Spring Cloud的负载均衡器LoadBalancerClient将以逻辑服务名为host的URI转换成具体的服务实例的过程。同时通过分析LoadBalancerClient的Ribbon实现RibbonLoadBalancerClient,可以知道在使用Ribbon实现负载均衡器的时候,实际使用的还是Ribbon中定义的ILoadBalancer接口的实现,自动化配置会采用ZoneAwareLoadBalancer的实例来进行客户端负载均衡实现。

上一篇下一篇

猜你喜欢

热点阅读