SOFA

SOFARPC源码解析-服务引用

2018-05-18  本文已影响156人  鋒Nic

简介摘要
SOFARPC服务发布创建服务运行容器配置ServerConfig,设置基础配置并且通过配置文件加载服务端默认配置;创建服务发布配置ProviderConfig,设置接口名称、接口实现类引用以及指定服务端配置;通过服务发布启动类ProviderBootstrap发布服务。SOFARPC服务引用按照编程界面分为两种使用SOFARPC的方式:
1.通过SOFARPC使用:
服务引用过程涉及到RegistryConfig注册中心配置类以及ConsumerConfig服务引用配置类。
(1)RegistryConfig注册中心配置类

RegistryConfig registryConfig = new RegistryConfig()
            .setProtocol("zookeeper")
            .setAddress("127.0.0.1:2181")

RegistryConfig表示注册中心,如上声明服务注册中心的地址和端口是127.0.0.1:2181,协议是Zookeeper。
(2) ConsumerConfig服务引用配置类

ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName())       
            .setRegistry(registryConfig);
HelloService helloService = consumerConfig.refer();

ConsumerConfig表示服务引用,如上声明所引用服务的接口和服务注册中心。最终通过refer方法将此服务引用,获取到该服务的远程调用的代理。
SOFARPC服务引用支持如下特性:
(1)同一服务注册多个注册中心,构建多个RegistryConfig设置给 ConsumerConfig:

List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
registryConfigs.add(registryA);
registryConfigs.add(registryB);
consumerConfig.setRegistry(registryConfigs);

(2)提供MethodConfig进行方法级别参数设置,API方式使用相应的对象set法即可为其设置参数:

MethodConfig methodConfigA = new MethodConfig();
MethodConfig methodConfigB = new MethodConfig();
List<MethodConfig> methodConfigs = new ArrayList<MethodConfig>();
methodConfigs.add(methodConfigA);
methodConfigs.add(methodConfigB);   
consumerConfig.setMethods(methodConfigs);  //客户端设置

2.通过SOFABoot使用:
服务引用使用XML配置通过sofa:reference元素表示引用服务,XML配置如下所示就能够引用SOFARPC服务:

<sofa:reference id="helloSyncServiceReference" interface="com.alipay.sofa.rpc.samples.invoke.HelloSyncService">
    <sofa:binding.bolt/>
</sofa:reference>

如上通过sofa:reference元素引用了一个服务,其中id属性表示该服务引用在Spring上下文中的唯一标识,interface表示该服务的接口。sofa:binding.bolt表示该服务引用调用时使用的协议为bolt。
当引用服务的SOFARPC应用启动的时候从服务注册中心订阅到相应服务的元数据信息。服务注册中心收到订阅请求将发布方的元数据列表实时推送给服务引用方。当服务引用方拿到发布方的地址从中选取地址发起服务调用。SOFARPC应用启动为每一个服务引用生成一个远程调用的代理,Spring获取Bean通过id获取到服务引用进行使用:

HelloSyncService helloSyncServiceReference = (HelloSyncService) applicationContext
            .getBean("helloSyncServiceReference");
String result = helloSyncServiceReference.saySync("sync");

每一个服务引用对应sofa:binding元素,也就是说如果想对同一个服务发起不同协议的调用,需要如下配置:

<sofa:reference id="boltHelloSyncServiceReference" interface="com.alipay.sofa.rpc.samples.invoke.HelloSyncService">
    <sofa:binding.bolt/>
</sofa:reference>
<sofa:reference id="restHelloSyncServiceReference" interface="com.alipay.sofa.rpc.samples.invoke.HelloSyncService">
    <sofa:binding.rest/>
</sofa:reference>
<sofa:reference id="dubboHelloSyncServiceReference" interface="com.alipay.sofa.rpc.samples.invoke.HelloSyncService">
    <sofa:binding.dubbo/>
</sofa:reference>

声明服务引用的同时设置需要的参数,global-attrs元素设置调用超时,地址等待时间等参数; target-url元素能够设置直连调用的地址;method标签能够设置方法级别参数:

<sofa:reference id="personReferenceBolt" interface="com.alipay.sofa.boot.examples.demo.rpc.bean.PersonService">
    <sofa:binding.bolt>
        <sofa:global-attrs timeout="3000" address-wait-time="2000"/>  <!-- 调用超时;地址等待时间。 -->
        <sofa:route target-url="127.0.0.1:22000"/>  <!-- 直连地址 -->
        <sofa:method name="sayName" timeout="3000"/> <!-- 方法级别配置 -->
    </sofa:binding.bolt>
</sofa:reference>

<sofa:reference id="personReferenceBolt" interface="com.alipay.sofa.boot.examples.demo.rpc.bean.PersonService">
    <sofa:binding.bolt/>
</sofa:reference>
<sofa:reference id="personReferenceRest" interface="com.alipay.sofa.boot.examples.demo.rpc.bean.PersonService">
    <sofa:binding.rest/>
</sofa:reference>
<sofa:reference id="personReferenceDubbo" interface="com.alipay.sofa.boot.examples.demo.rpc.bean.PersonService">
    <sofa:binding.dubbo/>
</sofa:reference>

其中sofa:reference元素表示引用该服务,sofa:binding元素声明该服务引用的调用的协议。如上Spring上下文构建三个服务的远程代理类,名字分别为personReferenceBolt、personReferenceRest以及personReferenceDubbo。
客户端模块包含集群管理、路由、地址管理器、连接管理器、负载均衡器,以及与代理、注册中心等模块交互:

客户端调用流程
源码解析
搭建环境服务发布示例:
package org.alipay.sofa.rpc;

import com.alipay.sofa.rpc.config.ConsumerConfig;

public class RpcClient {

    public static void main(String[] args) {

        ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
                .setInterfaceId(HelloService.class.getName()) // 指定接口
                .setProtocol("bolt") // 指定协议
                .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址
                .setConnectTimeout(10 * 1000);

        HelloService helloService = consumerConfig.refer();

        while (true) {
            try {
                System.out.println(helloService.sayHello("world"));
            } catch (Exception e) {
                e.printStackTrace();
            }

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

参考SOFARPC Example示例模块(com.alipay.sofa.rpc.quickstart.QuickStartClient):

package com.alipay.sofa.rpc.quickstart;

import com.alipay.sofa.rpc.config.ConsumerConfig;

/**
 * Quick Start client
 */
public class QuickStartClient {

    public static void main(String[] args) {

        ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setProtocol("bolt") // 指定协议
            .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址
            .setConnectTimeout(10 * 1000);

        HelloService helloService = consumerConfig.refer();

        while (true) {
            try {
                System.out.println(helloService.sayHello("world"));
            } catch (Exception e) {
                e.printStackTrace();
            }

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

运行服务引用端示例类QuickStartClient查看消费端运行效果,服务消费者输出日志如下:

2018-05-18 01:51:20,472 main  INFO [com.alipay.sofa.rpc.context.RpcRuntimeContext:info:102] - Welcome! Loading SOFA RPC Framework : 5.4.0_20180427231325, PID is:1424
2018-05-18 01:51:20,555 main  INFO [com.alipay.sofa.rpc.module.ModuleFactory:info:102] - Install Module: fault-tolerance
2018-05-18 01:51:20,658 main  INFO [com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap:infoWithApp:122] - Refer consumer config : bolt://com.alipay.sofa.rpc.quickstart.HelloService: with bean id rpc-cfg-0
2018-05-18 01:51:21,119 main  INFO [com.alipay.sofa.rpc.client.AllConnectConnectionHolder:infoWithApp:122] - Add provider of com.alipay.sofa.rpc.quickstart.HelloService, size is : 1
Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j ]
2018-05-18 01:51:21,322 SOFA-CLI-CONN-com.alipay.sofa.rpc.quickstart.HelloService-3-T1  INFO [com.alipay.sofa.common.log:report:30] - Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j ]
log4j:ERROR Failed to rename [C:\Users\Administrator\logs/bolt/connection-event.log] to [C:\Users\Administrator\logs/bolt/connection-event.log.2018-05-12].
2018-05-18 01:51:25,236 SOFA-CLI-CONN-com.alipay.sofa.rpc.quickstart.HelloService-3-T1  INFO [com.alipay.sofa.rpc.client.AllConnectConnectionHolder:infoWithApp:122] - Connect to com.alipay.sofa.rpc.quickstart.HelloService provider:bolt://127.0.0.1:12200 success ! The connection is 127.0.0.1:12200 <-> 127.0.0.1:51301
hello world !
hello world !

SOFARPC服务引用流程:
(1)创建服务引用配置类ConsumerConfig,设置ConsumerConfig实例接口名称(服务接口:做为服务唯一标识的组成部分)、调用协议、直连调用地址以及连接超时时间:

/**
 * 服务消费者配置
 * 
 * @param <T> the type parameter
 */
public class ConsumerConfig<T> extends AbstractInterfaceConfig<T, ConsumerConfig<T>> implements Serializable

服务引用配置类ConsumerConfig继承接口级公共配置类AbstractInterfaceConfig,能够通过集成的注册中心动态调整服务引用接口&IP级别配置例如超时时间、权重等。
(2)服务引用配置类ConsumerConfig负责加载调整服务引用接口&IP级配置,根据服务消费配置获取代理对象引用,单一职责原则绑定服务消费者启动类ConsumerBootstrap实施引用服务:

/**
 * 引用服务
 *
 * @return 服务代理类 t
 */
public T refer() {
    if (consumerBootstrap == null) {
        consumerBootstrap = Bootstraps.from(this);
    }
    return consumerBootstrap.refer();
}

首先判断服务消费者启动类ConsumerBootstrap是否为空,通过引用服务辅助工具类Bootstraps按照绑定服务引用配置扩展加载工厂ExtensionLoaderFactory加载初始化ConsumerBootstrap实例,默认服务消费者启动类ConsumerBootstrap与调用协议启动类实例相同:

/**
 * 引用一个服务
 *
 * @param consumerConfig 服务消费者配置
 * @param <T>            接口类型
 * @return 引用启动类
 */
public static <T> ConsumerBootstrap<T> from(ConsumerConfig<T> consumerConfig) {
    String bootstrap = consumerConfig.getBootstrap();
    ConsumerBootstrap consumerBootstrap;
    if (StringUtils.isNotEmpty(bootstrap)) {
        consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
            .getExtension(bootstrap,
                new Class[] { ConsumerConfig.class },
                new Object[] { consumerConfig });
    } else {
        // default is same with protocol
        bootstrap = consumerConfig.getProtocol();
        ExtensionLoader extensionLoader = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class);
        ExtensionClass<ConsumerBootstrap> extensionClass = extensionLoader.getExtensionClass(bootstrap);
        if (extensionClass == null) {
            // if not exist, use default consumer bootstrap
            bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_CONSUMER_BOOTSTRAP);
            consumerConfig.setBootstrap(bootstrap);
            consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
                .getExtension(bootstrap, new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
        } else {
            consumerConfig.setBootstrap(bootstrap);
            consumerBootstrap = extensionClass.getExtInstance(
                new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
        }
    }
    return (ConsumerBootstrap<T>) consumerBootstrap;
}

接着引用服务包装类ConsumerBootstrap通过refer()方法调用服务,ConsumerBootstrap基于Bolt、Rest、Dubbo网络通信协议提供三种协议服务引用实现类:BoltConsumerBootstrap、RestConsumerBootstrap以及DubboConsumerBootstrap。默认服务消费者启动器DefaultConsumerBootstrap调用服务refer()方法首先同步双重检查服务引用代理实现类实例是否不为空,不为空则返回代理实现引用,接着执行SOFARPC引用服务逻辑:

public T refer() {
    if (proxyIns != null) {
        return proxyIns;
    }
    synchronized (this) {
        if (proxyIns != null) {
            return proxyIns;
        }
        String key = consumerConfig.buildKey();
        String appName = consumerConfig.getAppName();
        // 检查参数
        checkParameters();
        // 提前检查接口类
        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "Refer consumer config : {} with bean id {}", key, consumerConfig.getId());
        }

        // 注意同一interface,同一tags,同一protocol情况
        AtomicInteger cnt = REFERRED_KEYS.get(key); // 计数器
        if (cnt == null) { // 没有发布过
            cnt = CommonUtils.putToConcurrentMap(REFERRED_KEYS, key, new AtomicInteger(0));
        }
        int c = cnt.incrementAndGet();
        int maxProxyCount = consumerConfig.getRepeatedReferLimit();
        if (maxProxyCount > 0) {
            if (c > maxProxyCount) {
                cnt.decrementAndGet();
                // 超过最大数量,直接抛出异常
                throw new SofaRpcRuntimeException("Duplicate consumer config with key " + key
                    + " has been referred more than " + maxProxyCount + " times!"
                    + " Maybe it's wrong config, please check it."
                    + " Ignore this if you did that on purpose!");
            } else if (c > 1) {
                if (LOGGER.isInfoEnabled(appName)) {
                    LOGGER.infoWithApp(appName, "Duplicate consumer config with key {} has been referred!"
                        + " Maybe it's wrong config, please check it."
                        + " Ignore this if you did that on purpose!", key);
                }
            }
        }

        try {
            // build cluster
            cluster = ClusterFactory.getCluster(this);
            // build listeners
            consumerConfig.setConfigListener(buildConfigListener(this));
            consumerConfig.setProviderInfoListener(buildProviderInfoListener(this));
            // init cluster
            cluster.init();
            // 构造Invoker对象(执行链)
            proxyInvoker = buildClientProxyInvoker(this);
            // 创建代理类
            proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(),
                proxyInvoker);
        } catch (Exception e) {
            if (cluster != null) {
                cluster.destroy();
                cluster = null;
            }
            consumerConfig.setConfigListener(null);
            consumerConfig.setProviderInfoListener(null);
            cnt.decrementAndGet(); // 发布失败不计数
            if (e instanceof SofaRpcRuntimeException) {
                throw (SofaRpcRuntimeException) e;
            } else {
                throw new SofaRpcRuntimeException("Build consumer proxy error!", e);
            }
        }
        if (consumerConfig.getOnAvailable() != null && cluster != null) {
            cluster.checkStateChange(false); // 状态变化通知监听器
        }
        RpcRuntimeContext.cacheConsumerConfig(this);
        return proxyIns;
    }
}

默认服务消费者启动器DefaultConsumerBootstrap引用服务方法执行逻辑:
(1)根据服务引用配置ConsumerConfig 获取调用协议、服务接口以及服务标签构建key以及获取应用名称appName;
(2)调用checkParameters()方法检查服务消费者配置字段和参数:

/**
 * for check fields and parameters of consumer config 
 */
protected void checkParameters() {

}

(3)检验同一个服务(调用协议&服务接口&服务标签相同)的发布次数是否超过服务引用配置的最大次数,超过最大数量直接抛出异常;
(4)客户端集群工厂ClusterFactory通过服务消费者启动器构造客户端集群Cluster,根据服务消费者启动器服务消费者配置扩展加载工厂ExtensionLoaderFactory加载初始化Cluster实例:

/**
 * 构造Client对象
 *
 * @param consumerBootstrap 客户端配置
 * @return Client对象
 */
public static Cluster getCluster(ConsumerBootstrap consumerBootstrap) {
    try {
        ConsumerConfig consumerConfig = consumerBootstrap.getConsumerConfig();
        ExtensionClass<Cluster> ext = ExtensionLoaderFactory.getExtensionLoader(Cluster.class)
            .getExtensionClass(consumerConfig.getCluster());
        if (ext == null) {
            throw ExceptionUtils.buildRuntime("consumer.cluster",
                consumerConfig.getCluster(), "Unsupported cluster of client!");
        }
        return ext.getExtInstance(new Class[] { ConsumerBootstrap.class },
            new Object[] { consumerBootstrap });
    } catch (SofaRpcRuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new SofaRpcRuntimeException(e.getMessage(), e);
    }
}

客户端集群Cluster封装集群模式、长连接管理、服务路由、负载均衡等抽象类,实现调用器Invoker、服务发布信息监听器ProviderInfoListener、可初始化接口Initializable, 可销毁接口Destroyable,涵盖两种客户端集群实现类:快速失败客户端集群FailFastCluster和故障转移(支持重试和指定地址调用)客户端集群FailoverCluster:

/**
 * 客户端,封装了集群模式、长连接管理、服务路由、负载均衡等抽象类
 *
 */
@Extensible(singleton = false)
@ThreadSafe
public abstract class Cluster implements Invoker, ProviderInfoListener, Initializable, Destroyable {...}
/**
 * 快速失败
 *
 */
@Extension("failfast")
public class FailFastCluster extends AbstractCluster {...}
/**
 * 故障转移,支持重试和指定地址调用
 *
 */
@Extension("failover")
public class FailoverCluster extends AbstractCluster {...}

(5)服务端订阅者配置实例设置根据服务消费者启动器构建的服务引用配置发生变化监听器、集群服务端地址监听器;
(6)初始化客户端集群Cluster实例包括构建路由链routerChain,加载负载均衡策略loadBalancer,获取地址保持器addressHolder,构造调用端过滤链filterChain,连接管理器启动重连线程,服务消费启动器订阅服务发布者列表,初始化服务端连接建立长连接:

public synchronized void init() {
    if (initialized) { // 已初始化
        return;
    }
    // 构造Router链
    routerChain = RouterChain.buildConsumerChain(consumerBootstrap);
    // 负载均衡策略 考虑是否可动态替换?
    loadBalancer = LoadBalancerFactory.getLoadBalancer(consumerBootstrap);
    // 地址管理器
    addressHolder = AddressHolderFactory.getAddressHolder(consumerBootstrap);
    // 连接管理器
    connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);
    // 构造Filter链,最底层是调用过滤器
    this.filterChain = FilterChain.buildConsumerChain(this.consumerConfig,
        new ConsumerInvoker(consumerBootstrap));

    if (consumerConfig.isLazy()) { // 延迟连接
        if (LOGGER.isInfoEnabled(consumerConfig.getAppName())) {
            LOGGER.infoWithApp(consumerConfig.getAppName(), "Connection will be initialized when first invoke.");
        }
    }

    // 启动重连线程
    connectionHolder.init();
    try {
        // 得到服务端列表
        List<ProviderGroup> all = consumerBootstrap.subscribe();
        if (CommonUtils.isNotEmpty(all)) {
            // 初始化服务端连接(建立长连接)
            updateAllProviders(all);
        }
    } catch (SofaRpcRuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new SofaRpcRuntimeException("Init provider's transport error!", e);
    }

    // 启动成功
    initialized = true;

    // 如果check=true表示强依赖
    if (consumerConfig.isCheck() && !isAvailable()) {
        throw new SofaRpcRuntimeException("The consumer is depend on alive provider " +
            "and there is no alive provider, you can ignore it " +
            "by ConsumerConfig.setCheck(boolean) (default is false)");
    }
}

路由链RouterChain根据服务端订阅者配置构建服务消费路由链:1.获取服务引用配置路由规则引用;2.解析路由规则判断是否需要排除系统过滤器;3.解析通过别名方式注入的路由准备数据;4.解析自动加载的路由添加到扩展路由集合;5.根据扩展路由和服务端订阅者启动器创建路由链RouteChain:

/**
 * 构建Router链
 *
 * @param consumerBootstrap 服务端订阅者配置
 * @return 路由链
 */
public static RouterChain buildConsumerChain(ConsumerBootstrap consumerBootstrap) {
    ConsumerConfig<?> consumerConfig = consumerBootstrap.getConsumerConfig();
    List<Router> customRouters = consumerConfig.getRouterRef() == null ? new ArrayList<Router>()
        : new CopyOnWriteArrayList<Router>(consumerConfig.getRouterRef());
    // 先解析是否有特殊处理
    HashSet<String> excludes = parseExcludeRouter(customRouters);

    // 准备数据:用户通过别名的方式注入的router,需要解析
    List<ExtensionClass<Router>> extensionRouters = new ArrayList<ExtensionClass<Router>>();
    List<String> routerAliases = consumerConfig.getRouter();
    if (CommonUtils.isNotEmpty(routerAliases)) {
        for (String routerAlias : routerAliases) {
            if (startsWithExcludePrefix(routerAlias)) { // 排除用的特殊字符
                excludes.add(routerAlias.substring(1));
            } else {
                extensionRouters.add(EXTENSION_LOADER.getExtensionClass(routerAlias));
            }
        }
    }
    // 解析自动加载的router
    if (!excludes.contains(StringUtils.ALL) && !excludes.contains(StringUtils.DEFAULT)) { // 配了-*和-default表示不加载内置
        for (Map.Entry<String, ExtensionClass<Router>> entry : CONSUMER_AUTO_ACTIVES.entrySet()) {
            if (!excludes.contains(entry.getKey())) {
                extensionRouters.add(entry.getValue());
            }
        }
    }
    excludes = null; // 不需要了
    // 按order从小到大排序
    if (extensionRouters.size() > 1) {
        Collections.sort(extensionRouters, new OrderedComparator<ExtensionClass>());
    }
    List<Router> actualRouters = new ArrayList<Router>();
    for (ExtensionClass<Router> extensionRouter : extensionRouters) {
        Router actualRoute = extensionRouter.getExtInstance();
        actualRouters.add(actualRoute);
    }
    // 加入自定义的过滤器
    actualRouters.addAll(customRouters);
    return new RouterChain(actualRouters, consumerBootstrap);
}
/**
 * 判断是否需要排除系统过滤器
 *
 * @param customRouters 自定义Router
 * @return 是否排除
 */
private static HashSet<String> parseExcludeRouter(List<Router> customRouters) {
    HashSet<String> excludeKeys = new HashSet<String>();
    if (CommonUtils.isNotEmpty(customRouters)) {
        for (Router router : customRouters) {
            if (router instanceof ExcludeRouter) {
                // 存在需要排除的过滤器
                ExcludeRouter excludeRouter = (ExcludeRouter) router;
                String excludeName = excludeRouter.getExcludeName();
                if (StringUtils.isNotEmpty(excludeName)) {
                    String excludeRouterName = startsWithExcludePrefix(excludeName) ? excludeName.substring(1)
                        : excludeName;
                    if (StringUtils.isNotEmpty(excludeRouterName)) {
                        excludeKeys.add(excludeRouterName);
                    }
                }
                customRouters.remove(router);
            }
        }
    }
    if (!excludeKeys.isEmpty()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Find exclude routers: {}", excludeKeys);
        }
    }
    return excludeKeys;
}
public RouterChain(List<Router> actualRouters, ConsumerBootstrap consumerBootstrap) {
    this.routers = new ArrayList<Router>();
    if (CommonUtils.isNotEmpty(actualRouters)) {
        for (Router router : actualRouters) {
            if (router.needToLoad(consumerBootstrap)) {
                router.init(consumerBootstrap);
                routers.add(router);
            }
        }
    }
}

负载均衡工厂LoadBalancerFactory、地址管理工厂AddressHolderFactory、连接管理工厂ConnectionHolder根据服务消费者配置获取负载均衡器、地址保持器、连接管理器:读取服务消费配置负载均衡策略、地址保持配置、连接管理配置,扩展加载工厂ExtensionLoaderFactory根据负载均衡、地址保持、连接管理相应配置生成负载均衡器、地址保持器、连接管理器实例:

/**
 * 根据名字得到负载均衡器
 *
 * @param consumerBootstrap 服务订阅者配置
 * @return LoadBalancer
 */
public static LoadBalancer getLoadBalancer(ConsumerBootstrap consumerBootstrap) {
    try {
        String loadBalancer = consumerBootstrap.getConsumerConfig().getLoadBalancer();
        ExtensionClass<LoadBalancer> ext = ExtensionLoaderFactory
            .getExtensionLoader(LoadBalancer.class).getExtensionClass(loadBalancer);
        if (ext == null) {
            throw ExceptionUtils.buildRuntime("consumer.loadBalancer",
                loadBalancer, "Unsupported loadBalancer of client!");
        }
        return ext.getExtInstance(new Class[] { ConsumerBootstrap.class }, new Object[] { consumerBootstrap });
    } catch (SofaRpcRuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new SofaRpcRuntimeException(e.getMessage(), e);
    }
}
/**
 * 根据配置得到连接管理器
 *
 * @param consumerBootstrap 服务消费者配置
 * @return AddressHolder
 */
public static AddressHolder getAddressHolder(ConsumerBootstrap consumerBootstrap) {
    try {
        String connectionHolder = consumerBootstrap.getConsumerConfig().getAddressHolder();
        ExtensionClass<AddressHolder> ext = ExtensionLoaderFactory.getExtensionLoader(AddressHolder.class)
            .getExtensionClass(connectionHolder);
        if (ext == null) {
            throw ExceptionUtils.buildRuntime("consumer.addressHolder", connectionHolder,
                "Unsupported addressHolder of client!");
        }
        return ext.getExtInstance(new Class[] { ConsumerBootstrap.class }, new Object[] { consumerBootstrap });
    } catch (SofaRpcRuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new SofaRpcRuntimeException(e.getMessage(), e);
    }
}
/**
 * 根据配置得到连接管理器
 *
 * @param consumerBootstrap 服务消费者配置
 * @return ConnectionHolder
 */
public static ConnectionHolder getConnectionHolder(ConsumerBootstrap consumerBootstrap) {
    try {
        String connectionHolder = consumerBootstrap.getConsumerConfig().getConnectionHolder();
        ExtensionClass<ConnectionHolder> ext = ExtensionLoaderFactory
            .getExtensionLoader(ConnectionHolder.class).getExtensionClass(connectionHolder);
        if (ext == null) {
            throw ExceptionUtils.buildRuntime("consumer.connectionHolder", connectionHolder,
                "Unsupported connectionHolder of client!");
        }
        return ext.getExtInstance(new Class[] { ConsumerBootstrap.class }, new Object[] { consumerBootstrap });
    } catch (SofaRpcRuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new SofaRpcRuntimeException(e.getMessage(), e);
    }
}

客户端集群按照服务器启动器配置构造消费调用器ConsumerInvoker,消费调用器invoke()方法使用客户端发送数据给服务器执行服务调用过程,设置调用服务端远程地址同步/单向/Callback/ Future调用远程服务,譬如Bolt客户端传输BoltClientTransport同步调用通过RpcClient获取远程连接使用管道Channel的writeAndFlush(Object msg)方法发送客户端数据,并且添加监听器提供发送失败Future添加失败响应场景进行远程访问调用。过滤链FilterChain根据服务端订阅者配置构造调用端执行链,调用端执行链最底层是调用过滤器:按照自动装载扩展实现逻辑获取用户new实例方式注入的过滤器(优先级高),判断是否需要排除系统过滤器,解析用户通过别名方式注入的过滤器准备数据,解析自动加载的过滤器添加到自定义的过滤器构造执行链:

/**
* Instantiates a new Consumer invoke filter.
*
* @param consumerBootstrap 服务器启动着配置
*/
public ConsumerInvoker(ConsumerBootstrap consumerBootstrap) {
   super(consumerBootstrap.getConsumerConfig());
   this.consumerBootstrap = consumerBootstrap;
}

public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
   // 设置下服务器应用
   ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo();
   String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME);
   if (StringUtils.isNotEmpty(appName)) {
       sofaRequest.setTargetAppName(appName);
   }

   // 目前只是通过client发送给服务端
   return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);
}
/**
* 调用客户端
*
* @param transport 客户端连接
* @param request   Request对象
* @return 调用结果
* @throws SofaRpcException rpc异常
*/
protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport,
                                SofaRequest request) throws SofaRpcException {
   RpcInternalContext context = RpcInternalContext.getContext();
   // 添加调用的服务端远程地址
   RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(), providerInfo.getPort());
   try {
       checkProviderVersion(providerInfo, request); // 根据服务端版本特殊处理
       String invokeType = request.getInvokeType();
       int timeout = resolveTimeout(request, consumerConfig, providerInfo);

       SofaResponse response = null;
       // 同步调用
       if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
           long start = RpcRuntimeContext.now();
           try {
               response = transport.syncSend(request, timeout);
           } finally {
               if (RpcInternalContext.isAttachmentEnable()) {
                   long elapsed = RpcRuntimeContext.now() - start;
                   context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
               }
           }
       }
       // 单向调用
       else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
           long start = RpcRuntimeContext.now();
           try {
               transport.oneWaySend(request, timeout);
               response = buildEmptyResponse(request);
           } finally {
               if (RpcInternalContext.isAttachmentEnable()) {
                   long elapsed = RpcRuntimeContext.now() - start;
                   context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
               }
           }
       }
       // Callback调用
       else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
           // 调用级别回调监听器
           SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
           if (sofaResponseCallback == null) {
               SofaResponseCallback methodResponseCallback = consumerConfig
                   .getMethodOnreturn(request.getMethodName());
               if (methodResponseCallback != null) { // 方法的Callback
                   request.setSofaResponseCallback(methodResponseCallback);
               }
           }
           // 记录发送开始时间
           context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
           // 开始调用
           transport.asyncSend(request, timeout);
           response = buildEmptyResponse(request);
       }
       // Future调用
       else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
           // 记录发送开始时间
           context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
           // 开始调用
           ResponseFuture future = transport.asyncSend(request, timeout);
           // 放入线程上下文
           RpcInternalContext.getContext().setFuture(future);
           response = buildEmptyResponse(request);
       } else {
           throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
       }
       return response;
   } catch (SofaRpcException e) {
       throw e;
   } catch (Throwable e) { // 客户端其它异常
       throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e);
   }
}
/**
 * 构造调用端的执行链
 *
 * @param consumerConfig consumer配置
 * @param lastFilter     最后一个filter
 * @return filter执行链
 */
public static FilterChain buildConsumerChain(ConsumerConfig<?> consumerConfig, FilterInvoker lastFilter) {

    /*
    * 例如自动装载扩展 A(a),B(b),C(c)  filter=[-a,d]  filterRef=[new E, new Exclude(b)]
    * 逻辑如下:
    * 1.解析config.getFilterRef(),记录E和-b
    * 2.解析config.getFilter()字符串,记录 d 和 -a,-b
    * 3.再解析自动装载扩展,a,b被排除了,所以拿到c,d
    * 4.对c d进行排序
    * 5.拿到C、D实现类
    * 6.加上自定义,返回C、D、E
    */
    // 用户通过自己new实例的方式注入的filter,优先级高
    List<Filter> customFilters = consumerConfig.getFilterRef() == null ?
        new ArrayList<Filter>() : new CopyOnWriteArrayList<Filter>(consumerConfig.getFilterRef());
    // 先解析是否有特殊处理
    HashSet<String> excludes = parseExcludeFilter(customFilters);

    // 准备数据:用户通过别名的方式注入的filter,需要解析
    List<ExtensionClass<Filter>> extensionFilters = new ArrayList<ExtensionClass<Filter>>();
    List<String> filterAliases = consumerConfig.getFilter(); //
    if (CommonUtils.isNotEmpty(filterAliases)) {
        for (String filterAlias : filterAliases) {
            if (startsWithExcludePrefix(filterAlias)) { // 排除用的特殊字符
                excludes.add(filterAlias.substring(1));
            } else {
                ExtensionClass<Filter> filter = EXTENSION_LOADER.getExtensionClass(filterAlias);
                if (filter != null) {
                    extensionFilters.add(filter);
                }
            }
        }
    }
    // 解析自动加载的过滤器
    if (!excludes.contains(StringUtils.ALL) && !excludes.contains(StringUtils.DEFAULT)) { // 配了-*和-default表示不加载内置
        for (Map.Entry<String, ExtensionClass<Filter>> entry : CONSUMER_AUTO_ACTIVES.entrySet()) {
            if (!excludes.contains(entry.getKey())) {
                extensionFilters.add(entry.getValue());
            }
        }
    }
    excludes = null; // 不需要了
    // 按order从小到大排序
    if (extensionFilters.size() > 1) {
        Collections.sort(extensionFilters, new OrderedComparator<ExtensionClass<Filter>>());
    }
    List<Filter> actualFilters = new ArrayList<Filter>();
    for (ExtensionClass<Filter> extensionFilter : extensionFilters) {
        actualFilters.add(extensionFilter.getExtInstance());
    }
    // 加入自定义的过滤器
    actualFilters.addAll(customFilters);
    return new FilterChain(actualFilters, lastFilter, consumerConfig);
}

连接管理器初始化启动重连线程:读取服务引用配置服务接口以及服务端消费者给提供者重连间隔创建线程池启动重连+心跳线程,存活的客户端列表(保持长连接,且一切正常的)检查可用连接,非可用连接(不可用的长连接)从存活丢到重试列表,失败待重试的客户端列表(连上后断开的)命中服务提供者动态配置的重试周期系数进行重连,客户端传输建立长连接(基于Bolt协议的客户端传输实现BoltClientTransport只支持长连接共享模式,RpcClient通过URL缓存和超时时间建立连接,基于第三方协议譬如cxf/resteasy的客户端传输实现AbstractProxyClientTransport,按照客户端配置建立Socket连接),两次验证检查客户端传输是否存活,睡眠防止被连上又被服务端踢下线,设置服务提供者重试周期系数,从重试丢到存活列表,存活的客户端列表和存活但是亚健康节点(连续心跳超时,这种只发心跳,不发请求)原来为空变成非空需要通知状态变成可用,主要包括1.启动成功变成可用时;2.注册中心增加,更新节点后变成可用时;3.重连上从一个可用节点都没有变成有可用节点时:

public void init() {
    if (reconThread == null) {
        startReconnectThread();
    }
}
/**
 * 启动重连+心跳线程
 */
protected void startReconnectThread() {
    final String interfaceId = consumerConfig.getInterfaceId();
    // 启动线程池
    // 默认每隔10秒重连
    int reconnect = consumerConfig.getReconnectPeriod();
    if (reconnect > 0) {
        reconnect = Math.max(reconnect, 2000); // 最小2000
        reconThread = new ScheduledService("CLI-RC-" + interfaceId, ScheduledService.MODE_FIXEDDELAY, new
            Runnable() {
                @Override
                public void run() {
                    try {
                        doReconnect();
                    } catch (Throwable e) {
                        LOGGER.errorWithApp(consumerConfig.getAppName(),
                            "Exception when retry connect to provider", e);
                    }
                }
            }, reconnect, reconnect, TimeUnit.MILLISECONDS).start();
    }
}
/**
 * 重连断开和死亡的节点
 */
private void doReconnect() {
    String interfaceId = consumerConfig.getInterfaceId();
    String appName = consumerConfig.getAppName();
    int thisTime = reconnectFlag.incrementAndGet();
    boolean print = thisTime % 6 == 0; //是否打印error,每6次打印一次
    boolean isAliveEmptyFirst = isAvailableEmpty();
    // 检查可用连接  todo subHealth
    for (Map.Entry<ProviderInfo, ClientTransport> alive : aliveConnections.entrySet()) {
        ClientTransport connection = alive.getValue();
        if (connection != null && !connection.isAvailable()) {
            aliveToRetry(alive.getKey(), connection);
        }
    }
    for (Map.Entry<ProviderInfo, ClientTransport> entry : getRetryConnections()
        .entrySet()) {
        ProviderInfo providerInfo = entry.getKey();
        int providerPeriodCoefficient = CommonUtils.parseNum((Integer)
            providerInfo.getDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT), 1);
        if (thisTime % providerPeriodCoefficient != 0) {
            continue; // 如果命中重连周期,则进行重连
        }
        ClientTransport transport = entry.getValue();
        if (LOGGER.isDebugEnabled(appName)) {
            LOGGER.debugWithApp("Retry connect to {} provider:{} ...", interfaceId, providerInfo);
        }
        try {
            transport.connect();
            if (doubleCheck(interfaceId, providerInfo, transport)) {
                providerInfo.setDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT, 1);
                retryToAlive(providerInfo, transport);
            }
        } catch (Exception e) {
            if (print) {
                if (LOGGER.isWarnEnabled(appName)) {
                    LOGGER.warnWithApp("Retry connect to {} provider:{} error ! The exception is " + e
                        .getMessage(), interfaceId, providerInfo);
                }
            } else {
                if (LOGGER.isDebugEnabled(appName)) {
                    LOGGER.debugWithApp("Retry connect to {} provider:{} error ! The exception is " + e
                        .getMessage(), interfaceId, providerInfo);
                }
            }
        }
    }
    if (isAliveEmptyFirst && !isAvailableEmpty()) { // 原来空,变成不空
        notifyStateChangeToAvailable();
    }
}

服务端消费者启动器订阅服务发布列表,获取服务消费者配置直连调用地址,判断直连调用地址是否为空,直连调用地址为空根据注册中心配置从注册中心订阅服务发布列表,遍历服务端订阅者注册中心配置注册中心工厂生成注册中心对象,初始化启动注册中心订阅服务发布列表添加到服务提供者分组;直连调用地址非空按照逗号或者分号分隔直连调用地址遍历转换成服务端提供者添加到默认直连分组,返回直连服务端发布方分组:

public List<ProviderGroup> subscribe() {
    List<ProviderGroup> result = null;
    String directUrl = consumerConfig.getDirectUrl();
    if (StringUtils.isNotEmpty(directUrl)) {
        // 如果走直连
        result = subscribeFromDirectUrl(directUrl);
    } else {
        // 没有配置url直连
        List<RegistryConfig> registryConfigs = consumerConfig.getRegistry();
        if (CommonUtils.isNotEmpty(registryConfigs)) {
            // 从多个注册中心订阅服务列表
            result = subscribeFromRegistries();
        }
    }
    return result;
}
/**
 * Subscribe provider list from direct url
 *
 * @param directUrl direct url of consume config
 * @return Provider group list
 */
protected List<ProviderGroup> subscribeFromDirectUrl(String directUrl) {
    List<ProviderGroup> result = new ArrayList<ProviderGroup>();
    List<ProviderInfo> tmpProviderInfoList = new ArrayList<ProviderInfo>();
    String[] providerStrs = StringUtils.splitWithCommaOrSemicolon(directUrl);
    for (String providerStr : providerStrs) {
        ProviderInfo providerInfo = convertToProviderInfo(providerStr);
        if (providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_SOURCE) == null) {
            providerInfo.setStaticAttr(ProviderInfoAttrs.ATTR_SOURCE, "direct");
        }
        tmpProviderInfoList.add(providerInfo);
    }

    result.add(new ProviderGroup(RpcConstants.ADDRESS_DIRECT_GROUP, tmpProviderInfoList));
    return result;
}
/**
 * Subscribe provider list from all registries, the providers will be merged.
 *
 * @return Provider group list
 */
protected List<ProviderGroup> subscribeFromRegistries() {
    List<ProviderGroup> result = new ArrayList<ProviderGroup>();
    List<RegistryConfig> registryConfigs = consumerConfig.getRegistry();
    if (CommonUtils.isEmpty(registryConfigs)) {
        return result;
    }
    // 是否等待结果
    int addressWaitTime = consumerConfig.getAddressWait();
    int maxAddressWaitTime = SofaConfigs.getIntegerValue(consumerConfig.getAppName(),
        SofaOptions.CONFIG_MAX_ADDRESS_WAIT_TIME, SofaOptions.MAX_ADDRESS_WAIT_TIME);
    addressWaitTime = addressWaitTime < 0 ? maxAddressWaitTime : Math.min(addressWaitTime, maxAddressWaitTime);

    ProviderInfoListener listener = consumerConfig.getProviderInfoListener();
    respondRegistries = addressWaitTime == 0 ? null : new CountDownLatch(registryConfigs.size());

    // 从注册中心订阅 {groupName: ProviderGroup}
    Map<String, ProviderGroup> tmpProviderInfoList = new HashMap<String, ProviderGroup>();
    for (RegistryConfig registryConfig : registryConfigs) {
        Registry registry = RegistryFactory.getRegistry(registryConfig);
        registry.init();
        registry.start();

        try {
            List<ProviderGroup> current;
            try {
                if (respondRegistries != null) {
                    consumerConfig.setProviderInfoListener(new WrapperClusterProviderInfoListener(listener,
                        respondRegistries));
                }
                current = registry.subscribe(consumerConfig);
            } finally {
                if (respondRegistries != null) {
                    consumerConfig.setProviderInfoListener(listener);
                }
            }
            if (current == null) {
                continue; // 未同步返回结果
            } else {
                if (respondRegistries != null) {
                    respondRegistries.countDown();
                }
            }
            for (ProviderGroup group : current) { //  当前注册中心的
                String groupName = group.getName();
                if (!group.isEmpty()) {
                    ProviderGroup oldGroup = tmpProviderInfoList.get(groupName);
                    if (oldGroup != null) {
                        oldGroup.addAll(group.getProviderInfos());
                    } else {
                        tmpProviderInfoList.put(groupName, group);
                    }
                }
            }
        } catch (SofaRpcRuntimeException e) {
            throw e;
        } catch (Throwable e) {
            String appName = consumerConfig.getAppName();
            if (LOGGER.isWarnEnabled(appName)) {
                LOGGER.warnWithApp(appName,
                    "Catch exception when subscribe from registry: " + registryConfig.getId()
                        + ", but you can ignore if it's called by JVM shutdown hook", e);
            }
        }
    }
    if (respondRegistries != null) {
        try {
            respondRegistries.await(addressWaitTime, TimeUnit.MILLISECONDS);
        } catch (Exception ignore) { // NOPMD
        }
    }
    return new ArrayList<ProviderGroup>(tmpProviderInfoList.values());
}

遍历直连服务端提供者分组初始化服务端连接(建立长连接),检查服务节点协议类型和序列化方式信息,地址保持器/连接管理器全量更新全部服务端列表,依赖服务端消费者配置、地址保持器服务端发布者分组、直连服务端提供者分组构建服务提供更新事件给事件总线发布,事件总线按照事件订阅者是否同步进行事件同步/异步处理:

public void updateAllProviders(List<ProviderGroup> providerGroups) {
   List<ProviderGroup> oldProviderGroups = new ArrayList<ProviderGroup>(addressHolder.getProviderGroups());
   int count = 0;
   if (providerGroups != null) {
       for (ProviderGroup providerGroup : providerGroups) {
           checkProviderInfo(providerGroup);
           count += providerGroup.size();
       }
   }
   if (count == 0) {
       Collection<ProviderInfo> currentProviderList = currentProviderList();
       addressHolder.updateAllProviders(providerGroups);
       if (CommonUtils.isNotEmpty(currentProviderList)) {
           if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
               LOGGER.warnWithApp(consumerConfig.getAppName(), "Provider list is emptied, may be all " +
                   "providers has been closed, or this consumer has been add to blacklist");
               closeTransports();
           }
       }
   } else {
       addressHolder.updateAllProviders(providerGroups);
       connectionHolder.updateAllProviders(providerGroups);
   }
   if (EventBus.isEnable(ProviderInfoUpdateAllEvent.class)) {
       ProviderInfoUpdateAllEvent event = new ProviderInfoUpdateAllEvent(consumerConfig, oldProviderGroups,
           providerGroups);
       EventBus.post(event);
   }
}
/**
* 检测服务节点的一些信息
*
* @param providerGroup 服务列表分组
*/
protected void checkProviderInfo(ProviderGroup providerGroup) {
   List<ProviderInfo> providerInfos = providerGroup == null ? null : providerGroup.getProviderInfos();
   if (CommonUtils.isEmpty(providerInfos)) {
       return;
   }
   Iterator<ProviderInfo> iterator = providerInfos.iterator();
   while (iterator.hasNext()) {
       ProviderInfo providerInfo = iterator.next();
       if (!StringUtils.equals(providerInfo.getProtocolType(), consumerConfig.getProtocol())) {
           if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
               LOGGER.warnWithApp(consumerConfig.getAppName(),
                   "Unmatched protocol between consumer [{}] and provider [{}].",
                   consumerConfig.getProtocol(), providerInfo.getProtocolType());
           }
       }
       if (StringUtils.isEmpty(providerInfo.getSerializationType())) {
           providerInfo.setSerializationType(consumerConfig.getSerialization());
       }
   }
}
/**
* 给事件总线中丢一个事件
*
* @param event 事件
*/
public static void post(final Event event) {
   if (!isEnable()) {
       return;
   }
   CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());
   if (CommonUtils.isNotEmpty(subscribers)) {
       for (final Subscriber subscriber : subscribers) {
           if (subscriber.isSync()) {
               handleEvent(subscriber, event);
           } else { // 异步
               final RpcInternalContext context = RpcInternalContext.peekContext();
               AsyncRuntime.getAsyncThreadPool().execute(
                   new Runnable() {
                       @Override
                       public void run() {
                           try {
                               RpcInternalContext.setContext(context);
                               handleEvent(subscriber, event);
                           } catch (Exception e) {
                               RpcInternalContext.removeContext();
                           }
                       }
                   });
           }
       }
   }
}
private static void handleEvent(final Subscriber subscriber, final Event event) {
   try {
       subscriber.onEvent(event);
   } catch (Throwable e) {
       if (LOGGER.isWarnEnabled()) {
           LOGGER.warn("Handle " + event.getClass() + " error", e);
       }
   }
}

(7)服务端订阅者启动器创建客户端代理调用器ClientProxyInvoker执行链,构建代理调用器通过默认调用端代理执行器构造执行链缓存接口名和序列化类型,注入到动态代理调用处理器提供构建代理类调用器执行参数:

/**
* Build ClientProxyInvoker for consumer bootstrap.
*
* @param bootstrap ConsumerBootstrap
* @return ClientProxyInvoker
*/
protected ClientProxyInvoker buildClientProxyInvoker(ConsumerBootstrap bootstrap) {
   return new DefaultClientProxyInvoker(bootstrap);
}
/**
* 构造执行链
*
* @param bootstrap 调用端配置
*/
public DefaultClientProxyInvoker(ConsumerBootstrap bootstrap) {
   super(bootstrap);
   cacheCommonData();
}

(8)代理工厂按照服务端订阅者配置代理类型构建客户端代理调用器代理类实例,缓存服务端消费者配置ConsumerConfig,动态代理类型包括基于JDK动态代理实现JDKProxy(默认)和基于Javassist动态代理实现JavassistProxy,其中JDKProxy代理实现通过JDK代理处理器JDKInvocationHandler拦截请求变为Invocation执行invoke()方法远程调用,客户端代理调用器ClientProxyInvoker实施Proxy拦截调用使用客户端集群Cluster最底层调用过滤器,以消费端调用器ConsumerInvoker进行Client发送数据给Server调用过程,即Cluster.sendMsg()->ClientTransport.****Send()->RpcClient.invokeWith*****()->Channel.writeAndFlush():

/**
 * 构建代理类实例
 *
 * @param proxyType    代理类型
 * @param clazz        原始类
 * @param proxyInvoker 代码执行的Invoker
 * @param <T>          类型
 * @return 代理类实例
 * @throws Exception
 */
public static <T> T buildProxy(String proxyType, Class<T> clazz, Invoker proxyInvoker) throws Exception {
    try {
        ExtensionClass<Proxy> ext = ExtensionLoaderFactory.getExtensionLoader(Proxy.class)
            .getExtensionClass(proxyType);
        if (ext == null) {
            throw ExceptionUtils.buildRuntime("consumer.proxy", proxyType,
                "Unsupported proxy of client!");
        }
        Proxy proxy = ext.getExtInstance();
        return proxy.getProxy(clazz, proxyInvoker);
    } catch (SofaRpcRuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new SofaRpcRuntimeException(e.getMessage(), e);
    }
}
public Object invoke(Object proxy, Method method, Object[] paramValues)
    throws Throwable {
    String methodName = method.getName();
    Class[] paramTypes = method.getParameterTypes();
    if ("toString".equals(methodName) && paramTypes.length == 0) {
        return proxyInvoker.toString();
    } else if ("hashCode".equals(methodName) && paramTypes.length == 0) {
        return proxyInvoker.hashCode();
    } else if ("equals".equals(methodName) && paramTypes.length == 1) {
        Object another = paramValues[0];
        return proxy == another ||
            (proxy.getClass().isInstance(another) && proxyInvoker.equals(JDKProxy.parseInvoker(another)));
    }
    SofaRequest sofaRequest = MessageBuilder.buildSofaRequest(method.getDeclaringClass(),
        method, paramTypes, paramValues);
    SofaResponse response = proxyInvoker.invoke(sofaRequest);
    if (response.isError()) {
        throw new SofaRpcException(RpcErrorType.SERVER_UNDECLARED_ERROR, response.getErrorMsg());
    }
    Object ret = response.getAppResponse();
    if (ret instanceof Throwable) {
        throw (Throwable) ret;
    } else {
        if (ret == null) {
            return ClassUtils.getDefaultPrimitiveValue(method.getReturnType());
        }
        return ret;
    }
}
/**
 * proxy拦截的调用
 *
 * @param request 请求消息
 * @return 调用结果
 */
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
    SofaResponse response = null;
    Throwable throwable = null;
    try {
        RpcInternalContext.pushContext();
        RpcInternalContext context = RpcInternalContext.getContext();
        context.setProviderSide(false);
        // 包装请求
        decorateRequest(request);
        try {
            // 产生开始调用事件
            if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
                EventBus.post(new ClientStartInvokeEvent(request));
            }
            // 得到结果
            response = cluster.invoke(request);
        } catch (SofaRpcException e) {
            throwable = e;
            throw e;
        } finally {
            // 产生调用结束事件
            if (!request.isAsync()) {
                if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
                    EventBus.post(new ClientEndInvokeEvent(request, response, throwable));
                }
            }
        }
        // 包装响应
        decorateResponse(response);
        return response;
    } finally {
        RpcInternalContext.removeContext();
        RpcInternalContext.popContext();
    }
}

解析总结
SOFARPC服务引用流程概括为SOFARPC服务需要创建服务引用配置ConsumerConfig,自定义设置接口名称、调用协议、直连调用地址以及连接超时时间等客户端配置;服务消费者启动类ConsumerBootstrap引用服务:构造客户端集群Cluster封装集群模式、长连接管理、服务路由、负载均衡,初始化客户端集群Cluster(构建Router链routerChain,加载负载均衡策略loadBalancer,获取地址保持器addressHolder,构造Filter链filterChain最底层是调用过滤器,连接管理器启动重连线程,订阅服务发布者,初始化服务端连接(建立长连接),创建客户端代理调用器ClientProxyInvoker执行链,创建客户端调用器代理实现引用服务链路,基于JDK动态代理场景RPC服务远程根据JDK代理处理器JDKInvocationHandler拦截请求转换成Invocation执行invoke()方法以客户端代理调用器ClientProxyInvoker实施Proxy拦截调用,使用客户端集群Cluster调用消费端调用器ConsumerInvoker进行客户端发送数据给服务端完成远程调用过程。

上一篇下一篇

猜你喜欢

热点阅读