SOFARPC 源码分析2 - SPI 扩展机制
大部分 RPC 框架都会通过使用 SPI 扩展机制来实现高可扩展性,例如 Dubbo,SOFARPC 等。但是 JDK-SPI 由于其显著的缺点,RPC 框架通常会定制自己的 SPI 框架。
JDK-SPI 这里介绍了 JDK-SPI 的原理和缺点;
Dubbo-SPI 这里介绍了 Dubbo-SPI 的设计和实现。
一、SOFARPC-SPI 的使用
1.1、可扩展接口
package com.alipay.sofa.rpc.client;
@Extensible(singleton = false) // 可扩展接口的标识,并提供一些可选参数
public abstract class LoadBalancer {
...
}
@Extensible
该注解通常添加在接口
或者抽象类
上,用于标识该接口
或者抽象类
是一个可扩展接口或者可扩展的抽象类。
1.2、可扩展接口的实现
package com.alipay.sofa.rpc.zjg;
@Extension("hello") // 可扩展接口实现的标识,并且指定该实现的 alias
public class MyLoadBalancer extends LoadBalancer {
...
}
@Extension
该注解添加在可扩展接口或抽象类的实现上,并且通常会添加一个alias
用于后续的动态加载。
1.3、编写接口实现的配置文件
src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.client.LoadBalancer
hello=com.alipay.sofa.rpc.zjg.MyLoadBalancer
注意:
- 配置文件的文件名是
可扩展接口
的全类名- 配置文件的内容是
alias=具体实现的全类名
- 配置文件的存放位置默认由
rpc-config-default.json
中的"extension.load.path"
来指定,可以通过自定义sofa-rpc/rpc-config.json
文件或者META-INF/sofa-rpc/rpc-config.json
中指定"extension.load.path"
来覆盖,默认配置是"extension.load.path": [ "META-INF/services/sofa-rpc/", "META-INF/services/" ]
1.4、运行时动态选择接口实现
ExtensionLoader<LoadBalancer> loader = ExtensionLoaderFactory.getExtensionLoader(LoadBalancer.class);
LoadBalancer myLoadBalancer = loader.getExtension("hello");
首先获取
可扩展接口
的ExtensionLoader
然后通过ExtensionLoader
加载指定alias
的具体实现
二、SOFARPC-SPI 的设计
image.png
@Extensible
:该注解通常添加在接口
或者抽象类
上,用于标识该接口
或者抽象类
是一个可扩展接口或者可扩展的抽象类@Extension
:该注解添加在可扩展接口或抽象类的实现
上,并且通常会添加一个alias
用于后续的动态加载ExtensionClass
:一个实现类
会最终被其 ExtensionLoader 加载称为一个ExtensionClass
,存储在其
ExtensionLoader 中,并且包含了实例化ExtensionClass
存储的实现类
的方法ExtensionLoader
:每一个可扩展接口
或可扩展抽象类
都有一个ExtensionLoader
,用于从相应接口的 SPI 配置文件中读取配置内容并且将每一行解析成一个ExtensionClass
(每一个ExtensionClass
对应一个实现,SPI 配置文件中的每一行配置一个实现类),之后存储<alias, ExtensionClass>
配置对到Map<String, ExtensionClass<T>>
容器中ExtensionLoaderListener
:当将一个ExtensionClass
成功的添加到 Map 容器中的时候,触发 ExtensionLoaderFactory 创建的ExtensionLoaderListener
监听器ExtensionLoaderFactory
:用来获取或者创建 ExtensionLoader,将创建好的 ExtensionLoader 放置在Map<Class, ExtensionLoader>
容器中
各组件的关系:图片出处
image.png
2.1、@Extensible
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Extensible {
// 指定自定义扩展文件名称,默认就是可扩展接口是可扩展抽象类的全类名
String file() default "";
// 扩展类是否使用单例,默认使用
boolean singleton() default true;
// 扩展类是否需要编码,默认不需要
boolean coded() default false;
}
2.2、@Extension
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Extension {
// 扩展点名字 alias
String value();
// 扩展点编码,默认不需要,当接口需要编码的时候需要
byte code() default -1;
// 优先级排序,默认不需要,大的优先级高
int order() default 0;
// 是否覆盖其它低order的同名扩展
boolean override() default false;
// 排斥其它扩展,可以排斥掉其它低order的扩展
String[] rejection() default {};
}
2.3、ExtensionLoaderFactory
public class ExtensionLoaderFactory {
private ExtensionLoaderFactory() {
}
// key: 可扩展接口或者可扩展抽象类
private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();
// Get extension loader by extensible class with listener
// 1. 从 LOADER_MAP 容器中获取 clazz 对应的 ExtensionLoader
// 2. 如果没有,则使用双重检测同步创建一个 ExtensionLoader
// 3. 将 {clazz, ExtensionLoader} 对存储到 LOADER_MAP 中
// 4. 返回 ExtensionLoader
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
if (loader == null) {
synchronized (ExtensionLoaderFactory.class) {
loader = LOADER_MAP.get(clazz);
if (loader == null) {
loader = new ExtensionLoader<T>(clazz, listener);
LOADER_MAP.put(clazz, loader);
}
}
}
return loader;
}
// Get extension loader by extensible class without listener
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz) {
return getExtensionLoader(clazz, null);
}
}
2.4、ExtensionLoader
// 一个可扩展接口类,对应一个加载器
public class ExtensionLoader<T> {
// SOFARPC 自己编写的 Log 体系
// getLogger 的执行会触发 RpcConfigs 的 static 块,在该 static 块中会做读取默认配置的操作
// 读取路径有以下三个
// 1. rpc-config-default.json(默认的配置文件)
// 2. sofa-rpc/rpc-config.json(自定义)
// 3. META-INF/sofa-rpc/rpc-config.json(自定义)
private final static Logger LOGGER = LoggerFactory.getLogger(ExtensionLoader.class);
// 当前加载的可扩展接口类
protected final Class<T> interfaceClass;
// 当前加载的可扩展接口类名
protected final String interfaceName;
// 扩展点注解
protected final Extensible extensible;
// 全部的加载的实现类 {"alias":ExtensionClass}
protected final ConcurrentMap<String, ExtensionClass<T>> all;
// 如果是单例,那么factory不为空
protected final ConcurrentMap<String, T> factory;
// 加载监听器
protected final ExtensionLoaderListener<T> listener;
public ExtensionLoader(Class<T> interfaceClass, ExtensionLoaderListener<T> listener) {
this(interfaceClass, true, listener);
}
protected ExtensionLoader(Class<T> interfaceClass) {
this(interfaceClass, true, null);
}
/**
* interfaceClass 接口类
* autoLoad 是否自动开始加载,默认开启
* listener 扩展加载监听器
*/
protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
...
// 接口为空,既不是接口,也不是抽象类
if (interfaceClass == null || !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {
throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
}
this.interfaceClass = interfaceClass;
this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
this.listener = listener;
Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
// 如果可扩展接口没有配置 @Extensible 注解,直接抛错
if (extensible == null) {
throw new IllegalArgumentException("Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
} else {
this.extensible = extensible;
}
this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
// 如果开启了自动加载(默认开启),则进行 SPI 配置文件的读取
// 1. 从全局默认配置中获取 SPI 扩展文件的路径
// 2. 循环遍历每一个 SPI 扩展文件,读取配置内容并解析。
if (autoLoad) {
List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
for (String path : paths) {
loadFromFile(path);
}
}
}
// path必须以/结尾
protected synchronized void loadFromFile(String path) {
// 默认如果不指定文件名字,就是接口名
String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim();
String fullFileName = path + file;
ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass());
loadFromClassLoader(classLoader, fullFileName);
}
protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable {
// 获取并加载 SPI 配置文件
Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName) : ClassLoader.getSystemResources(fullFileName);
// 可能存在多个文件
if (urls != null) {
while (urls.hasMoreElements()) {
// 读取一个文件
URL url = urls.nextElement();
BufferedReader reader = null;
reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));
String line;
// 每次读取一行,每一行就是一个 SPI 实现类配置
while ((line = reader.readLine()) != null) {
readLine(url, line);
}
}
}
}
protected void readLine(URL url, String line) {
String[] aliasAndClassName = parseAliasAndClassName(line);
if (aliasAndClassName == null || aliasAndClassName.length != 2) {
return;
}
String alias = aliasAndClassName[0];
String className = aliasAndClassName[1];
// 读取配置的实现类
Class tmp = ClassUtils.forName(className, false);
// 如果实现类不是可扩展接口的实现类,抛错
if (!interfaceClass.isAssignableFrom(tmp)) {
throw new IllegalArgumentException(...);
}
Class<? extends T> implClass = (Class<? extends T>) tmp;
// 检查是否有可扩展标识
Extension extension = implClass.getAnnotation(Extension.class);
if (extension == null) {
throw new IllegalArgumentException(...);
} else {
String aliasInCode = extension.value();
// 实现类的 @Extension 注解必须配置 value(), 即 alias,否则抛错
if (StringUtils.isBlank(aliasInCode)) {
throw new IllegalArgumentException(...);
}
if (alias == null) {
// 如果 spi 配置文件里没配置 alias,则直接使用实现类的 @Extension 注解里的 alias
// 所以:spi 配置文件是允许不配置为 alias=XxxImpl 的,可以直接配置为 XxxImpl,此时的 alias 读取的就是 XxxImpl 的 @Extension 注解里的 alias
alias = aliasInCode;
} else {
// spi 配置文件里配置的和代码里的不一致,直接抛错
if (!aliasInCode.equals(alias)) {
throw new IllegalArgumentException(...);
}
}
// 接口需要编号,实现类没设置
if (extensible.coded() && extension.code() < 0) {
throw new IllegalArgumentException(...);
}
}
// alias 不可以配置为 default 和 *
if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) {
throw new IllegalArgumentException(...);
}
// 检查是否有存在同 alias 的 ExtensionClass
// 进行覆盖逻辑
ExtensionClass old = all.get(alias);
ExtensionClass<T> extensionClass = null;
if (old != null) {
// 如果当前扩展可以覆盖其它同名扩展
if (extension.override()) {
// 如果优先级还没有旧的高,则忽略;如果优先级高于旧的
if (extension.order() >= old.getOrder()) {
// 如果当前扩展可以覆盖其它同名扩展
extensionClass = buildClass(extension, implClass, alias);
}
}
// 如果当前扩展不可以覆盖其它同名扩展
else {
if (old.isOverride() || old.getOrder() < extension.order()) {
// 如果不能被覆盖,抛出已存在异常
throw new IllegalStateException("Duplicate class with same alias: " + alias);
}
}
} else {
// 如果不存在同 alias 的 ExtensionClass,则直接新建一个 ExtensionClass
extensionClass = buildClass(extension, implClass, alias);
}
// 做互斥扩展点排除逻辑
if (extensionClass != null) {
// 检查是否有互斥的扩展点
for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet()) {
ExtensionClass existed = entry.getValue();
if (extensionClass.getOrder() >= existed.getOrder()) {
// 新的优先级 >= 老的优先级,检查新的扩展是否排除老的扩展
String[] rejection = extensionClass.getRejection();
if (CommonUtils.isNotEmpty(rejection)) {
for (String rej : rejection) {
existed = all.get(rej);
// 只排除低 order 的扩展点
if (existed == null || extensionClass.getOrder() < existed.getOrder()) {
continue;
}
all.remove(rej);
}
}
} else {
// 如果 新的优先级 < 老的优先级,判断是否需要将新的排除掉
String[] rejection = existed.getRejection();
if (CommonUtils.isNotEmpty(rejection)) {
for (String rej : rejection) {
if (rej.equals(extensionClass.getAlias())) {
// 被其它扩展排掉
return;
}
}
}
}
}
// 执行监听器逻辑,将 {alias : ExtensionClass} 添加到容器中
loadSuccess(alias, extensionClass);
}
}
private ExtensionClass<T> buildClass(Extension extension, Class<? extends T> implClass, String alias) {
ExtensionClass<T> extensionClass = new ExtensionClass<T>(implClass, alias);
extensionClass.setCode(extension.code());
extensionClass.setSingleton(extensible.singleton());
extensionClass.setOrder(extension.order());
extensionClass.setOverride(extension.override());
extensionClass.setRejection(extension.rejection());
return extensionClass;
}
private void loadSuccess(String alias, ExtensionClass<T> extensionClass) {
if (listener != null) {
listener.onLoad(extensionClass); // 加载完毕,通知监听器
all.put(alias, extensionClass);
} else {
all.put(alias, extensionClass);
}
}
public ConcurrentMap<String, ExtensionClass<T>> getAllExtensions() {
return all;
}
// 根据服务别名查找扩展类
public ExtensionClass<T> getExtensionClass(String alias) {
return all == null ? null : all.get(alias);
}
// 获取扩展点实例
public T getExtension(String alias) {
ExtensionClass<T> extensionClass = getExtensionClass(alias);
if (extensionClass == null) {
throw new SofaRpcRuntimeException(...);
} else {
// 如果是单例,获取或者双重检测创建 T
if (extensible.singleton() && factory != null) {
T t = factory.get(alias);
if (t == null) {
synchronized (this) {
t = factory.get(alias);
if (t == null) {
t = extensionClass.getExtInstance();
factory.put(alias, t);
}
}
}
return t;
} else {
// 如果不是单例,直接创建 T
return extensionClass.getExtInstance();
}
}
}
/**
* 得到实例
*
* @param alias 别名
* @param argTypes 扩展初始化需要的参数类型
* @param args 扩展初始化需要的参数
* @return 扩展实例(已判断是否单例)
*/
public T getExtension(String alias, Class[] argTypes, Object[] args) {
ExtensionClass<T> extensionClass = getExtensionClass(alias);
if (extensionClass == null) {
throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: \"" + alias + "\"!");
} else {
if (extensible.singleton() && factory != null) {
T t = factory.get(alias);
if (t == null) {
synchronized (this) {
t = factory.get(alias);
if (t == null) {
t = extensionClass.getExtInstance(argTypes, args);
factory.put(alias, t);
}
}
}
return t;
} else {
return extensionClass.getExtInstance(argTypes, args);
}
}
}
}
2.5、ExtensionLoaderListener
public interface ExtensionLoaderListener<T> {
// 当扩展点成功加载时,触发的事件
void onLoad(ExtensionClass<T> extensionClass);
}
2.6、ExtensionClass
// 扩展接口实现类
public class ExtensionClass<T> implements Sortable {
// 扩展接口实现类
protected final Class<? extends T> clazz;
// 扩展实现类别名
protected final String alias;
// 扩展编码,必须唯一
protected byte code;
// 是否单例
protected boolean singleton;
// 扩展点排序值,大的优先级高
protected int order;
// 是否覆盖其它低order的同名扩展
protected boolean override;
// 排斥其它扩展,可以排斥掉其它低order的扩展
protected String[] rejection;
// 服务端实例对象(只在是单例的时候保留)
private volatile transient T instance;
/**
* 构造函数
* @param clazz 扩展实现类名
* @param alias 扩展别名
*/
public ExtensionClass(Class<? extends T> clazz, String alias) {
this.clazz = clazz;
this.alias = alias;
}
// 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象
public T getExtInstance() {
return getExtInstance(null, null);
}
/**
* 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象(全部都是反射创建)
* @param argTypes 构造函数参数类型
* @param args 构造函数参数值
*/
public T getExtInstance(Class[] argTypes, Object[] args) {
if (clazz != null) {
if (singleton) { // 如果是单例
if (instance == null) {
synchronized (this) {
if (instance == null) {
instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
}
}
}
return instance; // 保留单例
} else {
return ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
}
}
throw new SofaRpcRuntimeException("Class of ExtensionClass is null");
}
}
三、SOFARPC-SPI 流程总结
ExtensionLoader<LoadBalancer> loader = ExtensionLoaderFactory.getExtensionLoader(LoadBalancer.class);
- 从
Map<Class, ExtensionLoader> LOADER_MAP
容器中获取 clazz(可扩展接口或可扩展抽象类
)对应的ExtensionLoader
- 如果没有,则使用双重检测同步创建一个
ExtensionLoader
2.1. 从全局默认配置文件(默认是
rpc-config-default.json
)中获取 SPI 扩展文件的路径
2.2. 循环遍历每一个 SPI 扩展文件,对每一个 SPI 扩展文件做如下操作2.2.1. 读取每一行(每一行是配置一个实现类,每个实现类会被解析成一个
ExtensionClass
)
2.2.2. 获取 alias:如果 SPI 配置文件中没有配置 alias,则使用实现类的@Extension
注解中的的 alias,否则,SPI 配置文件中配置的 alias 必须与实现类的@Extension
注解中的 alias 同名
2.2.3. 检查当前的Map<String, ExtensionClass<T>> all
中是否有同 alias 的 ExtensionClass,如果没有,直接创建当前实现类的 ExtensionClass,否则进行覆盖逻辑(如果当前的实现类可以覆盖且优先级高于旧的,则直接覆盖)
2.2.4. 做互斥扩展点的排除逻辑:循环遍历Map<String, ExtensionClass<T>> all
中的每一个 ExtensionClass,如果当前实现类的 ExtensionClass 的优先级大于等于遍历到的 ExtensionClass,并且当前实现类的 ExtensionClass 的互斥扩展集合包含遍历到的 ExtensionClass,则从 all 映射中删除遍历到的 ExtensionClass;如果当前实现类的 ExtensionClass 的优先级小于遍历到的 ExtensionClass,并且遍历到的 ExtensionClass 的互斥扩展集合包含当前实现类的 ExtensionClass,则直接 return,不再将当前的实现类的 ExtensionClass 放到 all 中(5.5.0-SNAPSHOT 版本中该实现有个问题:https://github.com/alipay/sofa-rpc/issues/367)
2.2.5. 如果配置了 ExtensionLoaderListener,则执行该监听器的监听逻辑,之后将{alias : 当前实现类的 ExtensionClass}
放置到 all 映射中
- 将
{clazz, ExtensionLoader}
对存储到LOADER_MAP
中- 返回
ExtensionLoader
LoadBalancer myLoadBalancer = loader.getExtension("hello");
- 从当期可扩展接口的
ExtensionLoader
的Map<String, ExtensionClass<T>> all
中获取指定 alias 的 ExtensionClass- 如果配置为单例,则获取或使用双重检测反射创建 ExtensionClass 的实例,并将创建好的实例塞入
Map<String, T> factory
,方便下次使用;否则,直接反射创建 ExtensionClass 的实例
加载原理图:图片地址
image.png