Ribbon Source Code Analysis

2020-08-03  拥抱孤独_to

Using Ribbon is very simple: register a RestTemplate bean annotated with @LoadBalanced in the Spring container and it is ready to use.


    @LoadBalanced
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

First, write a service for the client to call.

@RestController
public class RibbonService {

    @Value("${server.port}")
    private String port;

    @GetMapping("/info")
    public String nameAndPort() {
        return "ribbon-server : "+port;
    }
}

Server-side application.properties:

spring.application.name=ribbon-server
server.port=8081

The client calls the service directly:

@RestController
public class ClientController {
    @Autowired
    RestTemplate restTemplate;

    @GetMapping("/test")
    public String test() {
        return restTemplate.getForObject("http://ribbon-server/info",String.class);
    }
}

Client-side application.properties; here we configure the server's address:

server.port=8080
spring.application.name=ribbon-client
ribbon-server.ribbon.listOfServers=localhost:8081

Then request the client's local /test endpoint; the call succeeds.



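The minimal configuration above is enough to make service-to-service calls with RestTemplate. Ribbon's other configuration options are listed below.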

# Disable Eureka
ribbon.eureka.enabled=false
# This setting is per service; the prefix is the service name
<server-application-name>.ribbon.listOfServers=localhost:8081,localhost:8083
# Connection timeout (ms)
ribbon.ConnectTimeout=2000
# Read (request processing) timeout (ms)
ribbon.ReadTimeout=5000
# Timeouts can also be set per Ribbon client, keyed by service name:
<server-application-name>.ribbon.ConnectTimeout=2000
<server-application-name>.ribbon.ReadTimeout=5000
# Maximum total connections
ribbon.MaxTotalConnections=500
# Maximum connections per host
ribbon.MaxConnectionsPerHost=500
# Server list refresh interval (ms); the default is 30s
ribbon.ServerListRefreshInterval=30000
# Load balancer implementation; must implement ILoadBalancer
<server-application-name>.ribbon.NFLoadBalancerClassName=
# Load-balancing rule; must implement IRule
<server-application-name>.ribbon.NFLoadBalancerRuleClassName=
# Server availability check; must implement IPing
<server-application-name>.ribbon.NFLoadBalancerPingClassName=
# Server list source; must implement ServerList
<server-application-name>.ribbon.NIWSServerListClassName=
# Server list filter; must implement ServerListFilter
<server-application-name>.ribbon.NIWSServerListFilterClassName=

Ribbon ships with the following load-balancing rules:

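  1. BestAvailableRule
    Chooses the server with the fewest concurrent requests. It examines the servers one by one, skips any server marked as failed, and then picks the one with the smallest ActiveRequestCount.
  2. AvailabilityFilteringRule
    Filters out back-end servers that keep failing to connect and are marked circuit tripped, as well as servers under high concurrency. The filtering logic lives in an AvailabilityPredicate, which simply checks the running state recorded for each server in its status.
  3. ZoneAvoidanceRule
    Uses ZoneAvoidancePredicate and AvailabilityPredicate to decide whether a server may be chosen. The former judges whether a zone's performance is acceptable and drops unavailable zones (with all their servers); AvailabilityPredicate filters out servers with too many connections.
  4. RandomRule
    Picks a server at random.
  5. RoundRobinRule
    Round robin: advances an index and picks the server at that position.
  6. RetryRule
    Adds a retry mechanism on top of a chosen rule: if choosing a server fails within a configured time window, it keeps trying the subRule until it gets an available server.
  7. WeightedResponseTimeRule
    Assigns each server a weight based on its response time; the longer the response time, the smaller the weight and the lower the chance of being chosen.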
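
To make the last rule concrete, here is a minimal sketch of response-time weighting, loosely modeled on how WeightedResponseTimeRule behaves. The class and method names are hypothetical (they are not Ribbon's API), and the collection of response-time statistics is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not a Ribbon class: weights servers inversely to response time. */
final class WeightedChoiceSketch {

    /** Cumulative weights: each server contributes (sum of all averages - its own average). */
    static List<Double> cumulativeWeights(double[] avgResponseMillis) {
        double total = 0.0;
        for (double t : avgResponseMillis) {
            total += t;
        }
        List<Double> cumulative = new ArrayList<>();
        double running = 0.0;
        for (double t : avgResponseMillis) {
            running += total - t; // a slower server adds a smaller increment
            cumulative.add(running);
        }
        return cumulative;
    }

    /** Returns the first index whose cumulative weight is greater than the given point. */
    static int choose(List<Double> cumulative, double point) {
        for (int i = 0; i < cumulative.size(); i++) {
            if (point < cumulative.get(i)) {
                return i;
            }
        }
        return cumulative.size() - 1;
    }
}
```

With averages of 100, 300 and 600 ms the cumulative weights are 900, 1600 and 2000, so a uniformly random point in [0, 2000) lands on the fastest server 45% of the time and on the slowest 20% of the time.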

Next, let's see why a service name plus the @LoadBalanced annotation is enough to call a service and get load balancing as well.

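On startup, Spring Boot loads META-INF/spring.factories and registers the classes listed under the EnableAutoConfiguration key in the Spring container, so Ribbon's entry point is RibbonAutoConfiguration.
That class just registers some beans; the only one we need to look at is LoadBalancerAutoConfiguration, which RibbonAutoConfiguration is declared to run before:

@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})

LoadBalancerAutoConfiguration declares the following field:
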
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

As we can see, every RestTemplate annotated with @LoadBalanced is injected here.

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

One thing to note: @LoadBalanced itself declares nothing; it is just a @Qualifier. A @Qualifier can be thought of as a marker: only beans that carry the marker are injected.
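
The marker idea can be illustrated without Spring. The sketch below uses plain reflection and a hypothetical @Marked annotation standing in for @LoadBalanced; Spring's actual qualifier matching happens during dependency injection and is more involved than this.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical marker standing in for @LoadBalanced; it has no members. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Marked {
}

/** Hypothetical factory with one marked and one unmarked factory method. */
class TemplateFactories {
    @Marked
    public String balancedTemplate() {
        return "balanced";
    }

    public String plainTemplate() {
        return "plain";
    }
}

final class MarkerFilterSketch {
    /** Collects the names of the methods declared on the type that carry @Marked. */
    static List<String> markedMethodNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(Marked.class)) {
                names.add(m.getName());
            }
        }
        return names;
    }
}
```

Only balancedTemplate is selected; the unmarked method is ignored, just as an unannotated RestTemplate bean is left out of the injected list.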

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
    }
    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }

As we can see, this class adds the LoadBalancerInterceptor to those RestTemplate instances.

@Override
        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                return delegate.execute();
            }
        }
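
The execute method above walks an iterator of interceptors and only performs the real request once the iterator is exhausted. A minimal sketch of that chain, using hypothetical String-based stand-ins for Spring's request and interceptor types:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified stand-in for ClientHttpRequestExecution. */
interface Execution {
    String execute(String request);
}

/** Hypothetical, simplified stand-in for ClientHttpRequestInterceptor. */
interface Interceptor {
    String intercept(String request, Execution next);
}

final class ChainSketch implements Execution {
    private final Iterator<Interceptor> iterator;

    ChainSketch(List<Interceptor> interceptors) {
        this.iterator = interceptors.iterator();
    }

    @Override
    public String execute(String request) {
        if (iterator.hasNext()) {
            // Hand the request to the next interceptor, passing this chain along.
            return iterator.next().intercept(request, this);
        }
        // No interceptors left: this is where the real HTTP call would be made.
        return "sent:" + request;
    }

    static String demo() {
        Interceptor addSuffix = (req, next) -> next.execute(req + "+header");
        Interceptor rewriteHost =
                (req, next) -> next.execute(req.replace("ribbon-server", "localhost:8081"));
        return new ChainSketch(Arrays.asList(addSuffix, rewriteHost))
                .execute("http://ribbon-server/info");
    }
}
```

Each interceptor decides whether and how to call next.execute, which is what lets LoadBalancerInterceptor swap the service name for a real address before the request goes out.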

RestTemplate.getForObject eventually reaches this execute method, so the request ends up in LoadBalancerInterceptor. Let's continue with LoadBalancerInterceptor.intercept.

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
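
The service name in intercept is just the host part of the request URI. The small sketch below (the class name is hypothetical) shows the extraction with java.net.URI, and also why the Assert.state check exists: getHost() returns null for host names that java.net.URI does not accept as valid, such as names containing an underscore.

```java
import java.net.URI;

final class ServiceNameSketch {
    /** Returns the host part of the URL, which the interceptor treats as the service name. */
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }
}
```

For http://ribbon-server/info this yields ribbon-server, while http://ribbon_server/info yields null, so service names used this way should stick to letters, digits and hyphens.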

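Here loadBalancer is a RibbonLoadBalancerClient, which is registered by RibbonAutoConfiguration; serviceName is the name of the service being called, that is, the host part of the URL, which is ribbon-server in this example.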

    @Bean
    @ConditionalOnMissingBean(LoadBalancerClient.class)
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(springClientFactory());
    }

Next, step into RibbonLoadBalancerClient.execute(String serviceId, LoadBalancerRequest<T> request):

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        // Get the load balancer
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

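getLoadBalancer eventually reaches SpringClientFactory.getInstance and the NamedContextFactory methods it builds on: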

    @Override
    public <C> C getInstance(String name, Class<C> type) {
        C instance = super.getInstance(name, type);
        if (instance != null) {
            return instance;
        }
        IClientConfig config = getInstance(name, IClientConfig.class);
        return instantiateWithConfig(getContext(name), type, config);
    }

    public <T> T getInstance(String name, Class<T> type) {
        AnnotationConfigApplicationContext context = getContext(name);
        if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
                type).length > 0) {
            return context.getBean(type);
        }
        return null;
    }

    protected AnnotationConfigApplicationContext getContext(String name) {
        if (!this.contexts.containsKey(name)) {
            synchronized (this.contexts) {
                if (!this.contexts.containsKey(name)) {
                    this.contexts.put(name, createContext(name));
                }
            }
        }
        return this.contexts.get(name);
    }

    protected AnnotationConfigApplicationContext createContext(String name) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        if (this.configurations.containsKey(name)) {
            for (Class<?> configuration : this.configurations.get(name)
                    .getConfiguration()) {
                context.register(configuration);
            }
        }
        for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
            if (entry.getKey().startsWith("default.")) {
                for (Class<?> configuration : entry.getValue().getConfiguration()) {
                    context.register(configuration);
                }
            }
        }
        context.register(PropertyPlaceholderAutoConfiguration.class,
                this.defaultConfigType);
        context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
                this.propertySourceName,
                Collections.<String, Object> singletonMap(this.propertyName, name)));
        if (this.parent != null) {
            // Uses Environment from parent as well as beans
            context.setParent(this.parent);
        }
        context.setDisplayName(generateDisplayName(name));
        context.refresh();
        return context;
    }
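
The lazy, per-name caching done by getContext and createContext can be sketched as follows. This is a simplification under stated assumptions: a plain Map stands in for the Spring context, the class name is hypothetical, and computeIfAbsent replaces the double-checked locking used in the real code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedContextSketch {
    // One "context" per client name; a Map stands in for the Spring context.
    private final Map<String, Map<String, String>> contexts = new ConcurrentHashMap<>();
    // Counts how many contexts were actually created, for demonstration only.
    final AtomicInteger created = new AtomicInteger();

    /** Returns the context for the name, creating it on first use only. */
    Map<String, String> getContext(String name) {
        return contexts.computeIfAbsent(name, this::createContext);
    }

    private Map<String, String> createContext(String name) {
        created.incrementAndGet();
        Map<String, String> context = new HashMap<>();
        // Mirrors the property source entry added in createContext above.
        context.put("ribbon.client.name", name);
        return context;
    }
}
```

Asking for the same service name twice returns the same object, so the relatively expensive context creation happens once per client.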

First, note the values of three fields: defaultConfigType, propertySourceName and propertyName are RibbonClientConfiguration.class, ribbon and ribbon.client.name respectively.

public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {

    static final String NAMESPACE = "ribbon";

    public SpringClientFactory() {
        super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
    }
}

    public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName,
            String propertyName) {
        this.defaultConfigType = defaultConfigType;
        this.propertySourceName = propertySourceName;
        this.propertyName = propertyName;
    }

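The main things createContext does are: it registers two classes, PropertyPlaceholderAutoConfiguration and RibbonClientConfiguration, in a new per-client Spring context, and it adds the property ribbon.client.name=ribbon-server to that context's environment.
Why does it do this? Let's look at the RibbonClientConfiguration class.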

@Configuration
@EnableConfigurationProperties
//Order is important here, last should be the default, first should be optional
// see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
public class RibbonClientConfiguration {

    public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
    public static final int DEFAULT_READ_TIMEOUT = 1000;

    @RibbonClientName
    private String name = "client";

    // TODO: maybe re-instate autowired load balancers: identified by name they could be
    // associated with ribbon clients

    @Autowired
    private PropertiesFactory propertiesFactory;

    @Bean
    @ConditionalOnMissingBean
    public IClientConfig ribbonClientConfig() {
        DefaultClientConfigImpl config = new DefaultClientConfigImpl();
        config.loadProperties(this.name);
        config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
        config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
        return config;
    }

    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, name)) {
            return this.propertiesFactory.get(IPing.class, config, name);
        }
        return new DummyPing();
    }

    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerList<Server> ribbonServerList(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerList.class, name)) {
            return this.propertiesFactory.get(ServerList.class, config, name);
        }
        ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
        serverList.initWithNiwsConfig(config);
        return serverList;
    }

    @Bean
    @ConditionalOnMissingBean
    public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
        return new PollingServerListUpdater(config);
    }

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
            return this.propertiesFactory.get(ServerListFilter.class, config, name);
        }
        ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
        filter.initWithNiwsConfig(config);
        return filter;
    }

    @Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
                                                               IClientConfig config, RetryHandler retryHandler) {
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
    }

    @Bean
    @ConditionalOnMissingBean
    public RetryHandler retryHandler(IClientConfig config) {
        return new DefaultLoadBalancerRetryHandler(config);
    }

    @Bean
    @ConditionalOnMissingBean
    public ServerIntrospector serverIntrospector() {
        return new DefaultServerIntrospector();
    }

    @PostConstruct
    public void preprocess() {
        setRibbonProperty(name, DeploymentContextBasedVipAddresses.key(), name);
    }

    static class OverrideRestClient extends RestClient {

        private IClientConfig config;
        private ServerIntrospector serverIntrospector;

        protected OverrideRestClient(IClientConfig config,
                ServerIntrospector serverIntrospector) {
            super();
            this.config = config;
            this.serverIntrospector = serverIntrospector;
            initWithNiwsConfig(this.config);
        }

        @Override
        public URI reconstructURIWithServer(Server server, URI original) {
            URI uri = updateToSecureConnectionIfNeeded(original, this.config,
                    this.serverIntrospector, server);
            return super.reconstructURIWithServer(server, uri);
        }

        @Override
        protected Client apacheHttpClientSpecificInitialization() {
            ApacheHttpClient4 apache = (ApacheHttpClient4) super.apacheHttpClientSpecificInitialization();
            apache.getClientHandler().getHttpClient().getParams().setParameter(
                    ClientPNames.COOKIE_POLICY, CookiePolicy.IGNORE_COOKIES);
            return apache;
        }

    }

}

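So RibbonClientConfiguration is registered in the context so that the context can later load the beans it declares. We also see a name field; its value comes from the ribbon.client.name=ribbon-server property added earlier, which is injected into the field through @RibbonClientName: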

@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
        ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Value("${ribbon.client.name}")
public @interface RibbonClientName {

}
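
The injected name is what makes the per-client property keys work: each bean method in RibbonClientConfiguration first checks whether a class was configured for this client and otherwise falls back to a default. A rough sketch of that lookup for the rule, assuming the key format shown in the configuration section; the class and method here are hypothetical and do not reproduce PropertiesFactory.

```java
import java.util.Map;

final class RuleLookupSketch {
    // The default that RibbonClientConfiguration.ribbonRule falls back to.
    static final String DEFAULT_RULE = "com.netflix.loadbalancer.ZoneAvoidanceRule";

    /** Reads "<clientName>.ribbon.NFLoadBalancerRuleClassName", else returns the default. */
    static String ruleClassName(Map<String, String> properties, String clientName) {
        String key = clientName + ".ribbon.NFLoadBalancerRuleClassName";
        String configured = properties.get(key);
        return (configured != null && !configured.isEmpty()) ? configured : DEFAULT_RULE;
    }
}
```

With ribbon-server.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule present, the lookup for ribbon-server returns RandomRule, while any other client name still gets the default.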

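When ZoneAwareLoadBalancer is initialized, the constructor of its parent class DynamicServerListLoadBalancer wraps the ribbon-server.ribbon.listOfServers entries we configured into Server objects: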

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.isSecure = false;
        this.useTunnel = false;
        this.serverListUpdateInProgress = new AtomicBoolean(false);
        this.updateAction = new NamelessClass_1();
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter)filter).setLoadBalancerStats(this.getLoadBalancerStats());
        }

        this.restOfInit(clientConfig);
    }

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        this.setEnablePrimingConnections(false);
        this.enableAndInitLearnNewServersFeature();
        this.updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections().primeConnections(this.getReachableServers());
        }

        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList();
        if (this.serverListImpl != null) {
            servers = this.serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            if (this.filter != null) {
                servers = this.filter.getFilteredListOfServers((List)servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            }
        }

        this.updateAllServerList((List)servers);
    }
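
Turning a listOfServers value into server objects is essentially a split on commas. A minimal sketch, assuming a hypothetical HostPort value type (Ribbon's own Server class carries more state) and assuming port 80 when none is given:

```java
import java.util.ArrayList;
import java.util.List;

final class ServerListSketch {
    /** Hypothetical value type standing in for Ribbon's Server. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Splits "host1:port1,host2:port2" into entries, skipping blank ones. */
    static List<HostPort> parse(String listOfServers) {
        List<HostPort> servers = new ArrayList<>();
        if (listOfServers == null) {
            return servers;
        }
        for (String entry : listOfServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                servers.add(new HostPort(trimmed, 80)); // assumed default port
            } else {
                servers.add(new HostPort(trimmed.substring(0, colon),
                        Integer.parseInt(trimmed.substring(colon + 1))));
            }
        }
        return servers;
    }
}
```

Parsing localhost:8081,localhost:8083 gives two entries, which is the list the load balancer rule then chooses from.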

At this point, the ILoadBalancer returned by ILoadBalancer loadBalancer = getLoadBalancer(serviceId) is a ZoneAwareLoadBalancer. Choosing a server from it then comes down to chooseServer:


    public Server chooseServer(Object key) {
        if (this.counter == null) {
            this.counter = this.createCounter();
        }

        this.counter.increment();
        if (this.rule == null) {
            return null;
        } else {
            try {
                return this.rule.choose(key);
            } catch (Exception var3) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", new Object[]{this.name, key, var3});
                return null;
            }
        }
    }

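In the end the IRule is asked to pick a concrete Server. If no IRule is configured, the default created by RibbonClientConfiguration.ribbonRule above is ZoneAvoidanceRule, which picks in round-robin order among the servers that pass its filters.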
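
Round-robin selection itself is small enough to sketch in full. The class below is hypothetical and works on plain strings; it only illustrates a thread-safe rotating index, not Ribbon's reachability checks or retries.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    /** Returns the next server in rotation, or null when the list is empty. */
    String choose(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        int size = servers.size();
        // getAndUpdate applies the wrap-around atomically, so the counter never overflows.
        int index = nextIndex.getAndUpdate(i -> (i + 1) % size);
        // The extra modulo guards against a list that shrank since the last call.
        return servers.get(index % size);
    }
}
```

Calling choose three times on a two-element list returns the first, the second, and then the first element again.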

    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if(serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer)serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

This execute finally calls request.apply; the request here is the one we passed in earlier, built by createRequest:

    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
            final byte[] body, final ClientHttpRequestExecution execution) {
        return instance -> {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
            if (transformers != null) {
                for (LoadBalancerRequestTransformer transformer : transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);
        };
    }

ServiceRequestWrapper replaces the service name with the chosen server's actual host and port, and then the HTTP request is executed.

    @Override
    public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) {
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
        } else {
            server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);
    }
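
The host substitution at the end of reconstructURI can be sketched with java.net.URI alone. The class and method names below are hypothetical, and the sketch ignores the secure-connection upgrade and the encoding details that the real code handles.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ReconstructUriSketch {
    /** Rebuilds the URI with the chosen server's host and port, keeping everything else. */
    static URI withServer(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI from " + original, e);
        }
    }
}
```

For example, http://ribbon-server/info?x=1 with the server localhost:8081 becomes http://localhost:8081/info?x=1, which is the address the underlying HTTP client actually connects to.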