Dubbo源码剖析之SPI机制<一>
前言
其实网上关于Dubbo SPI机制的文章已经数不胜数了,为何笔者还要写这篇文章呢?我想说大概有以下三个原因。
- 加深一下理解
因为要完全读懂Dubbo的源码,就必须对Dubbo的SPI机制非常熟悉,不然会发现里面的调用关系理不顺,很容易坚持不下去。 - 最近准备在面阿里。
复习一下Dubbo. - 独特见解来剖析SPI机制
可能会让你们更加容易抓住重点,不仅仅只是停留在代码层面的解析。
SPI设计之初
1.为何要有SPI
Dubbo在设计阶段时考虑的就不仅仅只是一个面向内部的RPC框架,而是想在整个分布式服务领域提供解决方案的,甚至想在国外的RPC领域站稳脚跟。基于这种思想,就必须尽可能少的依赖第三方。我们都知道,Spring早已非常流行,如果Dubbo直接使用Spring来做解耦,得省事得多,但是他们并没有那么做,所以就有了Dubbo自己的SPI,它的核心其实就是一个工厂,根据指定的类型来获取实例。古老的简单工厂模式都是通过if else来根据不同的类型来返回不同的实例,但是Dubbo并没有硬编码,而且结合JDK底层的ServiceLoader机制来实现的,配置文件的修改对调用方不可见,这就是一种解耦。
2.ExtentionLoader和ServiceLoader的区别
既然有了JDK自带的ServiceLoader,那为何还要自己实现一套了,大概有以下几个原因。
- JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源;
- 如果扩展点加载失败,连扩展点的名称都拿不到了
- ServiceLoader不够灵活。
ExtensionLoader源码分析
在分析之前,我先罗列一下ExtensionLoader涉及到的一些注解、类。
- ExtensionFactory
@SPI
public interface ExtensionFactory {
/**
* Get extension.
*
* @param type object type.
* @param name object name.
* @return object instance.
*/
<T> T getExtension(Class<T> type, String name);
}
可知,ExtensionFactory是获取扩展的工厂,根据指定的Class类型和名字,即可获取对应的实例。
- @Activate
/**
* Activate
* 对于可以被框架中自动激活加载扩展,此Annotation用于配置扩展被自动激活加载条件。
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* Group过滤条件
*/
String[] group() default {};
/**
* Key过滤条件
*/
String[] value() default {};
/**
* 排序信息,可以不提供。
*/
String[] before() default {};
/**
* 排序信息,可以不提供。
*/
String[] after() default {};
/**
* 排序信息,可以不提供。
*/
int order() default 0;
}
先不做多解释,从字面意思上来看,应该是获取相关激活的扩展(getActivateExtension)
- @Adaptive
/**
* 在{@link ExtensionLoader}生成Extension的Adaptive Instance时,为{@link ExtensionLoader}提供信息。
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
/**
* 从{@link URL}的Key名,对应的Value作为要Adapt成的Extension名。
*/
String[] value() default {};
}
其实就是获取对应的适配类,只需要在类上、方法上、参数中指定即可。
- @Extension
/**
* 扩展点接口的标识。
*/
@Deprecated
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Extension {
/**
* @deprecated
*/
@Deprecated
String value() default "";
}
Extension("mina")加载失败,当用户配置使用mina时,就会报找不到扩展点,
而不是报加载扩展点失败,以及失败原因。该方法已设置成@Deprecated
- @SPI
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
/**
* 缺省扩展点名。
*/
String value() default "";
}
扩展点接口的标识,如果要利用Dubbo的SPI机制,接口必须要加@SPI。
ExtensionLoader的初始化过程
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if(!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
//如果接口没有@SPI,则抛出异常
if(!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
private static <T> boolean withExtensionAnnotation(Class<T> type) {
return type.isAnnotationPresent(SPI.class);
}
ExtensionLoader.getExtensionLoader()
被调用的时候才被初始化,初始化之前会对type进行非空校验、接口限定、type必须加了@SPI
注解.然后从EXTENSION_LOADERS
判断是否已经加载过,如果已经加载过,则直接返回,否则直接创建一个ExtensionLoader(type)
对象,然后返回。
我们先来看一下ExtensionLoader的构造函数:
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
构造函数是私有的,很明显ExtensionLoader是一个单例,一个type唯一对应一个ExtensionLoader.这里很明显type != ExtensionFactory.class
,所以会调用ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension())
该方法。
回到上面,当ExtensionFactory被传入时,完成非空、是否有SPI注解判断后,最后又回到了它私有的构造函数。这次type == ExtensionFactory.class
成立,objectFactory被设置成null值,也就是说当type为ExtensionFactory的时候,它的objectFactory为空.那当type!=ExtensionFactory
的时候,objectFactory是什么呢?带着这个疑问继续看getAdaptiveExtension()
方法。
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
//Holder<Object> cachedAdaptiveInstance,存放对象的容器
//第一次进来,instance肯定为空
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if(createAdaptiveInstanceError == null) {
//这里用了双重加锁,因为ExtensionLoader是单例,并发操作时同时调用getAdaptiveExtension()可能导致并发问题。
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
//初始化操作
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
}
else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
跟进去createAdaptiveExtension
方法:
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
}
}
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
//如果资源文件中没有Adaptive类,则直接创建一个
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
重点看一下getExtensionClasses()
和createAdaptiveExtensionClass()
:
getExtensionClasses()的大体流程就是:
getExtensionClasses()->loadExtensionClasses(),然后将loadExtensionClasses()的结果设置到Holder<Map<String, Class<?>>> cachedClasses
中。
看一下loadExtensionClasses()
:
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if(defaultAnnotation != null) {
String value = defaultAnnotation.value();
if(value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if(names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if(names.length == 1) cachedDefaultName = names[0];
}
}
////extensionClasses中存储的是name-class,例如:dubbo=DubboProtocol
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);//META-INF/dubbo/internal
loadFile(extensionClasses, DUBBO_DIRECTORY);//META-INF/dubbo/
loadFile(extensionClasses, SERVICES_DIRECTORY);//META-INF/services
return extensionClasses;
}
这就是加载资源文件的核心方法,前面的if()
方法主要根据获取到接口上的@SPI
注解来判断value是否为空,如果不为空,则将其用逗号分隔的方式解析出来(一个或者多个,多个时以逗号分隔).如果发现被设置成多个值就抛出异常,如果只有一个,则将该值设置到cachedDefaultName
中缓存起来。下面开始从文件中加载扩展,具体见loadFile
,也是ExtensionLoader的核心方法。
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
//fileName是/META-INF/*/全限定接口名,例如://META-INF/dubbo/com.xx.xx.Filter
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
//获取当前ExtensionLoader类的加载器进行加载,如果加载不到,就使用ClassLoader原生的getSystemResources方式加载
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
//存在资源文件
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL url = urls.nextElement();
try {
//开始一行一行读取文件
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
try {
String line = null;
while ((line = reader.readLine()) != null) {
//过滤行尾注释,例如:impl1=com.alibaba.dubbo.common.extensionloader.ext1.impl.SimpleExtImpl1#Hello World
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
//以=分隔解析key-value
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();//扩展名
line = line.substring(i + 1).trim();//实现类
}
if (line.length() > 0) {
//通过反射获取实现类,如果实例化失败,直接抛出异常
Class<?> clazz = Class.forName(line, true, classLoader);
//如果实例化出来的类不是传入type的实例,则直接抛出异常
if (! type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
//如果实例化出来的类加了@Adaptive,则直接将该类设置到cachedAdaptiveClass成员变量里进行缓存
if (clazz.isAnnotationPresent(Adaptive.class)) {
if(cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
//第二次解析时发现之前缓存的cachedAdaptiveClass和刚刚实例化的clazz不相同,则抛出异常(不止一个Adaptive实现)
} else if (! cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
//针对没有加@Adaptive的情况
} else {
try {
//如果刚刚用反射实例化的类持有一个有参构造函数,且构造函数的类型为传入的type类型
//如果type为Filter,则该实例化类为:xxxFilter(Filter filter)格式; 如果报错,则进入catch
clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
//wrappers缓存的是用传入类型作为参数的有参构造函数
wrappers.add(clazz);
} catch (NoSuchMethodException e) {
clazz.getConstructor();
//针对/META-INF下内容不为 key=value的情况,name为空
if (name == null || name.length() == 0) {
//获取@Extension中的value值
name = findAnnotationName(clazz);
if (name == null || name.length() == 0) {
if (clazz.getSimpleName().length() > type.getSimpleName().length()
&& clazz.getSimpleName().endsWith(type.getSimpleName())) {
name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
} else {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
}
}
}
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
//获取实例化类上的@Activate,如果不为空,则缓存到cachedActivates中
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
//将clazz-name存入cachedNames
for (String n : names) {
if (! cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
//存入入参extensionClasses中返回
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + url + ") in " + url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}
大体实现逻辑:从指定目录下加载资源文件,然后用io流一行一行的读取出来,读取到的行中如果有#
字符,则进行过滤。然后利用将=
"字符右边"的类实例化出来,如果该类有@Adaptive
注解,则直接缓存到cachedAdaptiveClass
成员变量(注意:一个SPI接口,只能具有一个Adaptive实现,否则会抛出异常).如果实例出的类没有@Adaptive
注解,则判断该类是否持有一个用SPI作为参数的有参构造,如果有则把该类添加到 cachedWrapperClasses
中,否则就开始解析类上的Activate
注解,把它保存到cachedActivates
中。最后填充成员变量cachedNames
(class-name),将name-class保存到入参extensionClasses
中返回。整个流程结束!
注意: cachedWrapperClasses类的格式,例如SPI接口Protocol,则Wrapper类格式为:
//SPI接口Protocl作为构造函数的入参传入
public XxxProtocl(Protocol protocol){
}
再回到loadExtensionClasses
方法,将上一步中的extensionClasses
进行了返回。
回到getAdaptiveExtensionClass
方法,如果资源文件中不存在adaptive适配类,则直接拼装.java,然后编译出一个Class文件.
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
//如果资源文件下没有Adaptive类,则直接创建一个
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
private Class<?> createAdaptiveExtensionClass() {
//自己拼装的java类
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
//编译.java文件
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
这里就不展开了,自己查看即可。直接贴出Protocol$Adpative 的代码:
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
再回到createAdaptiveExtension
方法.
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
}
}
(T) getAdaptiveExtensionClass().newInstance()
是Adaptive适配类的实例,那injectExtension
又是负责处理什么的?我们再进入injectExtension
方法。
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
大体流程就是获取Adaptive类的所以方法,然后进行循环。如果方法名以set
开头,并且方法的类型是public、参数个数是1时,通过setXXX
方法,将objectFactory创建的对象设置到对应的Adaptive类中。
总结
今天大致介绍了一下SPI的机制和一些基本概念,对ExtensionLoader的Adaptive机制和loadFile进行了详细说明,其它内容准备在第二小节里详细说明,敬请期待~