Spring Cloud Ribbon Analysis (Part 4): Feign Integration

2021-02-01  Blog

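The previous three parts analyzed and summarized how Ribbon is used together with RestTemplate. This part looks at how Ribbon is used together with the Feign client, which is very widely used today. Below we analyze how a Feign client relies on Ribbon for load balancing. The analysis requires the OpenFeign dependency module.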


FeignRibbonClientAutoConfiguration configuration class

@ConditionalOnClass({ ILoadBalancer.class, Feign.class })
@Configuration
@AutoConfigureBefore(FeignAutoConfiguration.class)
@EnableConfigurationProperties({ FeignHttpClientProperties.class })
@Import({ HttpClientFeignLoadBalancedConfiguration.class,
        OkHttpFeignLoadBalancedConfiguration.class,
        DefaultFeignLoadBalancedConfiguration.class })
public class FeignRibbonClientAutoConfiguration {

    @Bean
    @Primary
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    public CachingSpringLoadBalancerFactory cachingLBClientFactory(
            SpringClientFactory factory) {
        return new CachingSpringLoadBalancerFactory(factory);
    }

    @Bean
    @Primary
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public CachingSpringLoadBalancerFactory retryabeCachingLBClientFactory(
        SpringClientFactory factory,
        LoadBalancedRetryFactory retryFactory) {
        return new CachingSpringLoadBalancerFactory(factory, retryFactory);
    }

    // Registers the default request options. When a call uses these defaults,
    // the IClientConfig parameters are read from Ribbon's SpringClientFactory
    @Bean
    @ConditionalOnMissingBean
    public Request.Options feignRequestOptions() {
        return LoadBalancerFeignClient.DEFAULT_OPTIONS;
    }
}

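In this configuration class, the part to focus on is CachingSpringLoadBalancerFactory. Its main job is to obtain a FeignLoadBalancer, cache it, and hand it to LoadBalancerFeignClient, which is the default implementation of the load-balancing Feign client.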


CachingSpringLoadBalancerFactory factory class

public class CachingSpringLoadBalancerFactory {

    private final SpringClientFactory factory;
    private LoadBalancedRetryFactory loadBalancedRetryFactory = null;

    private volatile Map<String, FeignLoadBalancer> cache = new ConcurrentReferenceHashMap<>();

    public CachingSpringLoadBalancerFactory(SpringClientFactory factory) {
        this.factory = factory;
    }

    public CachingSpringLoadBalancerFactory(SpringClientFactory factory, LoadBalancedRetryFactory loadBalancedRetryPolicyFactory) {
        this.factory = factory;
        this.loadBalancedRetryFactory = loadBalancedRetryPolicyFactory;
    }

    public FeignLoadBalancer create(String clientName) {
        FeignLoadBalancer client = this.cache.get(clientName);
        if(client != null) {
            return client;
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        client = loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
            loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }
}

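In this snippet, pay particular attention to the SpringClientFactory object, which was covered in Part 1. Through SpringClientFactory the factory obtains Ribbon's client configuration interface (IClientConfig) and its load balancer interface (ILoadBalancer), and uses them to build the Feign load balancer implementation (FeignLoadBalancer, or RetryableFeignLoadBalancer when a LoadBalancedRetryFactory is available). That object is then handed to LoadBalancerFeignClient. Internally, LoadBalancerFeignClient ends up calling ILoadBalancer.chooseServer, which is what achieves load balancing. The full call chain is described below.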
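The caching behaviour of create(clientName) can be illustrated with a minimal sketch in plain Java. NamedClientCache and its builder function are hypothetical names introduced only for this example. Note one deliberate difference: the original uses a ConcurrentReferenceHashMap with a get-then-put sequence, while this sketch swaps in ConcurrentHashMap.computeIfAbsent, so it keeps strong references and builds each client at most once.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for the caching factory: one client object per
// service name, built on first use and reused afterwards.
class NamedClientCache<C> {
    private final Map<String, C> cache = new ConcurrentHashMap<>();
    private final Function<String, C> builder;

    NamedClientCache(Function<String, C> builder) {
        this.builder = builder;
    }

    // computeIfAbsent invokes the builder at most once per name, even when
    // several threads ask for the same client at the same time.
    C create(String clientName) {
        return cache.computeIfAbsent(clientName, builder);
    }
}

class NamedClientCacheDemo {
    public static void main(String[] args) {
        AtomicInteger builds = new AtomicInteger();
        NamedClientCache<Object> factory = new NamedClientCache<>(name -> {
            builds.incrementAndGet();
            return new Object(); // placeholder for a FeignLoadBalancer
        });
        Object first = factory.create("user-service");
        Object second = factory.create("user-service");
        System.out.println("same instance: " + (first == second));
        System.out.println("builds: " + builds.get());
    }
}
```

The second call for the same name returns the cached object, which mirrors why each Feign client name ends up with a single load balancer instance.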


DefaultFeignLoadBalancedConfiguration

@Configuration
class DefaultFeignLoadBalancedConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
                              SpringClientFactory clientFactory) {
        return new LoadBalancerFeignClient(new Client.Default(null, null),
                cachingFactory, clientFactory);
    }
}

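This class is loaded through @Import({DefaultFeignLoadBalancedConfiguration.class}) on FeignRibbonClientAutoConfiguration. It registers the default implementation of the Feign Client interface, wrapped so that requests are load balanced.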


FeignAutoConfiguration

@Configuration
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({FeignClientProperties.class, FeignHttpClientProperties.class})
public class FeignAutoConfiguration {
    @Autowired(required = false)
    private List<FeignClientSpecification> configurations = new ArrayList<>();

    @Bean
    public HasFeatures feignFeature() {
        return HasFeatures.namedFeature("Feign", Feign.class);
    }

    @Bean
    public FeignContext feignContext() {
        FeignContext context = new FeignContext();
        context.setConfigurations(this.configurations);
        return context;
    }

    @Configuration
    @ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
    protected static class HystrixFeignTargeterConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public Targeter feignTargeter() {
            return new HystrixTargeter();
        }
    }
    ......
}

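FeignAutoConfiguration is auto-configured after FeignRibbonClientAutoConfiguration. For this class we only briefly cover the Targeter declared in the nested HystrixFeignTargeterConfiguration. Its role is to create a proxy for the Feign client interface. The proxy adds Hystrix circuit breaking and then calls through to LoadBalancerFeignClient to send the request. The call chain that creates and invokes the Feign proxy is:

@EnableFeignClients->FeignClientsRegistrar.registerBeanDefinitions->FeignClientsRegistrar.registerFeignClient->FeignClientFactoryBean.getObject->FeignClientFactoryBean.loadBalance->HystrixTargeter.target->Feign.target->HystrixFeign#build->ReflectiveFeign.newInstance->HystrixInvocationHandler.invoke->SynchronousMethodHandler.invoke

This part does not go into the chain in detail. If you are interested, you can trace it through the stack. A separate analysis dedicated to Feign will cover it.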
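To make the idea of a generated client proxy concrete, here is a minimal sketch based on a JDK dynamic proxy. It is not Feign's implementation: GreetingApi, ProxySketch, the URL layout, and the transport function are all assumptions made for illustration, and the circuit-breaking layer is left out entirely.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical client interface, standing in for a @FeignClient interface.
interface GreetingApi {
    String greet(String name);
}

class ProxySketch {
    // Creates a JDK dynamic proxy for the interface. Each call on the proxy
    // is turned into a URL string and handed to the transport function,
    // which plays the role of the final Client in this sketch.
    static <T> T target(Class<T> type, String serviceName,
                        Function<String, String> transport) {
        Object objectMethods = new Object();
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // equals/hashCode/toString are answered by a placeholder object.
                return method.invoke(objectMethods, args);
            }
            // Assumed URL layout: /<method name>/<first argument>.
            String url = "http://" + serviceName + "/" + method.getName() + "/" + args[0];
            return transport.apply(url);
        };
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    public static void main(String[] args) {
        GreetingApi api = target(GreetingApi.class, "greeting-service",
                url -> "sent " + url);
        System.out.println(api.greet("bob"));
    }
}
```

The caller only sees the interface, while the handler decides how the call is translated and which client sends it. That is the same division of labour as between the Feign proxy and LoadBalancerFeignClient.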


LoadBalancerFeignClient load-balancing implementation class

public class LoadBalancerFeignClient implements Client {

    static final Request.Options DEFAULT_OPTIONS = new Request.Options();

    private final Client delegate;
    private CachingSpringLoadBalancerFactory lbClientFactory;
    private SpringClientFactory clientFactory;

    ......

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            URI asUri = URI.create(request.url());
            String clientName = asUri.getHost();
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);

            IClientConfig requestConfig = getClientConfig(options, clientName);
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                    requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }

    IClientConfig getClientConfig(Request.Options options, String clientName) {
        IClientConfig requestConfig;
        if (options == DEFAULT_OPTIONS) {
            requestConfig = this.clientFactory.getClientConfig(clientName);
        } else {
            requestConfig = new FeignOptionsClientConfig(options);
        }
        return requestConfig;
    }

    ......

    private FeignLoadBalancer lbClient(String clientName) {
        return this.lbClientFactory.create(clientName);
    }

    ......
}

As described above, the generated Feign proxy ends up calling LoadBalancerFeignClient.execute. The method to look at is executeWithLoadBalancer, which obtains the response through the load balancer.
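The first lines of execute treat the host of the request URL as the Ribbon client name and then strip it from the URL. A minimal sketch of that step, using only java.net.URI, might look like this. ClientNameSketch and withoutHost are hypothetical names, and the string replacement is an assumption about what cleanUrl does, since its body is not shown above.

```java
import java.net.URI;
import java.util.regex.Pattern;

class ClientNameSketch {
    // The host part of the request URL is the logical service name.
    static String clientName(String url) {
        return URI.create(url).getHost();
    }

    // Removes the service name from the URL, leaving an empty authority
    // that is filled in once a concrete server has been chosen.
    static URI withoutHost(String url, String clientName) {
        return URI.create(url.replaceFirst(Pattern.quote(clientName), ""));
    }

    public static void main(String[] args) {
        String url = "http://user-service/users/1";
        String name = clientName(url);
        System.out.println(name);
        System.out.println(withoutHost(url, name));
    }
}
```

Because the service name is only a lookup key for the load balancer, the URI carried in the RibbonRequest has no host until a server is selected.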

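/**
* Use this method when the caller wants the request dispatched to a server chosen by the load balancer,
* instead of specifying the server in the request URI. The final URI is computed by reconstructURIWithServer.
*/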
public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {
    ......

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        // Build the LoadBalancerCommand
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            // Submit the command, which returns an RxJava Observable, then block to obtain the final result
            return command.submit(
                new ServerOperation<T>() {
                    // The Server passed in here is chosen by LoadBalancerContext.getServerFromLoadBalancer,
                    // which calls Server svc = lb.chooseServer(loadBalancerKey);
                    // for how a Server is chosen, see [Part 3](https://www.jianshu.com/p/f076ab3e4031)
                    @Override
                    public Observable<T> call(Server server) {
                        // Compute the final URI
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            // Call execute on the concrete client, here RetryableFeignLoadBalancer.execute
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                // Convert to a BlockingObservable
                .toBlocking()
                // Block until the single result is available and return it
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
    }
}

As the comments indicate, the result is obtained by blocking. The call flow is outlined below.
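The two steps inside the ServerOperation, choosing a server and rebuilding the URI around it, can be sketched without Ribbon. Plain round-robin is used here only as a placeholder for whatever rule the ILoadBalancer is configured with. RoundRobinSketch and the "host:port" string format are assumptions for illustration.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final List<String> servers; // "host:port" entries
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        this.servers = new ArrayList<>(servers);
    }

    // Placeholder for ILoadBalancer.chooseServer: cycles through the list.
    String chooseServer() {
        int index = Math.floorMod(counter.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    // Placeholder for reconstructURIWithServer: keeps scheme, path and query
    // of the host-less URI and inserts the chosen server as the authority.
    static URI reconstruct(String server, URI original) {
        String query = original.getRawQuery() == null ? "" : "?" + original.getRawQuery();
        return URI.create(original.getScheme() + "://" + server + original.getRawPath() + query);
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch(
                Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        URI hostless = URI.create("http:///users/1");
        for (int i = 0; i < 3; i++) {
            System.out.println(reconstruct(lb.chooseServer(), hostless));
        }
    }
}
```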


LoadBalancerCommand command class

public class LoadBalancerCommand<T> {
    ......
    // Create the Observable that emits the chosen Server
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    // Choose a Server through the load balancer
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
                    // Emit the chosen Server
                    next.onNext(server);
                    // Complete the Observable
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
    ......
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        
                        // Called for each attempt and retry
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();
                                        loadBalancerContext.noteOpenConnection(stats);
                                        
                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }
                                        
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        // Invokes the ServerOperation from executeWithLoadBalancer, i.e.
                                        // return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // TODO: What to do if onNext or onError are never called?
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }                            
                                            
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });
                        
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
            
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
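        // On error: wrap it in a ClientException if a retry limit was reached, notify the listeners, then propagate the error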
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }
}

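Inside LoadBalancerCommand.submit, the call that does the actual work is operation.call(server), which leads to RetryableFeignLoadBalancer.execute. The resulting Observable<T> is passed back up through the nested operators to the outermost caller, which applies .toBlocking().single() and blocks until the response of the request is available. This style relies on RxJava's Observable and Observer model, so for more on how RxJava is used, refer to RxJava-related material.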
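The interaction between maxRetrysSame and maxRetrysNext is easier to see without the Rx operators. The sketch below expresses the same nesting as two plain loops. It assumes every exception is retriable, and RetryLoopSketch, Outcome, and the functional parameters are hypothetical names, not part of Ribbon.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class RetryLoopSketch {
    // Response of the successful call plus the number of attempts it took.
    static final class Outcome {
        final String response;
        final int attempts;

        Outcome(String response, int attempts) {
            this.response = response;
            this.attempts = attempts;
        }
    }

    // Outer loop: first server plus up to maxRetriesNextServer further servers.
    // Inner loop: first try plus up to maxRetriesSameServer retries per server.
    static Outcome execute(Supplier<String> chooseServer,
                           Function<String, String> call,
                           int maxRetriesSameServer,
                           int maxRetriesNextServer) {
        int attempts = 0;
        RuntimeException last = null;
        for (int s = 0; s <= maxRetriesNextServer; s++) {
            String server = chooseServer.get();
            for (int t = 0; t <= maxRetriesSameServer; t++) {
                attempts++;
                try {
                    return new Outcome(call.apply(server), attempts);
                } catch (RuntimeException e) {
                    last = e; // assumption: every failure is retriable
                }
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }

    public static void main(String[] args) {
        Outcome outcome = execute(() -> "10.0.0.1:8080",
                server -> "ok from " + server, 1, 1);
        System.out.println(outcome.response + " after " + outcome.attempts + " attempt(s)");
    }
}
```

Under these assumptions the worst case is (maxRetriesSameServer + 1) * (maxRetriesNextServer + 1) calls, which is worth keeping in mind when choosing timeouts.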


BlockingObservable

public final class BlockingObservable<T> {
    ......
    private T blockForSingle(final Observable<? extends T> observable) {
        final AtomicReference<T> returnItem = new AtomicReference<T>();
        final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
        // Latch that lets the calling thread wait until the Observable terminates
        final CountDownLatch latch = new CountDownLatch(1);

        @SuppressWarnings("unchecked")
        // The Subscriber (observer). Calling subscribe starts the chain built in LoadBalancerCommand.submit, beginning with the call callback in selectServer
        Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(final Throwable e) {
                returnException.set(e);
                latch.countDown();
            }

            @Override
            public void onNext(final T item) {
                returnItem.set(item);
            }
        });
        // Block until the latch count reaches 0
        BlockingUtils.awaitForComplete(latch, subscription);

        if (returnException.get() != null) {
            Exceptions.propagate(returnException.get());
        }
        // Return the final item
        return returnItem.get();
    }
}
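
The latch pattern used by blockForSingle above can be reproduced with the JDK alone. The following is a minimal sketch that assumes a callback-style source delivering exactly one value, possibly on another thread. BlockingSketch and the Consumer-based source are hypothetical and are not RxJava types.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class BlockingSketch {
    // Starts the asynchronous source, then parks the calling thread on a
    // latch until the source has delivered its single value.
    static <T> T blockForSingle(Consumer<Consumer<T>> asyncSource) {
        AtomicReference<T> result = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        asyncSource.accept(value -> {
            result.set(value);
            latch.countDown();
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up waiting.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        String response = BlockingSketch.<String>blockForSingle(callback ->
                new Thread(() -> callback.accept("response")).start());
        System.out.println(response);
    }
}
```

The calling thread is held for the whole duration of the request, which is why a Feign call behaves synchronously even though the pipeline underneath is expressed with Observables.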

RetryableFeignLoadBalancer

public class RetryableFeignLoadBalancer extends FeignLoadBalancer implements ServiceInstanceChooser {
    @Override
    public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride)
            throws IOException {
        
        ......

        return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() {
            @Override
            public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException {
                Request feignRequest = null;
                if (retryContext instanceof LoadBalancedRetryContext) {
                    ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance();
                    if (service != null) {
                        feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest();
                    }
                }
                if (feignRequest == null) {
                    feignRequest = request.toRequest();
                }
                Response response = request.client().execute(feignRequest, options);
                if (retryPolicy.retryableStatusCode(response.status())) {
                    byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream());
                    response.close();
                    throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response,
                            byteArray, request.getUri());
                }
                // Return the final result
                return new RibbonResponse(request.getUri(), response);
            }
        }, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() {
            @Override
            protected RibbonResponse createResponse(Response response, URI uri) {
                return new RibbonResponse(uri, response);
            }
        });
    }

}

With return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); the request has been executed and its final response obtained.
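The status-code check in doWithRetry can be reduced to a small loop. This is a sketch under assumptions, not the RetryTemplate mechanics: StatusRetrySketch is a hypothetical name, the request is modelled as a function returning only an HTTP status, and maxAttempts is assumed to be at least 1. As with the recovery callback in the listing, the last response is returned once the attempts are used up.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.IntSupplier;

class StatusRetrySketch {
    // Sends the request up to maxAttempts times and stops as soon as the
    // status is not in the retryable set. If every attempt returns a
    // retryable status, the last status is handed back to the caller.
    static int callWithRetry(IntSupplier request,
                             Set<Integer> retryableStatusCodes,
                             int maxAttempts) {
        int status = -1;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            status = request.getAsInt();
            if (!retryableStatusCodes.contains(status)) {
                return status;
            }
        }
        return status;
    }

    public static void main(String[] args) {
        int[] scripted = {503, 503, 200}; // statuses returned call by call
        int[] next = {0};
        int status = callWithRetry(() -> scripted[next[0]++],
                Collections.singleton(503), 3);
        System.out.println("final status: " + status);
    }
}
```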


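Over these four parts we have summarized how RestTemplate and Feign perform load balancing with Ribbon. Later articles will continue with analyses of Hystrix, Feign, Zuul, and related components.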
