Spring Cloud: OpenFeign Source Code Analysis

2021-09-12  小波同学

1. Introduction

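Feign is a lightweight framework for making HTTP calls. It lets you issue HTTP requests through annotated Java interfaces instead of assembling HTTP request messages by hand. Feign processes the annotations to turn each request into a template; at call time the actual arguments are applied to that template to produce the real request. Requests written this way are comparatively easy to read.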
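
The templating idea can be illustrated with a minimal sketch. TemplateSketch and its expand method are hypothetical names invented for this illustration, not Feign's API; Feign's own RequestTemplate also handles encoding, query parameters, and headers, which are left out here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Feign: fills {name} placeholders in a request template.
final class TemplateSketch {

    // Replaces every {key} placeholder with the matching value from the map.
    static String expand(String template, Map<String, String> variables) {
        String resolved = template;
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            resolved = resolved.replace("{" + entry.getKey() + "}", entry.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("id", "42");
        // prints "GET /orders/42"
        System.out.println(expand("GET /orders/{id}", variables));
    }
}
```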

Feign is widely used in Spring Cloud solutions and is an essential component to learn for microservice architectures built on Spring Cloud.

2. How It Works

3. Source Code Analysis

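To use OpenFeign, no OpenFeign-specific configuration is required in the YAML file. It is enough to add the @EnableFeignClients annotation to a class annotated with @Configuration or to the application's main class. For example: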

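import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableFeignClients(basePackages = {"com.yibo.order.center"})
public class OrderCenterApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderCenterApplication.class, args);
    }
}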

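The basePackages attribute names the packages that contain the interfaces annotated with @FeignClient. Alternatively, the clients attribute can list one or more @FeignClient-annotated classes directly.

basePackages is an array. If the interfaces annotated with @FeignClient are scattered across packages, list several specific package names rather than one broad package; this reduces the time spent on package scanning and avoids slowing down application startup.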

EnableFeignClients

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {

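    // Alias for the basePackages attribute; allows a more concise annotation declaration
    String[] value() default {};

    // Base packages to scan for annotated components
    String[] basePackages() default {};

    // Type-safe alternative to basePackages(): the package of each specified class is scanned
    Class<?>[] basePackageClasses() default {};

    // Default custom @Configuration for all Feign clients; may contain @Bean definitions for the parts that make up a client
    Class<?>[] defaultConfiguration() default {};

    // List of classes annotated with @FeignClient; if not empty, classpath scanning is disabled
    Class<?>[] clients() default {};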

}

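FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, an interface for registering bean definitions programmatically. During Spring Boot startup, the container calls this class's registerBeanDefinitions method to register beans dynamically. Its role is similar to that of ImportSelector.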

FeignClientsRegistrar

The @EnableFeignClients annotation uses @Import to bring in FeignClientsRegistrar. Because this class is an ImportBeanDefinitionRegistrar, the method to focus on is registerBeanDefinitions.

class FeignClientsRegistrar
        implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
        
    @Override
    public void registerBeanDefinitions(AnnotationMetadata metadata,
            BeanDefinitionRegistry registry) {
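        // Wrap the classes listed in the defaultConfiguration attribute of @EnableFeignClients
        // in a FeignClientSpecification and register it with the Spring container.
        // @FeignClient has its own configuration attribute for per-client configuration classes;
        // those are later registered as FeignClientSpecification beans as well, through
        // registerClientConfiguration. The configuration on @EnableFeignClients is therefore the
        // fallback, and the configuration on each @FeignClient is the client-specific override.
        registerDefaultConfiguration(metadata, registry);

        // Read the attributes of @EnableFeignClients to determine the packages to scan,
        // scan them for interfaces annotated with @FeignClient,
        // and register each interface found through registerFeignClient.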
        registerFeignClients(metadata, registry);
    }
}       

registerDefaultConfiguration

class FeignClientsRegistrar
        implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {

    private void registerDefaultConfiguration(AnnotationMetadata metadata,
            BeanDefinitionRegistry registry) {
        // Read the attributes of @EnableFeignClients
        Map<String, Object> defaultAttrs = metadata
                .getAnnotationAttributes(EnableFeignClients.class.getName(), true);

        if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
            String name;
            if (metadata.hasEnclosingClass()) {
                name = "default." + metadata.getEnclosingClassName();
            }
            else {
                name = "default." + metadata.getClassName();
            }
            // Register the default client configuration
            registerClientConfiguration(registry, name,
                    defaultAttrs.get("defaultConfiguration"));
        }
    }
    
    private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name,
            Object configuration) {
        // Build a bean definition for FeignClientSpecification
        BeanDefinitionBuilder builder = BeanDefinitionBuilder
                .genericBeanDefinition(FeignClientSpecification.class);
        builder.addConstructorArgValue(name);
        builder.addConstructorArgValue(configuration);
        // Register the FeignClientSpecification bean definition
        registry.registerBeanDefinition(
                name + "." + FeignClientSpecification.class.getSimpleName(),
                builder.getBeanDefinition());
    }
}
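
The bean names produced by the two methods above can be reproduced with plain string concatenation. This is a sketch only: SpecificationNames and its method names are hypothetical, and the suffix is hard-coded here instead of being taken from FeignClientSpecification.class.getSimpleName().

```java
// Hypothetical helper that mirrors the naming scheme used by registerClientConfiguration.
final class SpecificationNames {

    // Stands in for FeignClientSpecification.class.getSimpleName().
    static final String SUFFIX = "FeignClientSpecification";

    // Name used for the defaultConfiguration of @EnableFeignClients.
    static String defaultConfigurationBeanName(String annotatedClassName) {
        return "default." + annotatedClassName + "." + SUFFIX;
    }

    // Name used for the configuration attribute of a single @FeignClient.
    static String clientConfigurationBeanName(String clientName) {
        return clientName + "." + SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(defaultConfigurationBeanName("com.yibo.order.center.OrderCenterApplication"));
        System.out.println(clientConfigurationBeanName("order-center"));
    }
}
```

The "default." prefix keeps the fallback configuration apart from the per-client entries, which are keyed by the client name.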

registerFeignClients

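The key method is registerFeignClients. It reads the attributes of @EnableFeignClients to determine the packages to scan, scans all of them for interfaces annotated with @FeignClient, and passes each interface found to registerFeignClient to register it with the Spring container.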

class FeignClientsRegistrar
        implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
        
    public void registerFeignClients(AnnotationMetadata metadata,
            BeanDefinitionRegistry registry) {
        ClassPathScanningCandidateComponentProvider scanner = getScanner();
        scanner.setResourceLoader(this.resourceLoader);

        Set<String> basePackages;

        Map<String, Object> attrs = metadata
                .getAnnotationAttributes(EnableFeignClients.class.getName());
        // Filter that matches classes annotated with @FeignClient
        AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(
                FeignClient.class);
        // Read the clients attribute of @EnableFeignClients
        final Class<?>[] clients = attrs == null ? null
                : (Class<?>[]) attrs.get("clients");
        if (clients == null || clients.length == 0) {
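            // clients is not set: match candidates by the @FeignClient annotation
            scanner.addIncludeFilter(annotationTypeFilter);
            // Packages come from value/basePackages/basePackageClasses,
            // falling back to the package of the class carrying @EnableFeignClients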
            basePackages = getBasePackages(metadata);
        }
        else {
            final Set<String> clientClasses = new HashSet<>();
            basePackages = new HashSet<>();
            // clients is set: scan only the packages of the listed classes and match them by name
            for (Class<?> clazz : clients) {
                basePackages.add(ClassUtils.getPackageName(clazz));
                clientClasses.add(clazz.getCanonicalName());
            }
            AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() {
                @Override
                protected boolean match(ClassMetadata metadata) {
                    String cleaned = metadata.getClassName().replaceAll("\\$", ".");
                    return clientClasses.contains(cleaned);
                }
            };
            scanner.addIncludeFilter(
                    new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter)));
        }
        // Scan each package and register the candidates found
        for (String basePackage : basePackages) {
            Set<BeanDefinition> candidateComponents = scanner
                    .findCandidateComponents(basePackage);
            for (BeanDefinition candidateComponent : candidateComponents) {
                if (candidateComponent instanceof AnnotatedBeanDefinition) {
                    // verify annotated class is an interface
                    AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                    AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
                    Assert.isTrue(annotationMetadata.isInterface(),
                            "@FeignClient can only be specified on an interface");

                    Map<String, Object> attributes = annotationMetadata
                            .getAnnotationAttributes(
                                    FeignClient.class.getCanonicalName());

                    String name = getClientName(attributes);
                    // Register the configuration of the target client as a bean named
                    // <service name>.FeignClientSpecification,
                    // e.g. order-center.FeignClientSpecification
                    registerClientConfiguration(registry, name,
                            attributes.get("configuration"));
                    // Register the Feign client itself
                    registerFeignClient(registry, annotationMetadata, attributes);
                }
            }
        }
    }
}
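
The replaceAll("\\$", ".") call in the clients branch exists because class metadata reports nested types by binary name (Outer$Inner), while the set of expected names is filled with canonical names (Outer.Inner). The following sketch shows the effect with plain JDK reflection; ClientFilterSketch, OrderApis, and OrderClient are hypothetical types made up for the illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the name matching performed by the custom type filter.
final class ClientFilterSketch {

    // Hypothetical holder class with a nested client interface.
    static class OrderApis {
        interface OrderClient {
        }
    }

    // Same idea as the filter's match method: normalize '$' to '.' before the lookup.
    static boolean matches(String scannedClassName, Set<String> clientClasses) {
        String cleaned = scannedClassName.replaceAll("\\$", ".");
        return clientClasses.contains(cleaned);
    }

    public static void main(String[] args) {
        Set<String> clientClasses = new HashSet<>();
        clientClasses.add(OrderApis.OrderClient.class.getCanonicalName());

        String binaryName = OrderApis.OrderClient.class.getName();
        System.out.println(binaryName);                         // binary name, contains '$'
        System.out.println(clientClasses.contains(binaryName)); // raw lookup fails
        System.out.println(matches(binaryName, clientClasses)); // normalized lookup succeeds
    }
}
```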

registerFeignClient

class FeignClientsRegistrar
        implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
        
    private void registerFeignClient(BeanDefinitionRegistry registry,
            AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        // Create a BeanDefinitionBuilder for FeignClientFactoryBean
        BeanDefinitionBuilder definition = BeanDefinitionBuilder
                .genericBeanDefinition(FeignClientFactoryBean.class);
        validate(attributes);
        // Copy the annotation attributes onto FeignClientFactoryBean
        definition.addPropertyValue("url", getUrl(attributes));
        definition.addPropertyValue("path", getPath(attributes));
        String name = getName(attributes);
        definition.addPropertyValue("name", name);
        String contextId = getContextId(attributes);
        definition.addPropertyValue("contextId", contextId);
        definition.addPropertyValue("type", className);
        definition.addPropertyValue("decode404", attributes.get("decode404"));
        definition.addPropertyValue("fallback", attributes.get("fallback"));
        definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory"));
        // Set the autowire mode: autowire by type
        definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

        String alias = contextId + "FeignClient";
        AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
        beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);

        // has a default, won't be null
        boolean primary = (Boolean) attributes.get("primary");

        beanDefinition.setPrimary(primary);

        String qualifier = getQualifier(attributes);
        if (StringUtils.hasText(qualifier)) {
            alias = qualifier;
        }
        // Wrap the BeanDefinition in a BeanDefinitionHolder for registration
        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                new String[] { alias });
        // Register the BeanDefinition
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }
}   

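Registration is now complete. Note that at this point the definitions have only been added to the beanDefinitionMap of the DefaultListableBeanFactory container; nothing has been instantiated yet.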
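
The difference between registering a definition and instantiating a bean can be shown with a small stand-in for the container. LazyRegistrySketch is a hypothetical class, far simpler than DefaultListableBeanFactory: a definition is reduced to a Supplier, and the instance is created on the first lookup only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical miniature container: definitions are stored first, instances are created on demand.
final class LazyRegistrySketch {

    private final Map<String, Supplier<Object>> definitions = new LinkedHashMap<>();
    private final Map<String, Object> singletons = new LinkedHashMap<>();
    int instantiations = 0;

    // Registration only records the recipe; nothing is created here.
    void registerDefinition(String name, Supplier<Object> recipe) {
        definitions.put(name, recipe);
    }

    // The first lookup runs the recipe; later lookups reuse the cached instance.
    Object getBean(String name) {
        return singletons.computeIfAbsent(name, key -> {
            Supplier<Object> recipe = definitions.get(key);
            if (recipe == null) {
                throw new IllegalArgumentException("No definition named " + key);
            }
            instantiations++;
            return recipe.get();
        });
    }

    public static void main(String[] args) {
        LazyRegistrySketch registry = new LazyRegistrySketch();
        registry.registerDefinition("orderClient", Object::new);
        System.out.println(registry.instantiations); // still 0 after registration
        registry.getBean("orderClient");
        registry.getBean("orderClient");
        System.out.println(registry.instantiations); // 1 after two lookups
    }
}
```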

Instantiation

getObjectForBeanInstance

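Inside doGetBean, getObjectForBeanInstance is called to obtain the object for the given beanName. The method exists mainly to give FactoryBeans special treatment; for an ordinary bean it simply returns the sharedInstance itself.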

public abstract class AbstractBeanFactory extends FactoryBeanRegistrySupport implements ConfigurableBeanFactory {

    protected Object getObjectForBeanInstance(
            Object beanInstance, String name, String beanName, @Nullable RootBeanDefinition mbd) {

        // Don't let calling code try to dereference the factory if the bean isn't a factory.
        if (BeanFactoryUtils.isFactoryDereference(name)) {
            if (beanInstance instanceof NullBean) {
                return beanInstance;
            }
            // 1. name starts with the & prefix but beanInstance is not a FactoryBean: throw an exception
            if (!(beanInstance instanceof FactoryBean)) {
                throw new BeanIsNotAFactoryException(beanName, beanInstance.getClass());
            }
            if (mbd != null) {
                mbd.isFactoryBean = true;
            }
            return beanInstance;
        }

        // Now we have the bean instance, which may be a normal bean or a FactoryBean.
        // If it's a FactoryBean, we use it to create a bean instance, unless the
        // caller actually wants a reference to the factory.
        // 2. beanInstance is not a FactoryBean (a plain bean): return it directly
        if (!(beanInstance instanceof FactoryBean)) {
            return beanInstance;
        }
        // 3. beanInstance is a FactoryBean and name has no & prefix,
        //    so the caller wants the object created by the FactoryBean
        Object object = null;
        if (mbd != null) {
            mbd.isFactoryBean = true;
        }
        else {
            // 4. mbd is null: try factoryBeanObjectCache for an object already created by this FactoryBean
            object = getCachedObjectForFactoryBean(beanName);
        }
        if (object == null) {
            // Return bean instance from factory.
            // 5. Only a FactoryBean can reach this point, so the cast is safe
            FactoryBean<?> factory = (FactoryBean<?>) beanInstance;
            // Caches object obtained from FactoryBean if it is a singleton.
            if (mbd == null && containsBeanDefinition(beanName)) {
                // 6. mbd is null but a BeanDefinition exists for this bean: fetch its merged bean definition
                mbd = getMergedLocalBeanDefinition(beanName);
            }
            // 7. Whether mbd is synthetic (normally false for application-defined beans)
            boolean synthetic = (mbd != null && mbd.isSynthetic());
            // 8. Obtain the object from the FactoryBean
            object = getObjectFromFactoryBean(factory, beanName, !synthetic);
        }
        // 9. Return the object
        return object;
    }
}
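
The branching above boils down to one rule: a name with the & prefix yields the factory itself, a plain name yields whatever the factory produces. A minimal sketch of that rule follows; FactoryDereferenceSketch and SimpleFactory are hypothetical stand-ins for the Spring types, and caching, merged bean definitions, and NullBean handling are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of the FactoryBean dereference rule.
final class FactoryDereferenceSketch {

    static final String FACTORY_PREFIX = "&";

    // Hypothetical stand-in for Spring's FactoryBean.
    interface SimpleFactory {
        Object getObject();
    }

    private final Map<String, Object> instances = new HashMap<>();

    void put(String beanName, Object instance) {
        instances.put(beanName, instance);
    }

    Object get(String name) {
        boolean wantsFactory = name.startsWith(FACTORY_PREFIX);
        String beanName = wantsFactory ? name.substring(FACTORY_PREFIX.length()) : name;
        Object instance = instances.get(beanName);
        if (wantsFactory) {
            // "&name" on something that is not a factory is an error.
            if (!(instance instanceof SimpleFactory)) {
                throw new IllegalStateException(beanName + " is not a factory");
            }
            return instance;
        }
        // Plain beans are returned as they are.
        if (!(instance instanceof SimpleFactory)) {
            return instance;
        }
        // A factory looked up by its plain name yields its product.
        return ((SimpleFactory) instance).getObject();
    }

    public static void main(String[] args) {
        FactoryDereferenceSketch container = new FactoryDereferenceSketch();
        SimpleFactory factory = () -> "proxy for OrderClient";
        container.put("orderClient", factory);
        System.out.println(container.get("orderClient"));             // the product
        System.out.println(container.get("&orderClient") == factory); // the factory itself
    }
}
```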

getObjectFromFactoryBean

Obtaining the object from the FactoryBean
getObjectForBeanInstance calls getObjectFromFactoryBean, which in turn calls the FactoryBean's getObject method to obtain the object.

public abstract class FactoryBeanRegistrySupport extends DefaultSingletonBeanRegistry {

    protected Object getObjectFromFactoryBean(FactoryBean<?> factory, String beanName, boolean shouldPostProcess) {
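        // 1. The FactoryBean is a singleton and is already present in the singleton cache
        if (factory.isSingleton() && containsSingleton(beanName)) {
            // Check the cache again under the lock, in case another thread has already created the object
            synchronized (getSingletonMutex()) {
                // 2. Look up the object in the cache of singletons created by FactoryBeans
                Object object = this.factoryBeanObjectCache.get(beanName);
                if (object == null) {
                    // 3. Call the FactoryBean's getObject method to obtain the object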
                    object = doGetObjectFromFactoryBean(factory, beanName);
                    // Only post-process and store if not put there already during getObject() call above
                    // (e.g. because of circular reference processing triggered by custom getBean calls)
                    Object alreadyThere = this.factoryBeanObjectCache.get(beanName);
                    // 4. If an object for this beanName is already cached, use the cached one
                    if (alreadyThere != null) {
                        object = alreadyThere;
                    }
                    else {
                        if (shouldPostProcess) {
                            if (isSingletonCurrentlyInCreation(beanName)) {
                                // Temporarily return non-post-processed object, not storing it yet..
                                return object;
                            }
                            beforeSingletonCreation(beanName);
                            try {
                                // 5. Post-process the object: run postProcessAfterInitialization of every registered BeanPostProcessor;
                                //    this is where frameworks can wrap the object, for example with AOP proxies
                                object = postProcessObjectFromFactoryBean(object, beanName);
                            }
                            catch (Throwable ex) {
                                throw new BeanCreationException(beanName,
                                        "Post-processing of FactoryBean's singleton object failed", ex);
                            }
                            finally {
                                // Creation finished: remove the name from the in-creation set
                                afterSingletonCreation(beanName);
                            }
                        }
                        if (containsSingleton(beanName)) {
                            // 6. Put beanName and object into factoryBeanObjectCache
                            this.factoryBeanObjectCache.put(beanName, object);
                        }
                    }
                }
                // 7. Return the object
                return object;
            }
        }
        else {
            // 8. Call the FactoryBean's getObject method to obtain the object
            Object object = doGetObjectFromFactoryBean(factory, beanName);
            if (shouldPostProcess) {
                try {
                    // 9. Post-process the object: run postProcessAfterInitialization of every registered BeanPostProcessor
                    object = postProcessObjectFromFactoryBean(object, beanName);
                }
                catch (Throwable ex) {
                    throw new BeanCreationException(beanName, "Post-processing of FactoryBean's object failed", ex);
                }
            }
            // 10. Return the object
            return object;
        }
    }
}

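Main steps: if the FactoryBean is a singleton that is already registered, the product is looked up in factoryBeanObjectCache under the singleton lock; on a miss it is created through doGetObjectFromFactoryBean, post-processed, and cached. Otherwise the product is created on every call, post-processed, and returned without caching.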

doGetObjectFromFactoryBean

Calls the FactoryBean's getObject method to obtain the object.

public abstract class FactoryBeanRegistrySupport extends DefaultSingletonBeanRegistry {

    private Object doGetObjectFromFactoryBean(final FactoryBean<?> factory, final String beanName)
            throws BeanCreationException {

        Object object;
        try {
            // 1. Call the FactoryBean's getObject method to obtain the bean instance
            if (System.getSecurityManager() != null) {
                AccessControlContext acc = getAccessControlContext();
                try {
                    // 1.1 With a SecurityManager: run inside a privileged action
                    object = AccessController.doPrivileged((PrivilegedExceptionAction<Object>) factory::getObject, acc);
                }
                catch (PrivilegedActionException pae) {
                    throw pae.getException();
                }
            }
            else {
                // 1.2 Without a SecurityManager: call directly
                object = factory.getObject();
            }
        }
        catch (FactoryBeanNotInitializedException ex) {
            throw new BeanCurrentlyInCreationException(beanName, ex.toString());
        }
        catch (Throwable ex) {
            throw new BeanCreationException(beanName, "FactoryBean threw exception on object creation", ex);
        }

        // Do not accept a null value for a FactoryBean that's not fully
        // initialized yet: Many FactoryBeans just return null then.
        // 2. getObject returned null while the FactoryBean is still being created: throw,
        //    because a null from a FactoryBean that is not fully initialized is not accepted
        if (object == null) {
            if (isSingletonCurrentlyInCreation(beanName)) {
                throw new BeanCurrentlyInCreationException(
                        beanName, "FactoryBean which is currently in creation returned null from getObject");
            }
            object = new NullBean();
        }
        // 3. Return the created bean instance
        return object;
    }
}

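This is a very simple method: it calls the FactoryBean's getObject method directly to obtain the object.

Attentive readers will notice that the method name starts with do. In Spring, methods prefixed with do are the ones that perform the actual work; this one is where the FactoryBean finally creates the bean instance.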

factory.getObject()

This call reaches the getObject method of FeignClientFactoryBean.

class FeignClientFactoryBean
        implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
        
    @Override
    public Object getObject() throws Exception {
        return getTarget();
    }
}

getObject delegates to getTarget, which fetches the FeignContext from the applicationContext. FeignContext extends NamedContextFactory and maintains a separate, isolated context for each Feign client.

FeignContext is registered in the container by FeignAutoConfiguration.

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Feign.class)
@EnableConfigurationProperties({ FeignClientProperties.class,
        FeignHttpClientProperties.class })
@Import(DefaultGzipDecoderConfiguration.class)
public class FeignAutoConfiguration {

    @Autowired(required = false)
    private List<FeignClientSpecification> configurations = new ArrayList<>();
    
    @Bean
    public FeignContext feignContext() {
        FeignContext context = new FeignContext();
        context.setConfigurations(this.configurations);
        return context;
    }
}   

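When the FeignContext is initialized, the FeignClientSpecification beans collected from the container (configurations) are handed to it. They come from the registerClientConfiguration calls seen earlier: the defaultConfiguration of @EnableFeignClients and the configuration attribute of each @FeignClient.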
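
The isolation that FeignContext provides can be pictured as one small container per client name with a shared parent behind it. The sketch below reduces each container to a map; NamedContextSketch and its methods are hypothetical and only model the lookup order (client-specific first, shared second), not how NamedContextFactory actually builds its child contexts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-client contexts with a shared parent.
final class NamedContextSketch {

    private final Map<String, Object> parent = new HashMap<>();
    private final Map<String, Map<String, Object>> children = new HashMap<>();

    // Components visible to every client.
    void registerShared(String key, Object value) {
        parent.put(key, value);
    }

    // Components visible to a single client only.
    void registerFor(String clientName, String key, Object value) {
        children.computeIfAbsent(clientName, name -> new HashMap<>()).put(key, value);
    }

    // Looks in the client's own context first and falls back to the parent.
    Object getInstance(String clientName, String key) {
        Map<String, Object> child = children.get(clientName);
        if (child != null && child.containsKey(key)) {
            return child.get(key);
        }
        return parent.get(key);
    }

    public static void main(String[] args) {
        NamedContextSketch context = new NamedContextSketch();
        context.registerShared("decoder", "default decoder");
        context.registerFor("order-center", "decoder", "custom decoder");
        System.out.println(context.getInstance("order-center", "decoder")); // custom decoder
        System.out.println(context.getInstance("stock-center", "decoder")); // default decoder
    }
}
```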

FeignClientFactoryBean#getTarget

class FeignClientFactoryBean
        implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
        
    <T> T getTarget() {
        // Fetch the Feign context object (FeignContext) from the application context
        FeignContext context = applicationContext.getBean(FeignContext.class);
        // Create the Builder used to build the Feign instance
        Feign.Builder builder = feign(context);
        
        // url is empty: go through load balancing and create a load-balanced proxy
        if (!StringUtils.hasText(url)) {
            if (!name.startsWith("http")) {
                url = "http://" + name;
            }
            else {
                url = name;
            }
            url += cleanPath();
            // @FeignClient has no url attribute: return a proxy with load balancing
            return (T) loadBalance(builder, context,
                    new HardCodedTarget<>(type, name, url));
        }
        // url is specified: create the default proxy
        if (StringUtils.hasText(url) && !url.startsWith("http")) {
            url = "http://" + url;
        }
        String url = this.url + cleanPath();
        Client client = getOptional(context, Client.class);
        if (client != null) {
            if (client instanceof LoadBalancerFeignClient) {
                // not load balancing because we have a url,
                // but ribbon is on the classpath, so unwrap
                client = ((LoadBalancerFeignClient) client).getDelegate();
            }
            if (client instanceof FeignBlockingLoadBalancerClient) {
                // not load balancing because we have a url,
                // but Spring Cloud LoadBalancer is on the classpath, so unwrap
                client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
            }
            builder.client(client);
        }
        Targeter targeter = get(context, Targeter.class);
        
        // Create the default proxy
        return (T) targeter.target(this, builder, context,
                new HardCodedTarget<>(type, name, url));
    }
}
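
The URL handling in getTarget follows a simple decision: without a url the service name becomes the host and the call is load balanced, with a url the address is used directly. A sketch of that decision as pure functions follows. TargetUrlSketch is a hypothetical class, and the path argument is assumed to be normalized already, since cleanPath() is not reproduced here.

```java
// Hypothetical helper that mirrors the URL decision made in getTarget.
final class TargetUrlSketch {

    // True when no explicit url is configured, i.e. the load-balanced branch is taken.
    static boolean isLoadBalanced(String url) {
        return url == null || url.trim().isEmpty();
    }

    // Builds the target URL; path is assumed to be normalized already.
    static String resolve(String url, String name, String path) {
        if (isLoadBalanced(url)) {
            // The service name acts as the host and is resolved later by the load balancer.
            String base = name.startsWith("http") ? name : "http://" + name;
            return base + path;
        }
        // An explicit address is used as is, with a scheme added when missing.
        String base = url.startsWith("http") ? url : "http://" + url;
        return base + path;
    }

    public static void main(String[] args) {
        System.out.println(resolve("", "order-center", "/api"));               // http://order-center/api
        System.out.println(resolve("localhost:8080", "order-center", "/api")); // http://localhost:8080/api
    }
}
```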

feign(context)

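This builds the Feign.Builder. While building, it asks the FeignContext for the configured Encoder, Decoder, and other components. As mentioned above, FeignContext gives each Feign client its own container, whose parent is the Spring container.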

class FeignClientFactoryBean
        implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
        
    protected Feign.Builder feign(FeignContext context) {
        FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class);
        Logger logger = loggerFactory.create(type);

        // @formatter:off
        Feign.Builder builder = get(context, Feign.Builder.class)
                // required values
                .logger(logger) // logger
                .encoder(get(context, Encoder.class))   // encoder
                .decoder(get(context, Decoder.class))   // decoder
                .contract(get(context, Contract.class));// contract (annotation parsing rules)
        // @formatter:on
        // Apply settings such as the connect timeout and read timeout
        configureFeign(context, builder);

        return builder;
    }
}

FeignClientFactoryBean#loadBalance

class FeignClientFactoryBean
        implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {
        
    protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
            HardCodedTarget<T> target) {
        Client client = getOptional(context, Client.class);
        if (client != null) {
            builder.client(client);
            Targeter targeter = get(context, Targeter.class);
            return targeter.target(this, builder, context, target);
        }

        throw new IllegalStateException(
                "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?");
    }
}   

FeignClientFactoryBean#getOptional(context, Client.class)

Client client = getOptional(context, Client.class); fetches a Client from the context. With Ribbon in use, the default is LoadBalancerFeignClient, which is contributed by the FeignRibbonClientAutoConfiguration auto-configuration class through @Import:

@Import({ HttpClientFeignLoadBalancedConfiguration.class,
        OkHttpFeignLoadBalancedConfiguration.class,
        DefaultFeignLoadBalancedConfiguration.class })
public class FeignRibbonClientAutoConfiguration {
    //......
}

DefaultFeignLoadBalancedConfiguration

Here DefaultFeignLoadBalancedConfiguration supplies the Client implementation:

@Configuration(proxyBeanMethods = false)
class DefaultFeignLoadBalancedConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
            SpringClientFactory clientFactory) {
        return new LoadBalancerFeignClient(new Client.Default(null, null), cachingFactory,
                clientFactory);
    }

}

targeter.target

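Next comes targeter.target(this, builder, context, target), which takes the objects built so far and creates the proxy instance. There are two implementations, HystrixTargeter and DefaultTargeter. Hystrix is not configured in this walkthrough, so DefaultTargeter is used.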

class DefaultTargeter implements Targeter {

    @Override
    public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
            FeignContext context, Target.HardCodedTarget<T> target) {
        return feign.target(target);
    }

}

feign.target

Execution then arrives at feign.Feign.Builder#target(feign.Target<T>).

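public abstract class Feign {

    // Note: target() and build() below are members of the nested Feign.Builder class;
    // the Builder declaration is omitted in this excerpt.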
    public <T> T target(Target<T> target) {
        return build().newInstance(target);
    }
    
    public Feign build() {
        Client client = Capability.enrich(this.client, capabilities);
        Retryer retryer = Capability.enrich(this.retryer, capabilities);
        List<RequestInterceptor> requestInterceptors = this.requestInterceptors.stream()
                .map(ri -> Capability.enrich(ri, capabilities))
                .collect(Collectors.toList());
        Logger logger = Capability.enrich(this.logger, capabilities);
        Contract contract = Capability.enrich(this.contract, capabilities);
        Options options = Capability.enrich(this.options, capabilities);
        Encoder encoder = Capability.enrich(this.encoder, capabilities);
        Decoder decoder = Capability.enrich(this.decoder, capabilities);
        InvocationHandlerFactory invocationHandlerFactory =
                Capability.enrich(this.invocationHandlerFactory, capabilities);
        QueryMapEncoder queryMapEncoder = Capability.enrich(this.queryMapEncoder, capabilities);

        SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
                new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
                logLevel, decode404, closeAfterDecode, propagationPolicy, forceDecoding);
        ParseHandlersByName handlersByName =
                new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
                errorDecoder, synchronousMethodHandlerFactory);
        return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
    }
}

ReflectiveFeign.newInstance

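Finally, ReflectiveFeign.newInstance is called.

This method creates the dynamic proxy. Before the proxy is generated, the Contract (the rules for interpreting the interface's annotations) is used to parse the annotations on the interface into internal MethodHandler instances.

The implementation uses the familiar Proxy.newProxyInstance to produce the proxy. Because each method declared on the interface needs its own handling, the concept of a MethodHandler appears here: it is the method-level counterpart of an InvocationHandler.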

public class ReflectiveFeign extends Feign {

    @Override
    public <T> T newInstance(Target<T> target) {
        // Parse the annotations on the interface:
        // using the Contract, turn the interface's methods and annotations into internal MethodHandler instances
        Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
        Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
        List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
        
        // Choose a handler according to the kind of method
        for (Method method : target.type().getMethods()) {
            if (method.getDeclaringClass() == Object.class) {
                continue;
            } else if (Util.isDefault(method)) {
                DefaultMethodHandler handler = new DefaultMethodHandler(method);
                defaultMethodHandlers.add(handler);
                methodToHandler.put(method, handler);
            } else {
                methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
            }
        }
        InvocationHandler handler = factory.create(target, methodToHandler);
        
        // Create a dynamic implementation of the interface with Proxy.newProxyInstance; every call is forwarded to the InvocationHandler.
        T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
                new Class<?>[] {target.type()}, handler);

        for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
            defaultMethodHandler.bindTo(proxy);
        }
        return proxy;
    }
}
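
The pattern used by newInstance, one InvocationHandler that dispatches to a handler per method, can be tried out with the JDK alone. In the sketch below ProxyDispatchSketch, MethodHandler, and OrderClient are hypothetical names; handlers are keyed by method name for brevity, whereas Feign uses a config key that also covers parameter types, so overloaded methods would collide in this simplified version.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical miniature of a proxy that dispatches each interface method to its own handler.
final class ProxyDispatchSketch {

    // Hypothetical counterpart of a per-method handler.
    interface MethodHandler {
        Object invoke(Object[] args) throws Throwable;
    }

    // Hypothetical client interface used only for this illustration.
    interface OrderClient {
        String findOrder(String id);

        String ping();
    }

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, Map<String, MethodHandler> nameToHandler) {
        // Build the Method -> handler dispatch table once, up front.
        Map<Method, MethodHandler> dispatch = new LinkedHashMap<>();
        for (Method method : type.getMethods()) {
            MethodHandler methodHandler = nameToHandler.get(method.getName());
            if (methodHandler == null) {
                throw new IllegalStateException("No handler for " + method.getName());
            }
            dispatch.put(method, methodHandler);
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // equals, hashCode and toString are answered by the handler itself.
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return type.getSimpleName() + "Proxy";
                }
            }
            return dispatch.get(method).invoke(args == null ? new Object[0] : args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        Map<String, MethodHandler> handlers = new HashMap<>();
        handlers.put("findOrder", a -> "order-" + a[0]);
        handlers.put("ping", a -> "pong");
        OrderClient client = newInstance(OrderClient.class, handlers);
        System.out.println(client.findOrder("7")); // order-7
        System.out.println(client.ping());         // pong
    }
}
```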

targetToHandlersByName.apply

targetToHandlersByName.apply(target) parses the annotations on the interface's methods according to the Contract, extracts the method-level configuration, and produces a SynchronousMethodHandler for each method. The resulting <method, MethodHandler> map is then stored in FeignInvocationHandler, the InvocationHandler implementation.

public class ReflectiveFeign extends Feign {

    static final class ParseHandlersByName {
        public Map<String, MethodHandler> apply(Target target) {
            List<MethodMetadata> metadata = contract.parseAndValidateMetadata(target.type());
            Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>();
            for (MethodMetadata md : metadata) {
                BuildTemplateByResolvingArgs buildTemplate;
                if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null) {
                    buildTemplate =
                            new BuildFormEncodedTemplateFromArgs(md, encoder, queryMapEncoder, target);
                } else if (md.bodyIndex() != null) {
                    buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder, queryMapEncoder, target);
                } else {
                    buildTemplate = new BuildTemplateByResolvingArgs(md, queryMapEncoder, target);
                }
                if (md.isIgnored()) {
                    result.put(md.configKey(), args -> {
                        throw new IllegalStateException(md.configKey() + " is not a method handled by feign");
                    });
                } else {
                    result.put(md.configKey(),
                        factory.create(target, md, buildTemplate, options, decoder, errorDecoder));
                }
            }
            return result;
        }
    }
}  

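SpringMvcContract: to lower the learning curve, the Spring Cloud microservice stack reuses a subset of the Spring MVC annotations to describe the request protocol. Writing a client interface therefore looks just like writing server-side code. Client and server can agree on a contract through an SDK: the client only needs to import the SDK API published by the server to call the service in an interface-oriented style.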
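
What a contract does can be sketched with a home-made annotation and plain reflection. Everything in the block below is hypothetical: @Get stands in for the Spring MVC mapping annotations, and the metadata is reduced to a string keyed by "Interface#method". SpringMvcContract itself produces MethodMetadata objects and supports far more annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical miniature contract: reads an annotation from each method and records what it finds.
final class ContractSketch {

    // Hypothetical annotation standing in for a Spring MVC mapping annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Get {
        String value();
    }

    // Hypothetical client interface used only for this illustration.
    interface OrderClient {
        @Get("/orders/{id}")
        String findOrder(String id);

        @Get("/orders")
        String listOrders();
    }

    // Parses and validates every method of the interface, keyed by "Interface#method".
    static Map<String, String> parse(Class<?> type) {
        Map<String, String> metadata = new TreeMap<>();
        for (Method method : type.getMethods()) {
            Get mapping = method.getAnnotation(Get.class);
            if (mapping == null) {
                throw new IllegalStateException("Method " + method.getName() + " has no HTTP mapping");
            }
            metadata.put(type.getSimpleName() + "#" + method.getName(), "GET " + mapping.value());
        }
        return metadata;
    }

    public static void main(String[] args) {
        parse(OrderClient.class).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```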

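OpenFeign invocation flow:

From the analysis above, what OpenFeign finally hands back is a JDK dynamic proxy backed by a ReflectiveFeign.FeignInvocationHandler. When the client issues a call, it enters FeignInvocationHandler.invoke, the standard entry point of a dynamic proxy.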

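public class ReflectiveFeign extends Feign {

    // Note: invoke() below belongs to the nested FeignInvocationHandler class;
    // its declaration is omitted in this excerpt.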
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if ("equals".equals(method.getName())) {
            try {
                Object otherHandler =
                        args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                return equals(otherHandler);
            } catch (IllegalArgumentException e) {
                return false;
            }
        } else if ("hashCode".equals(method.getName())) {
            return hashCode();
        } else if ("toString".equals(method.getName())) {
            return toString();
        }
        // Use the dispatch map to find the handler for this method and delegate to it
        return dispatch.get(method).invoke(args);
    }
}  

SynchronousMethodHandler#invoke

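Inside invoke, dispatch.get(method).invoke(args) is called. dispatch.get(method) returns a SynchronousMethodHandler, which intercepts the call. Its invoke method builds a complete RequestTemplate from the arguments; this object is the template for the HTTP request. The code is as follows.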

final class SynchronousMethodHandler implements MethodHandler {

    @Override
    public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Options options = findOptions(argv);
        Retryer retryer = this.retryer.clone();
        while (true) {
            try {
                return executeAndDecode(template, options);
            } catch (RetryableException e) {
                try {
                    retryer.continueOrPropagate(e);
                } catch (RetryableException th) {
                    Throwable cause = th.getCause();
                    if (propagationPolicy == UNWRAP && cause != null) {
                        throw cause;
                    } else {
                        throw th;
                    }
                }
                if (logLevel != Logger.Level.NONE) {
                    logger.logRetry(metadata.configKey(), logLevel);
                }
                continue;
            }
        }
    }
}
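The retry loop in invoke can be sketched in isolation as follows. This is a simplified illustration under stated assumptions: RetryableFailure and AttemptBudget are hypothetical stand-ins for Feign's RetryableException and Retryer, and any back-off or sleeping between attempts is left out.

```java
import java.util.function.Supplier;

final class RetryLoopSketch {

    /** Hypothetical exception marking a failure that may be retried. */
    static final class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    /** Hypothetical retry budget: rethrows the failure once maxAttempts is used up. */
    static final class AttemptBudget {
        private final int maxAttempts;
        private int attempt = 1;

        AttemptBudget(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        void continueOrPropagate(RetryableFailure e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }
        }
    }

    /** Calls the supplier until it succeeds or the budget propagates the failure. */
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts) {
        // A fresh budget per invocation, mirroring the retryer.clone() in the listing above.
        AttemptBudget budget = new AttemptBudget(maxAttempts);
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                // Either returns normally (loop again) or rethrows (give up).
                budget.continueOrPropagate(e);
            }
        }
    }
}
```

Creating the budget inside callWithRetry keeps the attempt counter local to one call, so concurrent invocations do not share retry state.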

SynchronousMethodHandler#executeAndDecode

With the code above, the RequestTemplate has been assembled. That code also calls executeAndDecode(), which turns the RequestTemplate into a Request object, sends it through the HTTP client, and obtains the response.

final class SynchronousMethodHandler implements MethodHandler {

    Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
        // Convert the template into an HTTP request
        Request request = targetRequest(template);

        if (logLevel != Logger.Level.NONE) {
            logger.logRequest(metadata.configKey(), logLevel, request);
        }

        Response response;
        long start = System.nanoTime();
        try {
            // Make the remote call
            response = client.execute(request, options);
            
            // Attach the request and its template to the response
            response = response.toBuilder()
                    .request(request)
                    .requestTemplate(template)
                    .build();
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
            }
            throw errorExecuting(request, e);
        }
        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);


        if (decoder != null)
            return decoder.decode(response, metadata.returnType());

        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
        asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
                    metadata.returnType(),elapsedTime);

        try {
            if (!resultFuture.isDone())
                throw new IllegalStateException("Response handling not done");

            return resultFuture.join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            if (cause != null)
                throw cause;
            throw e;
        }
    }
}

As analyzed above, the client in client.execute here is of type LoadBalancerFeignClient.


LoadBalancerFeignClient#execute

So the call naturally enters LoadBalancerFeignClient#execute:

public class LoadBalancerFeignClient implements Client {

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            URI asUri = URI.create(request.url());
            String clientName = asUri.getHost();
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);

            IClientConfig requestConfig = getClientConfig(options, clientName);
            return lbClient(clientName)
                    .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }
}

The flow inside execute is essentially the Ribbon flow. Briefly: build the URI, build the RibbonRequest, choose the load balancer, then make the call.
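The URI handling at the start of execute can be sketched with java.net.URI alone. This is an illustration rather than the library's cleanUrl implementation: ServiceUrlSketch and its method names are hypothetical, and the URL http://order-service/orders/1 is an assumed example.

```java
import java.net.URI;
import java.util.regex.Pattern;

final class ServiceUrlSketch {

    /** Reads the logical service name from the host part of a Feign-style URL. */
    static String clientName(String url) {
        return URI.create(url).getHost();
    }

    /** Removes the first occurrence of the service name, leaving scheme, path and query. */
    static URI withoutHost(String url, String clientName) {
        // Pattern.quote keeps dots or other regex characters in the name literal.
        return URI.create(url.replaceFirst(Pattern.quote(clientName), ""));
    }

    public static void main(String[] args) {
        String url = "http://order-service/orders/1";
        String name = clientName(url);
        System.out.println(name + " -> " + withoutHost(url, name));
    }
}
```

The host-less URI is what later lets the load balancer treat the request as a partial URI and fill in a concrete server.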

Let's look at what lbClient does when it selects the load balancer:

public class LoadBalancerFeignClient implements Client {

    private FeignLoadBalancer lbClient(String clientName) {
        return this.lbClientFactory.create(clientName);
    }
}

// lbClientFactory is a CachingSpringLoadBalancerFactory; create is defined there
public class CachingSpringLoadBalancerFactory {

    public FeignLoadBalancer create(String clientName) {
        // Reuse the load balancer already built for this client name, if any
        FeignLoadBalancer client = this.cache.get(clientName);
        if (client != null) {
            return client;
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
                ServerIntrospector.class);
        client = this.loadBalancedRetryFactory != null
                ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
                        this.loadBalancedRetryFactory)
                : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }
}

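The conclusion is that this.factory.getLoadBalancer(clientName) obtains the load balancer the same way the Ribbon source does, so by default what comes back is a ZoneAwareLoadBalancer. It is then wrapped in a FeignLoadBalancer and returned.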
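The per-client caching in create can be sketched generically. This is a swapped-in variant, not the library code: the listing above uses an explicit get followed by put, whereas this sketch uses ConcurrentHashMap.computeIfAbsent, and CachingFactorySketch is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class CachingFactorySketch<T> {

    private final Map<String, T> cache = new ConcurrentHashMap<>();
    private final Function<String, T> creator;

    CachingFactorySketch(Function<String, T> creator) {
        this.creator = creator;
    }

    /** Returns the cached instance for the client name, creating it on first use. */
    T create(String clientName) {
        return cache.computeIfAbsent(clientName, creator);
    }
}
```

With this shape, each service name gets exactly one balancer-like object for the lifetime of the factory, which is the effect the cache in create is after.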

Now that the load balancer has been chosen, there must be a place where it is used to pick a server. Read on:

public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
    }
}

The code above executes the request through the load balancer, but at this point no server has been chosen yet. Let's step into submit to see what happens:

public class LoadBalancerCommand<T> {

    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    // omitted ...
                });

        // omitted ...
    }
}

Here there is a selectServer method. Stepping in:

public class LoadBalancerCommand<T> {

    private final LoadBalancerContext loadBalancerContext;

    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
}


public class LoadBalancerContext implements IClientConfigAware {

    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();
                if (host == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
            } else {
                // No Full URL - and we dont have a LoadBalancer registered to
                // obtain a server
                // if we have a vipAddress that came with the registration, we
                // can use that else we
                // bail out
                if (vipAddresses != null && vipAddresses.contains(",")) {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            "Method is invoked for client " + clientName + " with partial URI of ("
                            + original
                            + ") with no load balancer configured."
                            + " Also, there are multiple vipAddresses and hence no vip address can be chosen"
                            + " to complete this partial uri");
                } else if (vipAddresses != null) {
                    try {
                        Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
                        host = hostAndPort.first();
                        port = hostAndPort.second();
                    } catch (URISyntaxException e) {
                        throw new ClientException(
                                ClientException.ErrorType.GENERAL,
                                "Method is invoked for client " + clientName + " with partial URI of ("
                                + original
                                + ") with no load balancer configured. "
                                + " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
                    }
                } else {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            this.clientName
                            + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
                            + " Also has no vipAddress registered");
                }
            }
        } else {
            // Full URL Case
            // This could either be a vipAddress or a hostAndPort or a real DNS
            // if vipAddress or hostAndPort, we just have to consult the loadbalancer
            // but if it does not return a server, we should just proceed anyways
            // and assume its a DNS
            // For restClients registered using a vipAddress AND executing a request
            // by passing in the full URL (including host and port), we should only
            // consult lb IFF the URL passed is registered as vipAddress in Discovery
            boolean shouldInterpretAsVip = false;

            if (lb != null) {
                shouldInterpretAsVip = isVipRecognized(original.getAuthority());
            }
            if (shouldInterpretAsVip) {
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc != null){
                    host = svc.getHost();
                    if (host == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Invalid Server for :" + svc);
                    }
                    logger.debug("using LB returned Server: {} for request: {}", svc, original);
                    return svc;
                } else {
                    // just fall back as real DNS
                    logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
                }
            } else {
                // consult LB to obtain vipAddress backed instance given full URL
                //Full URL execute request - where url!=vipAddress
                logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
            }
        }
        // end of creating final URL
        if (host == null){
            throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
        }
        // just verify that at this point we have a full URL

        return new Server(host, port);
    }
}

Here the previously constructed ZoneAwareLoadBalancer is retrieved and its chooseServer method is called to obtain a server, which is the same flow as in Ribbon.
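To make the role of chooseServer concrete, here is a deliberately simplified sketch. It assumes plain round robin over a fixed server list, which is a swapped-in simplification and not the rule Ribbon's ZoneAwareLoadBalancer actually applies; RoundRobinSketch and the addresses are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    private final List<String> servers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinSketch(List<String> servers) {
        // Defensive copy so later changes to the caller's list do not affect selection.
        this.servers = new ArrayList<>(servers);
    }

    /** Returns the next server in rotation, or null when no server is available. */
    String chooseServer() {
        if (servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```

Returning null for an empty list mirrors the listing above, where a null result from chooseServer is turned into a ClientException by the caller.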

Once the server has been obtained, the call method of the ServerOperation built earlier in executeWithLoadBalancer is invoked:

return command.submit(
    new ServerOperation<T>() {
        @Override
        public Observable<T> call(Server server) {
            URI finalUri = reconstructURIWithServer(server, request.getUri());
            S requestForServer = (S) request.replaceUri(finalUri);
            try {
                return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
            } 
            catch (Exception e) {
                return Observable.error(e);
            }
        }
    })
    .toBlocking()
    .single();
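The URI rewrite performed inside call can be sketched as follows. This is only an approximation of what reconstructURIWithServer achieves, written against java.net.URI; UriRewriteSketch, the fallback to http, and the sample address are assumptions made for the illustration.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class UriRewriteSketch {

    /** Combines the chosen server's host and port with the path and query of the original URI. */
    static URI withServer(String host, int port, URI original) {
        // Assumption for this sketch: fall back to http when the original has no scheme.
        String scheme = original.getScheme() != null ? original.getScheme() : "http";
        try {
            return new URI(scheme, null, host, port,
                    original.getPath(), original.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI partial = URI.create("http:///orders/1?verbose=true");
        System.out.println(withServer("10.0.0.5", 8080, partial));
    }
}
```

Only after this step does the request carry a real network address; everything before it works with the logical service name.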

It then executes AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig) for the final call, which in practice resolves to FeignLoadBalancer#execute:

public class FeignLoadBalancer extends
        AbstractLoadBalancerAwareClient<FeignLoadBalancer.RibbonRequest, FeignLoadBalancer.RibbonResponse> {

    @Override
    public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
            throws IOException {
        Request.Options options;
        if (configOverride != null) {
            RibbonProperties override = RibbonProperties.from(configOverride);
            options = new Request.Options(override.connectTimeout(this.connectTimeout),
                    override.readTimeout(this.readTimeout));
        }
        else {
            options = new Request.Options(this.connectTimeout, this.readTimeout);
        }
        Response response = request.client().execute(request.toRequest(), options);
        return new RibbonResponse(request.getUri(), response);
    }
}

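The request.client() used in request.client().execute(request.toRequest(), options) is the delegate of the LoadBalancerFeignClient injected by DefaultFeignLoadBalancedConfiguration. When that LoadBalancerFeignClient was constructed it was handed a feign.Client.Default, and that feign.Client.Default (this.delegate) was then used to build the RibbonRequest.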

So the call goes to feign.Client.Default#execute:

public interface Client {

    class Default implements Client {
        @Override
        public Response execute(Request request, Options options) throws IOException {
          HttpURLConnection connection = convertAndSend(request, options);
          return convertResponse(connection, request);
        }
    }
}

This uses the JDK's HttpURLConnection to make the remote HTTP call. At this point the request flow is complete. [Figure: flow chart of the request process]

For the Ribbon invocation process, see: https://www.jianshu.com/p/f3db11f045cc

OpenFeign Configuration

For Feign configuration, the framework offers many customization options; see org.springframework.cloud.openfeign.FeignClientProperties.FeignClientConfiguration for details.

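public static class FeignClientConfiguration {

    // Log level
    private Logger.Level loggerLevel;
    // Connect timeout
    private Integer connectTimeout;
    // Read timeout
    private Integer readTimeout;
    // Retry strategy
    private Class<Retryer> retryer;
    // Error decoder
    private Class<ErrorDecoder> errorDecoder;
    // Request interceptors
    private List<Class<RequestInterceptor>> requestInterceptors;
    // Whether 404 responses are decoded instead of being raised as errors
    private Boolean decode404;
    // Response decoder
    private Class<Decoder> decoder;
    // Request encoder
    private Class<Encoder> encoder;
    // Contract that parses the annotations on the client interface
    private Class<Contract> contract;
    // Exception propagation policy
    private ExceptionPropagationPolicy exceptionPropagationPolicy;
}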

Here is a simple example using Logger: suppose we want to set the log level for each FeignClient.

1. Add a configuration class:

@Configuration
public class FooConfiguration {
    @Bean
    Logger.Level feignLoggerLevel() {
        return Logger.Level.FULL;
    }
}

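2. Configure the log level with logging.level followed by the fully qualified name of the FeignClient interface. Feign logging only responds to the DEBUG level.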

logging.level.com.wuzz.FeignClientService: DEBUG

That completes the configuration. Restart the service to see the effect.

References:
https://www.cnblogs.com/lucky-yqy/p/14589434.html

https://www.cnblogs.com/wuzhenzhao/p/13680807.html
