分布式

SOFARPC 源码分析2 - SPI 扩展机制

2018-11-18  本文已影响113人  原水寒

大部分 RPC 框架都会通过使用 SPI 扩展机制来实现高可扩展性,例如 Dubbo,SOFARPC 等。但是 JDK-SPI 由于其显著的缺点,RPC 框架通常会定制自己的 SPI 框架。
JDK-SPI 这里介绍了 JDK-SPI 的原理和缺点;
Dubbo-SPI 这里介绍了 Dubbo-SPI 的设计和实现。

一、SOFARPC-SPI 的使用

1.1、可扩展接口

package com.alipay.sofa.rpc.client;

@Extensible(singleton = false) // 可扩展接口的标识,并提供一些可选参数
public abstract class LoadBalancer {
    ...
}

@Extensible 该注解通常添加在 接口 或者 抽象类 上,用于标识该 接口 或者 抽象类 是一个可扩展接口或者可扩展的抽象类。

1.2、可扩展接口的实现

package com.alipay.sofa.rpc.zjg;

@Extension("hello") // 可扩展接口实现的标识,并且指定该实现的 alias
public class MyLoadBalancer extends LoadBalancer {
    ...   
}

@Extension 该注解添加在可扩展接口或抽象类的实现上,并且通常会添加一个 alias 用于后续的动态加载。

1.3、编写接口实现的配置文件

src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.client.LoadBalancer

hello=com.alipay.sofa.rpc.zjg.MyLoadBalancer

注意:

  • 配置文件的文件名是 可扩展接口 的全类名
  • 配置文件的内容是 alias=具体实现的全类名
  • 配置文件的存放位置默认由 rpc-config-default.json 中的 "extension.load.path" 来指定,可以通过自定义 sofa-rpc/rpc-config.json 文件或者 META-INF/sofa-rpc/rpc-config.json 中指定 "extension.load.path" 来覆盖,默认配置是
 "extension.load.path": [
  "META-INF/services/sofa-rpc/",
   "META-INF/services/"
 ]

1.4、运行时动态选择接口实现

ExtensionLoader<LoadBalancer> loader = ExtensionLoaderFactory.getExtensionLoader(LoadBalancer.class);
LoadBalancer myLoadBalancer = loader.getExtension("hello");

首先获取 可扩展接口ExtensionLoader
然后通过 ExtensionLoader 加载指定 alias具体实现

二、SOFARPC-SPI 的设计

image.png
  • @Extensible:该注解通常添加在 接口 或者 抽象类 上,用于标识该 接口 或者 抽象类 是一个可扩展接口或者可扩展的抽象类
  • @Extension:该注解添加在可扩展接口或抽象类的 实现上,并且通常会添加一个 alias 用于后续的动态加载
  • ExtensionClass:一个 实现类会最终被其 ExtensionLoader 加载称为一个 ExtensionClass,存储在其
    ExtensionLoader 中,并且包含了实例化 ExtensionClass 存储的 实现类 的方法
  • ExtensionLoader:每一个 可扩展接口可扩展抽象类 都有一个 ExtensionLoader,用于从相应接口的 SPI 配置文件中读取配置内容并且将每一行解析成一个 ExtensionClass(每一个 ExtensionClass 对应一个实现,SPI 配置文件中的每一行配置一个实现类),之后存储 <alias, ExtensionClass> 配置对到 Map<String, ExtensionClass<T>> 容器中
  • ExtensionLoaderListener:当将一个 ExtensionClass 成功的添加到 Map 容器中的时候,触发 ExtensionLoaderFactory 创建的 ExtensionLoaderListener 监听器
  • ExtensionLoaderFactory:用来获取或者创建 ExtensionLoader,将创建好的 ExtensionLoader 放置在 Map<Class, ExtensionLoader> 容器中

各组件的关系:图片出处

image.png

2.1、@Extensible

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Extensible {

    // 指定自定义扩展文件名称,默认就是可扩展接口是可扩展抽象类的全类名
    String file() default "";

    // 扩展类是否使用单例,默认使用
    boolean singleton() default true;

    // 扩展类是否需要编码,默认不需要
    boolean coded() default false;
}

2.2、@Extension

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Extension {
    // 扩展点名字 alias
    String value();

    // 扩展点编码,默认不需要,当接口需要编码的时候需要
    byte code() default -1;

    // 优先级排序,默认不需要,大的优先级高
    int order() default 0;

    // 是否覆盖其它低order的同名扩展
    boolean override() default false;

    // 排斥其它扩展,可以排斥掉其它低order的扩展
    String[] rejection() default {};
}

2.3、ExtensionLoaderFactory

public class ExtensionLoaderFactory {
    private ExtensionLoaderFactory() {
    }

    // key: 可扩展接口或者可扩展抽象类
    private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();

    // Get extension loader by extensible class with listener
    // 1. 从 LOADER_MAP 容器中获取 clazz 对应的 ExtensionLoader
    // 2. 如果没有,则使用双重检测同步创建一个 ExtensionLoader
    // 3. 将 {clazz, ExtensionLoader} 对存储到 LOADER_MAP 中
    // 4. 返回 ExtensionLoader
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
        ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
        if (loader == null) {
            synchronized (ExtensionLoaderFactory.class) {
                loader = LOADER_MAP.get(clazz);
                if (loader == null) {
                    loader = new ExtensionLoader<T>(clazz, listener);
                    LOADER_MAP.put(clazz, loader);
                }
            }
        }
        return loader;
    }

    // Get extension loader by extensible class without listener
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz) {
        return getExtensionLoader(clazz, null);
    }
}

2.4、ExtensionLoader

// 一个可扩展接口类,对应一个加载器
public class ExtensionLoader<T> {
    // SOFARPC 自己编写的 Log 体系
    // getLogger 的执行会触发 RpcConfigs 的 static 块,在该 static 块中会做读取默认配置的操作
    // 读取路径有以下三个
    // 1. rpc-config-default.json(默认的配置文件)
    // 2. sofa-rpc/rpc-config.json(自定义)
    // 3. META-INF/sofa-rpc/rpc-config.json(自定义)
    private final static Logger LOGGER = LoggerFactory.getLogger(ExtensionLoader.class);
    // 当前加载的可扩展接口类
    protected final Class<T> interfaceClass;
    // 当前加载的可扩展接口类名
    protected final String interfaceName;
    // 扩展点注解
    protected final Extensible extensible;
    // 全部的加载的实现类 {"alias":ExtensionClass}
    protected final ConcurrentMap<String, ExtensionClass<T>> all;
    // 如果是单例,那么factory不为空
    protected final ConcurrentMap<String, T> factory;
    // 加载监听器
    protected final ExtensionLoaderListener<T> listener;

    public ExtensionLoader(Class<T> interfaceClass, ExtensionLoaderListener<T> listener) {
        this(interfaceClass, true, listener);
    }

    protected ExtensionLoader(Class<T> interfaceClass) {
        this(interfaceClass, true, null);
    }

    /**
     * interfaceClass 接口类
     * autoLoad       是否自动开始加载,默认开启
     * listener       扩展加载监听器
     */
    protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
        ...
        // 接口为空,既不是接口,也不是抽象类
        if (interfaceClass == null || !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {
            throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
        }
        this.interfaceClass = interfaceClass;
        this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
        this.listener = listener;
        Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
        // 如果可扩展接口没有配置 @Extensible 注解,直接抛错
        if (extensible == null) {
            throw new IllegalArgumentException("Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
        } else {
            this.extensible = extensible;
        }
        this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
        this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
        // 如果开启了自动加载(默认开启),则进行 SPI 配置文件的读取
        // 1. 从全局默认配置中获取 SPI 扩展文件的路径
        // 2. 循环遍历每一个 SPI 扩展文件,读取配置内容并解析。
        if (autoLoad) {
            List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
            for (String path : paths) {
                loadFromFile(path);
            }
        }
    }

    // path必须以/结尾
    protected synchronized void loadFromFile(String path) {
        // 默认如果不指定文件名字,就是接口名
        String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim();
        String fullFileName = path + file;
        ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass());
        loadFromClassLoader(classLoader, fullFileName);
    }

    protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable {
        // 获取并加载 SPI 配置文件
        Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName) : ClassLoader.getSystemResources(fullFileName);
        // 可能存在多个文件
        if (urls != null) {
            while (urls.hasMoreElements()) {
                // 读取一个文件
                URL url = urls.nextElement();
                BufferedReader reader = null;
                reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));
                String line;
                // 每次读取一行,每一行就是一个 SPI 实现类配置
                while ((line = reader.readLine()) != null) {
                    readLine(url, line);
                }
            }
        }
    }

    protected void readLine(URL url, String line) {
        String[] aliasAndClassName = parseAliasAndClassName(line);
        if (aliasAndClassName == null || aliasAndClassName.length != 2) {
            return;
        }
        String alias = aliasAndClassName[0];
        String className = aliasAndClassName[1];
        // 读取配置的实现类
        Class tmp = ClassUtils.forName(className, false);
        // 如果实现类不是可扩展接口的实现类,抛错
        if (!interfaceClass.isAssignableFrom(tmp)) {
            throw new IllegalArgumentException(...);
        }
        Class<? extends T> implClass = (Class<? extends T>) tmp;

        // 检查是否有可扩展标识
        Extension extension = implClass.getAnnotation(Extension.class);
        if (extension == null) {
            throw new IllegalArgumentException(...);
        } else {
            String aliasInCode = extension.value();
            // 实现类的 @Extension 注解必须配置 value(), 即 alias,否则抛错
            if (StringUtils.isBlank(aliasInCode)) {
                throw new IllegalArgumentException(...);
            }
            if (alias == null) {
                // 如果 spi 配置文件里没配置 alias,则直接使用实现类的 @Extension 注解里的 alias
                // 所以:spi 配置文件是允许不配置为 alias=XxxImpl 的,可以直接配置为 XxxImpl,此时的 alias 读取的就是 XxxImpl 的 @Extension 注解里的 alias
                alias = aliasInCode;
            } else {
                // spi 配置文件里配置的和代码里的不一致,直接抛错
                if (!aliasInCode.equals(alias)) {
                    throw new IllegalArgumentException(...);
                }
            }
            // 接口需要编号,实现类没设置
            if (extensible.coded() && extension.code() < 0) {
                throw new IllegalArgumentException(...);
            }
        }
        // alias 不可以配置为 default 和 *
        if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) {
            throw new IllegalArgumentException(...);
        }
        // 检查是否有存在同 alias 的 ExtensionClass
        // 进行覆盖逻辑
        ExtensionClass old = all.get(alias);
        ExtensionClass<T> extensionClass = null;
        if (old != null) {
            // 如果当前扩展可以覆盖其它同名扩展
            if (extension.override()) {
                // 如果优先级还没有旧的高,则忽略;如果优先级高于旧的
                if (extension.order() >= old.getOrder()) {
                    // 如果当前扩展可以覆盖其它同名扩展
                    extensionClass = buildClass(extension, implClass, alias);
                }
            }
            // 如果当前扩展不可以覆盖其它同名扩展
            else {
                if (old.isOverride() || old.getOrder() < extension.order()) {
                    // 如果不能被覆盖,抛出已存在异常
                    throw new IllegalStateException("Duplicate class with same alias: " + alias);
                }
            }
        } else {
            // 如果不存在同 alias 的 ExtensionClass,则直接新建一个 ExtensionClass
            extensionClass = buildClass(extension, implClass, alias);
        }

        // 做互斥扩展点排除逻辑
        if (extensionClass != null) {
            // 检查是否有互斥的扩展点
            for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet()) {
                ExtensionClass existed = entry.getValue();
                if (extensionClass.getOrder() >= existed.getOrder()) {
                    // 新的优先级 >= 老的优先级,检查新的扩展是否排除老的扩展
                    String[] rejection = extensionClass.getRejection();
                    if (CommonUtils.isNotEmpty(rejection)) {
                        for (String rej : rejection) {
                            existed = all.get(rej);
                            // 只排除低 order 的扩展点
                            if (existed == null || extensionClass.getOrder() < existed.getOrder()) {
                                continue;
                            }
                            all.remove(rej);
                        }
                    }
                } else {
                    // 如果 新的优先级 < 老的优先级,判断是否需要将新的排除掉
                    String[] rejection = existed.getRejection();
                    if (CommonUtils.isNotEmpty(rejection)) {
                        for (String rej : rejection) {
                            if (rej.equals(extensionClass.getAlias())) {
                                // 被其它扩展排掉
                                return;
                            }
                        }
                    }
                }
            }

            // 执行监听器逻辑,将 {alias : ExtensionClass} 添加到容器中
            loadSuccess(alias, extensionClass);
        }
    }

    private ExtensionClass<T> buildClass(Extension extension, Class<? extends T> implClass, String alias) {
        ExtensionClass<T> extensionClass = new ExtensionClass<T>(implClass, alias);
        extensionClass.setCode(extension.code());
        extensionClass.setSingleton(extensible.singleton());
        extensionClass.setOrder(extension.order());
        extensionClass.setOverride(extension.override());
        extensionClass.setRejection(extension.rejection());
        return extensionClass;
    }

    private void loadSuccess(String alias, ExtensionClass<T> extensionClass) {
        if (listener != null) {
            listener.onLoad(extensionClass); // 加载完毕,通知监听器
            all.put(alias, extensionClass);
        } else {
            all.put(alias, extensionClass);
        }
    }

    public ConcurrentMap<String, ExtensionClass<T>> getAllExtensions() {
        return all;
    }

    // 根据服务别名查找扩展类
    public ExtensionClass<T> getExtensionClass(String alias) {
        return all == null ? null : all.get(alias);
    }

    // 获取扩展点实例
    public T getExtension(String alias) {
        ExtensionClass<T> extensionClass = getExtensionClass(alias);
        if (extensionClass == null) {
            throw new SofaRpcRuntimeException(...);
        } else {
            // 如果是单例,获取或者双重检测创建 T
            if (extensible.singleton() && factory != null) {
                T t = factory.get(alias);
                if (t == null) {
                    synchronized (this) {
                        t = factory.get(alias);
                        if (t == null) {
                            t = extensionClass.getExtInstance();
                            factory.put(alias, t);
                        }
                    }
                }
                return t;
            } else {
                // 如果不是单例,直接创建 T
                return extensionClass.getExtInstance();
            }
        }
    }

    /**
     * 得到实例
     *
     * @param alias    别名
     * @param argTypes 扩展初始化需要的参数类型
     * @param args     扩展初始化需要的参数
     * @return 扩展实例(已判断是否单例)
     */
    public T getExtension(String alias, Class[] argTypes, Object[] args) {
        ExtensionClass<T> extensionClass = getExtensionClass(alias);
        if (extensionClass == null) {
            throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: \"" + alias + "\"!");
        } else {
            if (extensible.singleton() && factory != null) {
                T t = factory.get(alias);
                if (t == null) {
                    synchronized (this) {
                        t = factory.get(alias);
                        if (t == null) {
                            t = extensionClass.getExtInstance(argTypes, args);
                            factory.put(alias, t);
                        }
                    }
                }
                return t;
            } else {
                return extensionClass.getExtInstance(argTypes, args);
            }
        }
    }
}

2.5、ExtensionLoaderListener

public interface ExtensionLoaderListener<T> {
    // 当扩展点成功加载时,触发的事件
    void onLoad(ExtensionClass<T> extensionClass);
}

2.6、ExtensionClass

// 扩展接口实现类
public class ExtensionClass<T> implements Sortable {
    // 扩展接口实现类
    protected final Class<? extends T> clazz;
    // 扩展实现类别名
    protected final String alias;
    // 扩展编码,必须唯一
    protected byte code;
    // 是否单例
    protected boolean singleton;
    // 扩展点排序值,大的优先级高
    protected int order;
    // 是否覆盖其它低order的同名扩展
    protected boolean override;
    // 排斥其它扩展,可以排斥掉其它低order的扩展
    protected String[] rejection;
    // 服务端实例对象(只在是单例的时候保留)
    private volatile transient T instance;

    /**
     * 构造函数
     * @param clazz 扩展实现类名
     * @param alias 扩展别名
     */
    public ExtensionClass(Class<? extends T> clazz, String alias) {
        this.clazz = clazz;
        this.alias = alias;
    }

    // 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象
    public T getExtInstance() {
        return getExtInstance(null, null);
    }

    /**
     * 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象(全部都是反射创建)
     * @param argTypes 构造函数参数类型
     * @param args     构造函数参数值
     */
    public T getExtInstance(Class[] argTypes, Object[] args) {
        if (clazz != null) {
            if (singleton) { // 如果是单例
                if (instance == null) {
                    synchronized (this) {
                        if (instance == null) {
                            instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
                        }
                    }
                }
                return instance; // 保留单例
            } else {
                return ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
            }
        }
        throw new SofaRpcRuntimeException("Class of ExtensionClass is null");
    }
}

三、SOFARPC-SPI 流程总结

ExtensionLoader<LoadBalancer> loader = ExtensionLoaderFactory.getExtensionLoader(LoadBalancer.class);
  1. Map<Class, ExtensionLoader> LOADER_MAP 容器中获取 clazz(可扩展接口或可扩展抽象类)对应的 ExtensionLoader
  2. 如果没有,则使用双重检测同步创建一个 ExtensionLoader

2.1. 从全局默认配置文件(默认是 rpc-config-default.json)中获取 SPI 扩展文件的路径
2.2. 循环遍历每一个 SPI 扩展文件,对每一个 SPI 扩展文件做如下操作

2.2.1. 读取每一行(每一行是配置一个实现类,每个实现类会被解析成一个 ExtensionClass
2.2.2. 获取 alias:如果 SPI 配置文件中没有配置 alias,则使用实现类的 @Extension 注解中的的 alias,否则,SPI 配置文件中配置的 alias 必须与实现类的 @Extension 注解中的 alias 同名
2.2.3. 检查当前的 Map<String, ExtensionClass<T>> all 中是否有同 alias 的 ExtensionClass,如果没有,直接创建当前实现类的 ExtensionClass,否则进行覆盖逻辑(如果当前的实现类可以覆盖且优先级高于旧的,则直接覆盖)
2.2.4. 做互斥扩展点的排除逻辑:循环遍历 Map<String, ExtensionClass<T>> all 中的每一个 ExtensionClass,如果当前实现类的 ExtensionClass 的优先级大于等于遍历到的 ExtensionClass,并且当前实现类的 ExtensionClass 的互斥扩展集合包含遍历到的 ExtensionClass,则从 all 映射中删除遍历到的 ExtensionClass;如果当前实现类的 ExtensionClass 的优先级小于遍历到的 ExtensionClass,并且遍历到的 ExtensionClass 的互斥扩展集合包含当前实现类的 ExtensionClass,则直接 return,不再将当前的实现类的 ExtensionClass 放到 all 中(5.5.0-SNAPSHOT 版本中该实现有个问题:https://github.com/alipay/sofa-rpc/issues/367
2.2.5. 如果配置了 ExtensionLoaderListener,则执行该监听器的监听逻辑,之后将 {alias : 当前实现类的 ExtensionClass} 放置到 all 映射中

  1. {clazz, ExtensionLoader} 对存储到 LOADER_MAP
  2. 返回 ExtensionLoader
LoadBalancer myLoadBalancer = loader.getExtension("hello");
  1. 从当期可扩展接口的 ExtensionLoaderMap<String, ExtensionClass<T>> all 中获取指定 alias 的 ExtensionClass
  2. 如果配置为单例,则获取或使用双重检测反射创建 ExtensionClass 的实例,并将创建好的实例塞入 Map<String, T> factory,方便下次使用;否则,直接反射创建 ExtensionClass 的实例

加载原理图:图片地址

image.png
上一篇下一篇

猜你喜欢

热点阅读