Learning Spring Cloud from Scratch (5): Ribbon Load Balancing Res

2021-01-10  Batistuta9

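In the project we built earlier, the client's Application startup class used a RestTemplate object together with the @LoadBalanced annotation to get Ribbon load balancing. So why a RestTemplate object, and what logic sits behind the @LoadBalanced annotation?
Let's start with the source of @LoadBalanced.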

package org.springframework.cloud.client.loadbalancer;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.beans.factory.annotation.Qualifier;

/**
 * Annotation to mark a RestTemplate or WebClient bean to be configured to use a
 * LoadBalancerClient.
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

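The Javadoc states it plainly: this annotation marks a RestTemplate or WebClient bean so that it is configured to use a load balancer client (LoadBalancerClient). So what is LoadBalancerClient? Let's look at its source.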


package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;

import org.springframework.cloud.client.ServiceInstance;

/**
 * Represents a client-side load balancer.
 *
 * @author Spencer Gibb
 */
public interface LoadBalancerClient extends ServiceInstanceChooser {

    /**
     * Executes request using a ServiceInstance from the LoadBalancer for the specified
     * service.
     * @param serviceId The service ID to look up the LoadBalancer.
     * @param request Allows implementations to execute pre and post actions, such as
     * incrementing metrics.
     * @param <T> type of the response
     * @throws IOException in case of IO issues.
     * @return The result of the LoadBalancerRequest callback on the selected
     * ServiceInstance.
     */
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    /**
     * Executes request using a ServiceInstance from the LoadBalancer for the specified
     * service.
     * @param serviceId The service ID to look up the LoadBalancer.
     * @param serviceInstance The service to execute the request to.
     * @param request Allows implementations to execute pre and post actions, such as
     * incrementing metrics.
     * @param <T> type of the response
     * @throws IOException in case of IO issues.
     * @return The result of the LoadBalancerRequest callback on the selected
     * ServiceInstance.
     */
    <T> T execute(String serviceId, ServiceInstance serviceInstance,
            LoadBalancerRequest<T> request) throws IOException;

    /**
     * Creates a proper URI with a real host and port for systems to utilize. Some systems
     * use a URI with the logical service name as the host, such as
     * http://myservice/path/to/service. This will replace the service name with the
     * host:port from the ServiceInstance.
     * @param instance service instance to reconstruct the URI
     * @param original A URI with the host as a logical service name.
     * @return A reconstructed URI.
     */
    URI reconstructURI(ServiceInstance instance, URI original);

}

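LoadBalancerClient is an interface that declares three methods.
1.<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
2.<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
3.URI reconstructURI(ServiceInstance instance, URI original);
Methods 1 and 2 serve the same purpose: they execute a request for the specified service using a ServiceInstance from the LoadBalancer. The difference is that method 2 receives the ServiceInstance to use as a parameter. Method 3 creates a proper URI with a real host and port for the system to use. Some systems use a URI whose host is a logical service name, such as http://myservice/path/to/service; the returned URI replaces that name with a concrete host:port address built from the details of the ServiceInstance.
There is actually one more method, ServiceInstance choose(String serviceId);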

    /**
     * Chooses a ServiceInstance from the LoadBalancer for the specified service.
     * @param serviceId The service ID to look up the LoadBalancer.
     * @return A ServiceInstance that matches the serviceId.
     */
    ServiceInstance choose(String serviceId);

This method is declared in ServiceInstanceChooser, the parent interface of LoadBalancerClient, and selects a ServiceInstance for the specified service from the LoadBalancer.
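To make the relationship between choose and the two execute methods concrete, here is a minimal sketch in plain Java. It is not the Spring interface: MiniLoadBalancerClient, Instance, the in-memory registry map and the round-robin selection are all assumptions made for illustration, and a java.util.function.Function stands in for LoadBalancerRequest.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for ServiceInstance: just a host and a port.
final class Instance {
    final String host;
    final int port;

    Instance(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

// Hypothetical miniature of the LoadBalancerClient contract (not the Spring interface).
class MiniLoadBalancerClient {
    private final Map<String, List<Instance>> registry;
    private final AtomicInteger counter = new AtomicInteger();

    MiniLoadBalancerClient(Map<String, List<Instance>> registry) {
        this.registry = registry;
    }

    // Counterpart of ServiceInstanceChooser.choose: pick one instance for the service.
    // Round-robin is used here only for illustration.
    Instance choose(String serviceId) {
        List<Instance> candidates = registry.getOrDefault(serviceId, List.of());
        if (candidates.isEmpty()) {
            return null;
        }
        return candidates.get(Math.floorMod(counter.getAndIncrement(), candidates.size()));
    }

    // Counterpart of method 1: choose an instance, then delegate to method 2.
    <T> T execute(String serviceId, Function<Instance, T> request) {
        Instance instance = choose(serviceId);
        if (instance == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        return execute(serviceId, instance, request);
    }

    // Counterpart of method 2: the caller has already decided which instance to use.
    <T> T execute(String serviceId, Instance instance, Function<Instance, T> request) {
        return request.apply(instance);
    }
}
```

Calling execute("user-service", i -> i.host + ":" + i.port) twice on a registry with two instances would hit each instance once in this sketch.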
Let's leave the LoadBalancerClient interface for a moment and look at the LoadBalancerAutoConfiguration class, the auto-configuration class that sets up client-side load balancing.

package org.springframework.cloud.client.loadbalancer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.RestTemplate;

/**
 * Auto-configuration for Ribbon (client-side load balancing).
 *
 * @author Spencer Gibb
 * @author Dave Syer
 * @author Will Tran
 * @author Gang Li
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Autowired(required = false)
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
    }

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
            LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }

    /**
     * Auto configuration for retry mechanism.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RetryTemplate.class)
    public static class RetryAutoConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public LoadBalancedRetryFactory loadBalancedRetryFactory() {
            return new LoadBalancedRetryFactory() {
            };
        }

    }

    /**
     * Auto configuration for retry intercepting mechanism.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RetryTemplate.class)
    public static class RetryInterceptorAutoConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public RetryLoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRetryProperties properties,
                LoadBalancerRequestFactory requestFactory,
                LoadBalancedRetryFactory loadBalancedRetryFactory) {
            return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
                    requestFactory, loadBalancedRetryFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }

}

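Two annotations on this class are worth noting.
@ConditionalOnClass(RestTemplate.class): the RestTemplate class must be present on the classpath of the current project.
@ConditionalOnBean(LoadBalancerClient.class): the Spring bean factory must contain a bean that implements LoadBalancerClient.
(@EnableConfigurationProperties(LoadBalancerRetryProperties.class) enables the configuration properties for retry; we will not look at it here.)
Of the static nested classes in LoadBalancerAutoConfiguration, the one that matters here is LoadBalancerInterceptorConfig, which applies when RetryTemplate is not on the classpath. It does two things. First, it creates the LoadBalancerInterceptor, and it is this interceptor that gives an ordinary RestTemplate object its load-balancing ability; more on that below. Second, it creates a RestTemplateCustomizer, which adds the LoadBalancerInterceptor to every RestTemplate annotated with @LoadBalanced.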
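The two steps in this configuration, collecting only the marked templates and then appending an interceptor to each, can be imitated in plain Java. The sketch below is an assumption-laden model, not Spring's mechanism: Spring filters beans through the @Qualifier meta-annotation on @LoadBalanced, while this version uses reflection over fields, and Balanced, MiniTemplate, AppConfig and MiniAutoConfiguration are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation playing the role of @LoadBalanced.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Balanced {
}

// Hypothetical stand-in for RestTemplate: it only holds a list of interceptor names.
class MiniTemplate {
    private List<String> interceptors = new ArrayList<>();

    List<String> getInterceptors() {
        return interceptors;
    }

    void setInterceptors(List<String> interceptors) {
        this.interceptors = interceptors;
    }
}

// Hypothetical application config with one marked and one unmarked template.
class AppConfig {
    @Balanced
    MiniTemplate balanced = new MiniTemplate();

    MiniTemplate plain = new MiniTemplate();
}

class MiniAutoConfiguration {
    // Collect only the templates whose field carries the marker annotation.
    static List<MiniTemplate> collectMarked(Object config) {
        List<MiniTemplate> marked = new ArrayList<>();
        for (Field field : config.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(Balanced.class) && field.getType() == MiniTemplate.class) {
                try {
                    field.setAccessible(true);
                    marked.add((MiniTemplate) field.get(config));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return marked;
    }

    // Same shape as the RestTemplateCustomizer lambda: copy the list, append, set it back.
    static void customize(MiniTemplate template) {
        List<String> list = new ArrayList<>(template.getInterceptors());
        list.add("loadBalancerInterceptor");
        template.setInterceptors(list);
    }
}
```

After running customize over collectMarked(new AppConfig()), only the marked template carries the extra interceptor; the plain one is left untouched.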
So what exactly does LoadBalancerInterceptor do? Let's look at the source first.

package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;

import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.Assert;

/**
 * @author Spencer Gibb
 * @author Dave Syer
 * @author Ryan Baxter
 * @author William Tran
 */
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
            LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }

}
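The first lines of intercept treat the host part of the request URI as the service name and fail fast when there is none. A small plain-Java sketch of that step follows; ServiceNameDemo and serviceNameOf are hypothetical names, and an IllegalStateException stands in for Assert.state. One practical consequence, assuming the standard behavior of java.net.URI, is that a service name containing an underscore is not parsed as a host, so getHost() returns null and the check fails.

```java
import java.net.URI;

class ServiceNameDemo {
    // Mirrors the start of intercept(): the URI host is treated as the service name.
    static String serviceNameOf(String url) {
        URI uri = URI.create(url);
        String serviceName = uri.getHost();
        if (serviceName == null) {
            // Same message as the Assert.state call in LoadBalancerInterceptor.
            throw new IllegalStateException(
                    "Request URI does not contain a valid hostname: " + uri);
        }
        return serviceName;
    }

    public static void main(String[] args) {
        // A hyphenated name is a valid host.
        System.out.println(serviceNameOf("http://user-service/users/1"));
        try {
            // An underscore makes java.net.URI give up on parsing a host.
            serviceNameOf("http://user_service/users/1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```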

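When a RestTemplate object annotated with @LoadBalanced sends an HTTP request, the intercept method intercepts it, reads the service name from the URI host, and then calls the execute method of LoadBalancerClient, which selects an instance by service name and sends the actual request. LoadBalancerClient is only an interface, though, so the call ends up in its implementation class, RibbonLoadBalancerClient. Here is the source of the execute methods in RibbonLoadBalancerClient.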

    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
        ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
        Server server = this.getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        } else {
            RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
            return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
        }
    }

    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if (serviceInstance instanceof RibbonLoadBalancerClient.RibbonServer) {
            server = ((RibbonLoadBalancerClient.RibbonServer)serviceInstance).getServer();
        }

        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        } else {
            RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
            RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

            try {
                T returnVal = request.apply(serviceInstance);
                statsRecorder.recordStats(returnVal);
                return returnVal;
            } catch (IOException var8) {
                statsRecorder.recordStats(var8);
                throw var8;
            } catch (Exception var9) {
                statsRecorder.recordStats(var9);
                ReflectionUtils.rethrowRuntimeException(var9);
                return null;
            }
        }
    }

    protected Server getServer(ILoadBalancer loadBalancer) {
        return this.getServer(loadBalancer, (Object)null);
    }

    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        return loadBalancer == null ? null : loadBalancer.chooseServer(hint != null ? hint : "default");
    }

The execute method first obtains an ILoadBalancer for the serviceId, then calls its chooseServer method to obtain a Server object.

package com.netflix.loadbalancer;

import java.util.List;

public interface ILoadBalancer {
    void addServers(List<Server> var1);

    Server chooseServer(Object var1);

    void markServerDown(Server var1);

    /** @deprecated */
    @Deprecated
    List<Server> getServerList(boolean var1);

    List<Server> getReachableServers();

    List<Server> getAllServers();
}

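ILoadBalancer declares the following methods.
addServers: adds service instances to the list of instances the load balancer maintains.
chooseServer: picks one concrete service instance from the load balancer according to some strategy.
markServerDown: notifies the load balancer that a specific instance has stopped serving; otherwise the load balancer would keep treating that instance as healthy until the next time it refreshes the server list.
getReachableServers: returns the list of instances that are currently serving normally.
getAllServers: returns the list of all known service instances, both those serving normally and those that have stopped.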
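The behavior described for these methods can be sketched with a tiny in-memory load balancer. This is a hypothetical model (MiniLoadBalancer is not a Ribbon class), servers are plain "host:port" strings, and round-robin is chosen only for illustration; it is not a claim about which rule Ribbon applies by default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of ILoadBalancer; servers are plain "host:port" strings.
class MiniLoadBalancer {
    private final List<String> allServers = new ArrayList<>();
    private final List<String> downServers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    // Add instances to the list this balancer maintains.
    void addServers(List<String> servers) {
        allServers.addAll(servers);
    }

    // Record that an instance has stopped serving.
    void markServerDown(String server) {
        if (!downServers.contains(server)) {
            downServers.add(server);
        }
    }

    // Every known instance, whether up or down.
    List<String> getAllServers() {
        return new ArrayList<>(allServers);
    }

    // Only the instances that have not been marked down.
    List<String> getReachableServers() {
        List<String> up = new ArrayList<>(allServers);
        up.removeAll(downServers);
        return up;
    }

    // Round-robin over reachable servers; returns null when none is left.
    // The key parameter is accepted for symmetry with chooseServer but ignored here.
    String chooseServer(Object key) {
        List<String> up = getReachableServers();
        if (up.isEmpty()) {
            return null;
        }
        return up.get(Math.floorMod(next.getAndIncrement(), up.size()));
    }
}
```

Once an instance is passed to markServerDown in this sketch, chooseServer stops returning it, while getAllServers still lists it.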
ILoadBalancer is an interface, so which class implements it?
The RibbonClientConfiguration configuration class shows that the default implementation is ZoneAwareLoadBalancer.

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
    }

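Through ZoneAwareLoadBalancer, the ILoadBalancer implementation, we now have a Server object. A Server carries no serviceId, so it is wrapped in a RibbonServer object; RibbonServer implements the ServiceInstance interface and holds the serviceId, host, port and related data. The apply method of LoadBalancerRequest is then called back to send the real request and obtain a ClientHttpResponse object.
Inside apply, the original request is wrapped in a ServiceRequestWrapper, a subclass of HttpRequestWrapper that overrides the parent's getURI method. getURI in turn calls the reconstructURI method of LoadBalancerClient. The source below is AsyncLoadBalancerInterceptor, the asynchronous counterpart of LoadBalancerInterceptor, where the apply implementation is written inline and easy to read; it is followed by the getURI override from ServiceRequestWrapper.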

public class AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    @Override
    public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
            final byte[] body, final AsyncClientHttpRequestExecution execution)
            throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        return this.loadBalancer.execute(serviceName,
                new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
                    @Override
                    public ListenableFuture<ClientHttpResponse> apply(
                            final ServiceInstance instance) throws Exception {
                        HttpRequest serviceRequest = new ServiceRequestWrapper(request,
                                instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
                        return execution.executeAsync(serviceRequest, body);
                    }

                });
    }

}

    // getURI as overridden in ServiceRequestWrapper
    @Override
    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
        return uri;
    }
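The wrapper idea above is small enough to model directly: keep the original request, and only rewrite the URI at the moment someone asks for it. The following is a sketch under assumptions, with MiniRequest standing in for HttpRequest, MiniServiceRequestWrapper for ServiceRequestWrapper, and a UnaryOperator<URI> standing in for the call to reconstructURI; none of these names come from Spring.

```java
import java.net.URI;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for HttpRequest: only the URI accessor is modeled.
interface MiniRequest {
    URI getURI();
}

// Plays the role of ServiceRequestWrapper: delegates to the wrapped request
// but rewrites the URI each time getURI() is called.
class MiniServiceRequestWrapper implements MiniRequest {
    private final MiniRequest delegate;
    private final UnaryOperator<URI> reconstruct;

    MiniServiceRequestWrapper(MiniRequest delegate, UnaryOperator<URI> reconstruct) {
        this.delegate = delegate;
        this.reconstruct = reconstruct;
    }

    @Override
    public URI getURI() {
        // The original request is never modified; the rewrite happens on read.
        return reconstruct.apply(delegate.getURI());
    }
}
```

Because the rewrite lives in getURI, any code further down the chain that reads the URI sees the concrete address without knowing that load balancing took place.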

With the ServiceRequestWrapper object in hand, the executeAsync method of AsyncClientHttpRequestExecution is called.

        public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                AsyncClientHttpRequestInterceptor interceptor = (AsyncClientHttpRequestInterceptor)this.iterator.next();
                return interceptor.intercept(request, body, this);
            } else {
                URI uri = request.getURI();
                HttpMethod method = request.getMethod();
                HttpHeaders headers = request.getHeaders();
                Assert.state(method != null, "No standard HTTP method");
                AsyncClientHttpRequest delegate = InterceptingAsyncClientHttpRequest.this.requestFactory.createAsyncRequest(uri, method);
                delegate.getHeaders().putAll(headers);
                if (body.length > 0) {
                    StreamUtils.copy(body, delegate.getBody());
                }

                return delegate.executeAsync();
            }
        }
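The executeAsync method above walks an iterator of interceptors: while one remains, it hands the request to that interceptor together with itself; once the iterator is exhausted, it sends the request. A minimal synchronous model of that pattern is sketched below. MiniInterceptor and MiniExecution are hypothetical names, requests and responses are plain strings, and "sent ..." merely stands in for the real HTTP call.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical interceptor: receives the request and the execution to continue the chain.
interface MiniInterceptor {
    String intercept(String request, MiniExecution execution);
}

// Hypothetical model of the execution object that drives the interceptor chain.
class MiniExecution {
    private final Iterator<MiniInterceptor> iterator;

    MiniExecution(List<MiniInterceptor> interceptors) {
        this.iterator = interceptors.iterator();
    }

    // Same branching as executeAsync: next interceptor if any, otherwise send the request.
    String execute(String request) {
        if (iterator.hasNext()) {
            MiniInterceptor interceptor = iterator.next();
            return interceptor.intercept(request, this);
        }
        // End of the chain: stand-in for creating and executing the real request.
        return "sent " + request;
    }
}
```

An interceptor that rewrites the request before calling execution.execute(...) affects everything after it in the chain, which is how a load-balancing interceptor can swap in a concrete address before the request is sent.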

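We know the HttpRequest object here is actually a ServiceRequestWrapper instance. So when its URI is read, the call goes to the reconstructURI method of LoadBalancerClient, that is, the reconstructURI method of its implementation class RibbonLoadBalancerClient. (A long detour that brings us right back.)
Here is the reconstructURI method on its own.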

    public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) {
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
        }
        else {
            server = new Server(instance.getScheme(), instance.getHost(),
                    instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);
    }

In the end, the reconstructURIWithServer method of RibbonLoadBalancerContext does the work.

    public URI reconstructURIWithServer(Server server, URI original) {
        String host = server.getHost();
        int port = server.getPort();
        String scheme = server.getScheme();
        if (host.equals(original.getHost()) && port == original.getPort() && scheme == original.getScheme()) {
            return original;
        } else {
            if (scheme == null) {
                scheme = original.getScheme();
            }

            if (scheme == null) {
                scheme = (String)this.deriveSchemeAndPortFromPartialUri(original).first();
            }

            try {
                StringBuilder sb = new StringBuilder();
                sb.append(scheme).append("://");
                if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                    sb.append(original.getRawUserInfo()).append("@");
                }

                sb.append(host);
                if (port >= 0) {
                    sb.append(":").append(port);
                }

                sb.append(original.getRawPath());
                if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                    sb.append("?").append(original.getRawQuery());
                }

                if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                    sb.append("#").append(original.getRawFragment());
                }

                URI newURI = new URI(sb.toString());
                return newURI;
            } catch (URISyntaxException var8) {
                throw new RuntimeException(var8);
            }
        }
    }

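This method makes the process clear: it takes the host and port from the Server object, takes the remaining request information from original (the URI whose host is the service name), and concatenates the two into the concrete address of the service instance to call.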
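The same concatenation can be tried out with nothing but java.net.URI. The sketch below is a simplified version under assumptions: UriRebuilder and rebuild are hypothetical names, the host and port are passed in directly instead of coming from a Server object, and the original URI is assumed to always carry a scheme, so the scheme fallback logic is left out.

```java
import java.net.URI;

class UriRebuilder {
    // Replace the logical service name with a concrete host and port,
    // keeping scheme, user info, path, query and fragment from the original URI.
    static URI rebuild(URI original, String host, int port) {
        StringBuilder sb = new StringBuilder();
        sb.append(original.getScheme()).append("://");
        if (original.getRawUserInfo() != null) {
            sb.append(original.getRawUserInfo()).append("@");
        }
        sb.append(host);
        if (port >= 0) {
            sb.append(":").append(port);
        }
        sb.append(original.getRawPath());
        if (original.getRawQuery() != null) {
            sb.append("?").append(original.getRawQuery());
        }
        if (original.getRawFragment() != null) {
            sb.append("#").append(original.getRawFragment());
        }
        return URI.create(sb.toString());
    }
}
```

For example, rebuilding http://user-service/users/1?active=true#top with host 10.0.0.5 and port 8080 gives http://10.0.0.5:8080/users/1?active=true#top in this sketch.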

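To summarize: a RestTemplate object annotated with @LoadBalanced gets Ribbon load balancing. The LoadBalancerAutoConfiguration class adds a LoadBalancerInterceptor to every RestTemplate annotated with @LoadBalanced. The interceptor calls the execute method of RibbonLoadBalancerClient, the implementation of the LoadBalancerClient interface. Inside execute, ZoneAwareLoadBalancer, the ILoadBalancer implementation, supplies a Server object. The request is then wrapped in a ServiceRequestWrapper, an HttpRequest implementation whose overridden getURI method calls the reconstructURI method of LoadBalancerClient. Finally, reconstructURI assembles the URI object.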
