SOFARPC 源码分析3 - 各种 Config 配置类

2018-11-24  本文已影响35人  原水寒

本节大致记录 SOFARPC 中各个 Config 类的配置属性及方法,浏览一遍,基本上就对 SOFARPC 的核心功能有了印象。同时,对各个配置类的整体设计也会比较清楚。后续在分析到各种配置的时候,可以回来看一下。


image.png

关于配置优先级等,可以参考:http://www.sofastack.tech/sofa-rpc/docs/Publish-And-Reference

一、 注册中心配置类 RegistryConfig

public class RegistryConfig extends AbstractIdConfig implements Serializable {
    // 注册中心协议:默认是 zookeeper
    private String                protocol         = getStringValue(DEFAULT_REGISTRY);
    // 指定注册中心的地址, 和index必须填一个,address优先
    private String                address;
    // 指定注册中心寻址服务的地址, 和address必须填一个
    private String                index            = getStringValue(REGISTRY_INDEX_ADDRESS);
    // 是否注册,如果是false只订阅不注册,默认true
    private boolean               register         = getBooleanValue(SERVICE_REGISTER);
    // 是否订阅服务,默认true
    private boolean               subscribe        = getBooleanValue(SERVICE_SUBSCRIBE);
    // 调用注册中心超时时间,默认10000
    private int                   timeout          = getIntValue(REGISTRY_INVOKE_TIMEOUT);
    // 连接注册中心超时时间,默认20000
    private int                   connectTimeout   = getIntValue(REGISTRY_CONNECT_TIMEOUT);
    // 保存到本地文件的位置,默认$HOME下
    private String                file;
    // 是否批量操作,默认为true
    private boolean               batch            = getBooleanValue(REGISTRY_BATCH);
    // 定时批量检查时的条目数,默认为10
    private int                   batchSize        = getIntValue(REGISTRY_BATCH_SIZE);
    // Consumer给Provider发心跳的间隔,默认为30000
    protected int                 heartbeatPeriod  = getIntValue(REGISTRY_HEARTBEAT_PERIOD);
    // Consumer给Provider重连的间隔,默认为30000
    protected int                 reconnectPeriod  = getIntValue(REGISTRY_RECONNECT_PERIOD);
    // The Parameters. 自定义参数
    protected Map<String, String> parameters;
}

二、 通信服务配置类 ServerConfig

public class ServerConfig extends AbstractIdConfig implements Serializable {
    /*------------- 参数配置项开始-----------------*/
    // 通信协议,默认为bolt
    protected String                          protocol         = getStringValue(DEFAULT_PROTOCOL);

    // 实际监听IP,与网卡对应,默认为0.0.0.0
    protected String                          host             = getStringValue(SERVER_HOST);

    // 监听端口,默认为12200
    protected int                             port             = getIntValue(SERVER_PORT_START);

    // 基本路径,默认为""
    protected String                          contextPath      = getStringValue(SERVER_CONTEXT_PATH);

    // io线程池大小,默认为0
    protected int                             ioThreads        = getIntValue(SERVER_IOTHREADS);

    // 线程池类型,默认是 cached
    protected String                          threadPoolType   = getStringValue(SERVER_POOL_TYPE);

    // 业务线程池大小,默认是20
    protected int                             coreThreads      = getIntValue(SERVER_POOL_CORE);

    // 业务线程池大小,默认是200
    protected int                             maxThreads       = getIntValue(SERVER_POOL_MAX);

    // 是否允许telnet,针对自定义协议,默认是true
    protected boolean                         telnet           = getBooleanValue(SERVER_TELNET);

    // 线程池类型,默认普通线程池 normal
    protected String                          queueType        = getStringValue(SERVER_POOL_QUEUE_TYPE);

    // 业务线程池队列大小,默认为0
    protected int                             queues           = getIntValue(SERVER_POOL_QUEUE);

    // 默认业务线程池回收时间,默认60000
    protected int                             aliveTime        = getIntValue(SERVER_POOL_ALIVETIME);

    // 线程池是否初始化核心线程,默认false
    protected boolean                         preStartCore     = getBooleanValue(SERVER_POOL_PRE_START);

    // 服务端允许客户端建立的连接数,默认100000
    protected int                             accepts          = getIntValue(SERVER_ACCEPTS);

    // 序列化方式,默认hessian2
    protected String                          serialization    = getStringValue(DEFAULT_SERIALIZATION);

    // The Parameters. 自定义参数
    protected Map<String, String>             parameters;

    // 镜像ip,例如监听地址是1.2.3.4,告诉注册中心的确是3.4.5.6
    protected String                          virtualHost;

    // 镜像端口
    protected Integer                         virtualPort;

    // 连接事件监听器实例,连接或者断开时触发
    protected transient List<ChannelListener> onConnect;

    // 是否启动epoll,默认为false
    protected boolean                         epoll            = getBooleanValue(SERVER_EPOLL);

    // 是否hold住端口,true的话随主线程退出而退出,false的话则要主动退出,默认为true
    protected boolean                         daemon           = getBooleanValue(SERVER_DAEMON);

    // 端口是否自适应,如果端口被占用,自动+1;默认false
    protected boolean                         adaptivePort     = getBooleanValue(SEVER_ADAPTIVE_PORT);

    // 网络层协议,默认为netty4
    protected String                          transport        = getStringValue(DEFAULT_TRANSPORT);

    // 服务端是否自动启动,默认为true
    protected boolean                         autoStart        = getBooleanValue(SEVER_AUTO_START);

    // 服务端关闭超时时间,默认20000
    protected int                             stopTimeout      = getIntValue(SERVER_STOP_TIMEOUT);

    // 是否维持长连接,默认为true
    protected boolean                         keepAlive        = getBooleanValue(TRANSPORT_SERVER_KEEPALIVE);

    /*------------- 参数配置项结束-----------------*/
    // 服务端对象
    private transient volatile Server         server;

    // 绑定的地址。是某个网卡,还是全部地址
    private transient String                  boundHost;

    // 启动服务
    public synchronized Server buildIfAbsent() {
        if (server != null) {
            return server;
        }
        // 创建并初始化 Server 实例
        server = ServerFactory.getServer(this);
        return server;
    }

    // 关闭服务
    public synchronized void destroy() {
        if (server != null) {
            server.destroy();
        }
    }
}

三、接口公共配置类 AbstractInterfaceConfig

/**
 * 接口级的公共配置
 * @param <T> the interface
 * @param <S> the sub class of AbstractInterfaceConfig
 */
public abstract class AbstractInterfaceConfig<T, S extends AbstractInterfaceConfig> extends AbstractIdConfig<S> implements Serializable {
    /*-------------配置项开始----------------*/
    // 应用信息
    protected ApplicationConfig application = new ApplicationConfig();
    // 服务接口:做为服务唯一标识的组成部分, 不管普通调用和泛化调用,都是设置实际的接口类名称,
    protected String interfaceId;
    // 服务标签:做为服务唯一标识的组成部分, 默认为""
    protected String uniqueId = getStringValue(DEFAULT_UNIQUEID);
    // 过滤器配置实例
    protected transient List<Filter> filterRef;
    // 过滤器配置别名,多个用逗号隔开
    protected List<String> filter;
    // 注册中心配置,可配置多个
    protected List<RegistryConfig> registry;
    // 方法配置,可配置多个
    protected Map<String, MethodConfig> methods;
    // 默认序列化, 默认hessian2
    protected String serialization = getStringValue(DEFAULT_SERIALIZATION);
    // 是否注册,如果是false只订阅不注册,默认true
    protected boolean register = getBooleanValue(SERVICE_REGISTER);
    // 是否订阅服务,默认true
    protected boolean subscribe = getBooleanValue(SERVICE_SUBSCRIBE);
    // 代理类型,默认javassist
    protected String proxy = getStringValue(DEFAULT_PROXY);
    // 服务分组:不做为服务唯一标识的一部分,请直接使用 {@link #uniqueId} 代替
    @Deprecated
    protected String group = getStringValue(DEFAULT_GROUP);
    // 服务版本:不做为服务唯一标识的一部分,请直接使用 {@link #uniqueId} 代替
    protected String version = getStringValue(DEFAULT_VERSION);
    // 结果缓存实现类
    protected transient Cache cacheRef;
    // Mock实现类
    protected transient T mockRef;
    // 自定义参数
    protected Map<String, String> parameters;

    /*-------- 下面是方法级配置 --------*/
    // 是否启动结果缓存
    protected boolean cache;
    // 是否开启mock
    protected boolean mock;
    // 是否开启参数验证(jsr303)
    protected boolean validation;
    // 压缩算法,为空则不压缩
    protected String compress;
    /*-------------配置项结束----------------*/

    // 方法名称和方法参数配置的map,不需要遍历list
    protected transient volatile Map<String, Object> configValueCache = null;
    // 代理接口类,和T对应,主要针对泛化调用
    protected transient volatile Class proxyClass;
    // 服务配置的listener
    protected transient volatile ConfigListener configListener;
    
    // Gets proxy class.
    protected abstract Class<?> getProxyClass();
    // 构造关键字方法 @return 唯一标识 string
    protected abstract String buildKey();
    // 是否有超时配置
    public abstract boolean hasTimeout();
    // 是否有并发限制配置
    public abstract boolean hasConcurrents();

    // 除了判断自己,还有判断下面方法的自定义判断
    public boolean hasValidation() {
        if (validation) {
            return true;
        }
        if (CommonUtils.isNotEmpty(methods)) {
            for (MethodConfig methodConfig : methods.values()) {
                if (CommonUtils.isTrue(methodConfig.getValidation())) {
                    return true;
                }
            }
        }
        return false;
    }

    // 是否有缓存
    public boolean hasCache() {
        if (isCache()) {
            return true;
        }
        if (CommonUtils.isNotEmpty(methods)) {
            for (MethodConfig methodConfig : methods.values()) {
                if (CommonUtils.isTrue(methodConfig.getCache())) {
                    return true;
                }
            }
        }
        return false;
    }

    // 是否有token配置
    public boolean hasToken() {
        // .token 配置
        if (getParameter(RpcConstants.HIDDEN_KEY_TOKEN) != null) {
            return true;
        }
        if (CommonUtils.isNotEmpty(methods)) {
            for (MethodConfig methodConfig : methods.values()) {
                if (methodConfig.getParameter(RpcConstants.HIDDEN_KEY_TOKEN) != null) {
                    return true;
                }
            }
        }
        return false;
    }
}

四、提供端配置类 ProviderConfig

public class ProviderConfig<T> extends AbstractInterfaceConfig<T, ProviderConfig<T>> implements Serializable {
    /*---------- 参数配置项开始 ------------*/
    // 接口实现类引用
    protected transient T ref;
    // 配置的 Server 列表
    protected List<ServerConfig> server;
    // 服务发布延迟,单位毫秒,默认为-1
    protected int delay = getIntValue(PROVIDER_DELAY);
    // 服务端权重,默认为100
    protected int weight = getIntValue(PROVIDER_WEIGHT);
    // 发布方法:默认全部 *
    protected String include = getStringValue(PROVIDER_INCLUDE);
    // 不发布的方法列表,逗号分隔,默认为""
    protected String exclude = getStringValue(PROVIDER_EXCLUDE);
    // 是否动态注册,默认为true,配置为false代表不主动发布,需要到管理端进行上线操作
    protected boolean dynamic = getBooleanValue(PROVIDER_DYNAMIC);
    // 服务优先级,越大越高
    protected int priority = getIntValue(PROVIDER_PRIORITY);
    // 启动辅助类 key
    protected String bootstrap;
    // 自定义线程池
    protected transient ThreadPoolExecutor executor;

    /*-------- 下面是方法级可覆盖配置 --------*/
    // 服务端执行超时时间(毫秒),不会打断执行线程,只是打印警告,默认为0,表示不判断
    protected int timeout = getIntValue(PROVIDER_INVOKE_TIMEOUT);
    // 接口下每方法的最大可并行执行请求数,配置-1关闭并发过滤器,等于0表示开启过滤但是不限制,默认为0
    protected int concurrents = getIntValue(PROVIDER_CONCURRENTS);
    /*---------- 参数配置项结束 ------------*/

    // 方法名称:是否可调用
    protected transient volatile ConcurrentMap<String, Boolean> methodsLimit;
    // 服务提供者辅助类,加载 key=bootstrap 的 ProviderBootstrap 实现
    protected transient ProviderBootstrap providerBootstrap;

    // Gets proxy class.
    @Override
    public Class<?> getProxyClass() {
        if (proxyClass != null) {
            return proxyClass;
        }
        // 加载 interfaceId 的接口
        this.proxyClass = ClassUtils.forName(interfaceId);
        return proxyClass;
    }

    /**
     * Build key.
     *
     * @return the string
     */
    @Override
    public String buildKey() {
        return interfaceId + ":" + uniqueId;
    }
    
    @Override
    public boolean hasTimeout() {
        if (timeout > 0) {
            return true;
        }
        if (CommonUtils.isNotEmpty(methods)) {
            for (MethodConfig methodConfig : methods.values()) {
                if (methodConfig.getTimeout() > 0) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * 是否有并发控制需求,有就打开过滤器
     * 配置-1关闭并发过滤器,等于0表示开启过滤但是不限制
     */
    @Override
    public boolean hasConcurrents() {
        if (concurrents > 0) {
            return true;
        }
        if (CommonUtils.isNotEmpty(methods)) {
            for (MethodConfig methodConfig : methods.values()) {
                if (methodConfig.getConcurrents() != null
                        && methodConfig.getConcurrents() > 0) {
                    return true;
                }
            }
        }
        return false;
    }
    
    /**
     * 发布服务
     */
    public synchronized void export() {
        if (providerBootstrap == null) {
            providerBootstrap = Bootstraps.from(this);
        }
        providerBootstrap.export();
    }

    /**
     * 取消发布服务
     */
    public synchronized void unExport() {
        if (providerBootstrap != null) {
            providerBootstrap.unExport();
        }
    }
}

可以看到服务发布是 ProviderConfig 唤起的

五、消费端配置类 ConsumerConfig

public class ConsumerConfig<T> extends AbstractInterfaceConfig<T, ConsumerConfig<T>> implements Serializable {
    // 通信协议,默认为bolt
    protected String protocol = getStringValue(DEFAULT_PROTOCOL);
    // 直连调用地址
    protected String directUrl;
    // 是否泛化调用
    protected boolean generic;
    // 调用方式,默认为sync
    protected String invokeType = getStringValue(CONSUMER_INVOKE_TYPE);
    // consumer连provider超时时间,默认5000
    protected int connectTimeout = getIntValue(CONSUMER_CONNECT_TIMEOUT);
    // consumer断开时等待结果的超时时间,默认10000
    protected int disconnectTimeout = getIntValue(CONSUMER_DISCONNECT_TIMEOUT);
    // 集群处理,默认是failover
    protected String cluster = getStringValue(CONSUMER_CLUSTER);
    // 连接管理器, 默认all,表示连接全部建立长连接
    protected String connectionHolder = getStringValue(CONSUMER_CONNECTION_HOLDER);
    // 地址管理器,默认singleGroup,单分组
    protected String addressHolder = getStringValue(CONSUMER_ADDRESS_HOLDER);
    // 负载均衡,默认random
    protected String loadBalancer = getStringValue(CONSUMER_LOAD_BALANCER);
    // 是否延迟建立长连接(第一次调用时新建,注意此参数可能和check冲突,开启check后lazy自动失效)
    // 默为false
    protected boolean lazy = getBooleanValue(CONSUMER_LAZY);
    // 粘滞连接,一个断开才选下一个: change transport when current is disconnected
    // 默认false
    protected boolean sticky = getBooleanValue(CONSUMER_STICKY);
    // 是否jvm内部调用(provider和consumer配置在同一个jvm内,则走本地jvm内部,不走远程)
    // 默认false
    protected boolean inJVM = getBooleanValue(CONSUMER_INJVM);
    // 是否强依赖(即没有服务节点就启动失败,注意此参数可能和lazy冲突,开启check后lazy自动失效)
    // 默认false
    protected boolean check = getBooleanValue(CONSUMER_CHECK);
    // 长连接个数,不是所有的框架都支持一个地址多个长连接
    // 默认1
    protected int connectionNum = getIntValue(CONSUMER_CONNECTION_NUM);
    // Consumer给Provider发心跳的间隔
    // 默认30000
    protected int heartbeatPeriod = getIntValue(CONSUMER_HEARTBEAT_PERIOD);
    // Consumer给Provider重连的间隔
    // 默认10000
    protected int reconnectPeriod = getIntValue(CONSUMER_RECONNECT_PERIOD);
    // 路由配置别名
    protected List<String> router;
    // 路由规则引用,多个用英文逗号隔开。List<Router>
    protected transient List<Router> routerRef;
    // 返回时的回调函数
    protected transient SofaResponseCallback onReturn;
    // 连接事件监听器实例,连接或者断开时触发
    @Unstable
    protected transient List<ChannelListener> onConnect;
    // 客户端状态变化监听器实例,状态可用和不可以时触发
    @Unstable
    protected transient List<ConsumerStateListener> onAvailable;
    // 启动器
    protected String bootstrap;
    // 客户端获取地址等待时间(毫秒),-1表示一直等
    protected int addressWait = getIntValue(CONSUMER_ADDRESS_WAIT);

    /*-------- 下面是方法级可覆盖配置 --------*/
    // consumer调用provider超时时间(毫秒), 默认5000
    protected int timeout = getIntValue(CONSUMER_INVOKE_TIMEOUT);
    // 失败后重试次数,默认是0
    protected int retries = getIntValue(CONSUMER_RETRIES);
    // 接口下每方法的最大可并行执行请求数,配置-1关闭并发过滤器,等于0表示开启过滤但是不限制,默认是0
    protected int concurrents = getIntValue(CONSUMER_CONCURRENTS);
    /*---------- 参数配置项结束 ------------*/

    // 服务消费者启动类
    private transient ConsumerBootstrap<T> consumerBootstrap;
    // 服务列表的listener
    private transient volatile ProviderInfoListener providerInfoListener;

    // Build key.
    @Override
    public String buildKey() {
        return protocol + "://" + interfaceId + ":" + uniqueId;
    }

    // Gets proxy class.
    @Override
    public Class<?> getProxyClass() {
        if (proxyClass != null) {
            return proxyClass;
        }
        // 泛化
        if (generic) {
            return GenericService.class;
        }
        // 正常接口
        this.proxyClass = ClassUtils.forName(interfaceId);
        return proxyClass;
    }


    /**
     * 引用服务
     * @return 服务代理类 t
     */
    public T refer() {
        if (consumerBootstrap == null) {
            consumerBootstrap = Bootstraps.from(this);
        }
        return consumerBootstrap.refer();
    }

    /**
     * 取消引用服务
     */
    public void unRefer() {
        if (consumerBootstrap != null) {
            consumerBootstrap.unRefer();
        }
    }
}

可以看到服务引用是 ConsumerConfig 唤起的

六、应用配置类 ApplicationConfig

public class ApplicationConfig implements Serializable {
    // The App name.
    protected String appName;
    // The App id.
    protected String appId;
    // The Ins id. 实例ID
    protected String insId;
}

七、方法配置类 MethodConfig

/**
 * 方法级配置
 */
public class MethodConfig implements Serializable {
    /*-------------配置项开始----------------*/
    // 方法名称,无法做到重载方法的配置
    private String                 name;
    // 自定义参数
    protected Map<String, String>  parameters;
    // 远程调用超时时间(毫秒)
    protected Integer              timeout;
    // 失败后重试次数
    protected Integer              retries;
    // 调用方式
    protected String               invokeType;
    // 是否jsr303验证
    protected Boolean              validation;
    // 返回时的回调函数
    protected SofaResponseCallback onReturn;
    // 最大并发执行(不管服务端还是客户端)
    protected Integer              concurrents;
    // 是否启用客户端缓存
    protected Boolean              cache;
    // 是否启动压缩
    protected String               compress;
    /*-------------配置项结束----------------*/
}
上一篇下一篇

猜你喜欢

热点阅读