Java 杂谈Java高级交流Java高级进阶

Dubbo源码剖析之SPI机制<一>

2018-07-13  本文已影响20人  jerrik

前言

其实网上关于Dubbo SPI机制的文章已经数不胜数了,为何笔者还要写这篇文章呢?我想说大概有以下三个原因。

  1. 加深一下理解
    因为要完全读懂Dubbo的源码,就必须对Dubbo的SPI机制非常熟悉,不然会发现里面的调用关系理不顺,很容易坚持不下去。
  2. 最近准备在面阿里。
    复习一下Dubbo.
  3. 独特见解来剖析SPI机制
    可能会让你们更加容易抓住重点,不仅仅只是停留在代码层面的解析。

SPI设计之初

1.为何要有SPI

Dubbo在设计阶段时考虑的就不仅仅只是一个面向内部的RPC框架,而是想在整个分布式服务领域提供解决方案的,甚至想在国外的RPC领域站稳脚跟。基于这种思想,就必须尽可能少的依赖第三方。我们都知道,Spring早已非常流行,如果Dubbo直接使用Spring来做解耦,得省事得多,但是他们并没有那么做,所以就有了Dubbo自己的SPI,它的核心其实就是一个工厂,根据指定的类型来获取实例。古老的简单工厂模式都是通过if else来根据不同的类型来返回不同的实例,但是Dubbo并没有硬编码,而且结合JDK底层的ServiceLoader机制来实现的,配置文件的修改对调用方不可见,这就是一种解耦。

2.ExtentionLoader和ServiceLoader的区别

既然有了JDK自带的ServiceLoader,那为何还要自己实现一套了,大概有以下几个原因。

  1. JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源;
  2. 如果扩展点加载失败,连扩展点的名称都拿不到了
  3. ServiceLoader不够灵活。

ExtensionLoader源码分析

在分析之前,我先罗列一下ExtensionLoader涉及到的一些注解、类。

@SPI
public interface ExtensionFactory {

    /**
     * Get extension.
     * 
     * @param type object type.
     * @param name object name.
     * @return object instance.
     */
    <T> T getExtension(Class<T> type, String name);
}

可知,ExtensionFactory是获取扩展的工厂,根据指定的Class类型和名字,即可获取对应的实例。

/**
 * Activate
 * 对于可以被框架中自动激活加载扩展,此Annotation用于配置扩展被自动激活加载条件。
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
    /**
     * Group过滤条件
     */
    String[] group() default {};

    /**
     * Key过滤条件
     */
    String[] value() default {};

    /**
     * 排序信息,可以不提供。
     */
    String[] before() default {};

    /**
     * 排序信息,可以不提供。
     */
    String[] after() default {};

    /**
     * 排序信息,可以不提供。
     */
    int order() default 0;
}

先不做多解释,从字面意思上来看,应该是获取相关激活的扩展(getActivateExtension)

/**
 * 在{@link ExtensionLoader}生成Extension的Adaptive Instance时,为{@link ExtensionLoader}提供信息。
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
    
    /**
     * 从{@link URL}的Key名,对应的Value作为要Adapt成的Extension名。
     */
    String[] value() default {};
}

其实就是获取对应的适配类,只需要在类上、方法上、参数中指定即可。

/**
 * 扩展点接口的标识。
 */
@Deprecated
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Extension {

    /**
     * @deprecated
     */
    @Deprecated
    String value() default "";
}

Extension("mina")加载失败,当用户配置使用mina时,就会报找不到扩展点,
而不是报加载扩展点失败,以及失败原因。该方法已设置成@Deprecated

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {

    /**
     * 缺省扩展点名。
     */
    String value() default "";

}

扩展点接口的标识,如果要利用Dubbo的SPI机制,接口必须要加@SPI。

ExtensionLoader的初始化过程

  @SuppressWarnings("unchecked")
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
        if(!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        //如果接口没有@SPI,则抛出异常
        if(!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type + 
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }
        
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }

    private static <T> boolean withExtensionAnnotation(Class<T> type) {
        return type.isAnnotationPresent(SPI.class);
    }

ExtensionLoader.getExtensionLoader()被调用的时候才被初始化,初始化之前会对type进行非空校验、接口限定、type必须加了@SPI注解.然后从EXTENSION_LOADERS判断是否已经加载过,如果已经加载过,则直接返回,否则直接创建一个ExtensionLoader(type)对象,然后返回。
我们先来看一下ExtensionLoader的构造函数:

 private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

构造函数是私有的,很明显ExtensionLoader是一个单例,一个type唯一对应一个ExtensionLoader.这里很明显type != ExtensionFactory.class,所以会调用ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension())该方法。
回到上面,当ExtensionFactory被传入时,完成非空、是否有SPI注解判断后,最后又回到了它私有的构造函数。这次type == ExtensionFactory.class成立,objectFactory被设置成null值,也就是说当type为ExtensionFactory的时候,它的objectFactory为空.那当type!=ExtensionFactory的时候,objectFactory是什么呢?带着这个疑问继续看getAdaptiveExtension()方法。

  @SuppressWarnings("unchecked")
    public T getAdaptiveExtension() {
        //Holder<Object> cachedAdaptiveInstance,存放对象的容器
        //第一次进来,instance肯定为空
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if(createAdaptiveInstanceError == null) {
                //这里用了双重加锁,因为ExtensionLoader是单例,并发操作时同时调用getAdaptiveExtension()可能导致并发问题。
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            //初始化操作
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            }
            else {
                throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
            }
        }

        return (T) instance;
    }

跟进去createAdaptiveExtension方法:

private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
        }
    }

private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        //如果资源文件中没有Adaptive类,则直接创建一个
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
 }

重点看一下getExtensionClasses()createAdaptiveExtensionClass():
getExtensionClasses()的大体流程就是:
getExtensionClasses()->loadExtensionClasses(),然后将loadExtensionClasses()的结果设置到Holder<Map<String, Class<?>>> cachedClasses中。
看一下loadExtensionClasses():

  private Map<String, Class<?>> loadExtensionClasses() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if(defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if(value != null && (value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                if(names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                            + ": " + Arrays.toString(names));
                }
                if(names.length == 1) cachedDefaultName = names[0];
            }
        }
        
        ////extensionClasses中存储的是name-class,例如:dubbo=DubboProtocol
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);//META-INF/dubbo/internal
        loadFile(extensionClasses, DUBBO_DIRECTORY);//META-INF/dubbo/
        loadFile(extensionClasses, SERVICES_DIRECTORY);//META-INF/services
        return extensionClasses;
    }

这就是加载资源文件的核心方法,前面的if()方法主要根据获取到接口上的@SPI注解来判断value是否为空,如果不为空,则将其用逗号分隔的方式解析出来(一个或者多个,多个时以逗号分隔).如果发现被设置成多个值就抛出异常,如果只有一个,则将该值设置到cachedDefaultName中缓存起来。下面开始从文件中加载扩展,具体见loadFile,也是ExtensionLoader的核心方法。

private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        //fileName是/META-INF/*/全限定接口名,例如://META-INF/dubbo/com.xx.xx.Filter
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            //获取当前ExtensionLoader类的加载器进行加载,如果加载不到,就使用ClassLoader原生的getSystemResources方式加载
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }

            //存在资源文件
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL url = urls.nextElement();
                    try {
                        //开始一行一行读取文件
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                        try {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                //过滤行尾注释,例如:impl1=com.alibaba.dubbo.common.extensionloader.ext1.impl.SimpleExtImpl1#Hello World
                                final int ci = line.indexOf('#');
                                if (ci >= 0) line = line.substring(0, ci);
                                line = line.trim();
                                if (line.length() > 0) {
                                    try {
                                        //以=分隔解析key-value
                                        String name = null;
                                        int i = line.indexOf('=');
                                        if (i > 0) {
                                            name = line.substring(0, i).trim();//扩展名
                                            line = line.substring(i + 1).trim();//实现类
                                        }
                                        if (line.length() > 0) {
                                            //通过反射获取实现类,如果实例化失败,直接抛出异常
                                            Class<?> clazz = Class.forName(line, true, classLoader);

                                            //如果实例化出来的类不是传入type的实例,则直接抛出异常
                                            if (! type.isAssignableFrom(clazz)) {
                                                throw new IllegalStateException("Error when load extension class(interface: " +
                                                        type + ", class line: " + clazz.getName() + "), class " 
                                                        + clazz.getName() + "is not subtype of interface.");
                                            }

                                            //如果实例化出来的类加了@Adaptive,则直接将该类设置到cachedAdaptiveClass成员变量里进行缓存
                                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                if(cachedAdaptiveClass == null) {
                                                    cachedAdaptiveClass = clazz;
                                                    //第二次解析时发现之前缓存的cachedAdaptiveClass和刚刚实例化的clazz不相同,则抛出异常(不止一个Adaptive实现)
                                                } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                                            + cachedAdaptiveClass.getClass().getName()
                                                            + ", " + clazz.getClass().getName());
                                                }
                                                //针对没有加@Adaptive的情况
                                            } else {
                                                try {
                                                    //如果刚刚用反射实例化的类持有一个有参构造函数,且构造函数的类型为传入的type类型
                                                    //如果type为Filter,则该实例化类为:xxxFilter(Filter filter)格式; 如果报错,则进入catch
                                                    clazz.getConstructor(type);
                                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                                    if (wrappers == null) {
                                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                        wrappers = cachedWrapperClasses;
                                                    }

                                                    //wrappers缓存的是用传入类型作为参数的有参构造函数
                                                    wrappers.add(clazz);
                                                } catch (NoSuchMethodException e) {
                                                    clazz.getConstructor();
                                                    //针对/META-INF下内容不为 key=value的情况,name为空
                                                    if (name == null || name.length() == 0) {
                                                        //获取@Extension中的value值
                                                        name = findAnnotationName(clazz);
                                                        if (name == null || name.length() == 0) {
                                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                            } else {
                                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                            }
                                                        }
                                                    }
                                                    String[] names = NAME_SEPARATOR.split(name);
                                                    if (names != null && names.length > 0) {
                                                        //获取实例化类上的@Activate,如果不为空,则缓存到cachedActivates中
                                                        Activate activate = clazz.getAnnotation(Activate.class);
                                                        if (activate != null) {
                                                            cachedActivates.put(names[0], activate);
                                                        }

                                                        //将clazz-name存入cachedNames
                                                        for (String n : names) {
                                                            if (! cachedNames.containsKey(clazz)) {
                                                                cachedNames.put(clazz, n);
                                                            }

                                                            //存入入参extensionClasses中返回
                                                            Class<?> c = extensionClasses.get(n);
                                                            if (c == null) {
                                                                extensionClasses.put(n, clazz);
                                                            } else if (c != clazz) {
                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                        exceptions.put(line, e);
                                    }
                                }
                            } // end of while read lines
                        } finally {
                            reader.close();
                        }
                    } catch (Throwable t) {
                        logger.error("Exception when load extension class(interface: " +
                                            type + ", class file: " + url + ") in " + url, t);
                    }
                } // end of while urls
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }

大体实现逻辑:从指定目录下加载资源文件,然后用io流一行一行的读取出来,读取到的行中如果有#字符,则进行过滤。然后利用将="字符右边"的类实例化出来,如果该类有@Adaptive注解,则直接缓存到cachedAdaptiveClass成员变量(注意:一个SPI接口,只能具有一个Adaptive实现,否则会抛出异常).如果实例出的类没有@Adaptive注解,则判断该类是否持有一个用SPI作为参数的有参构造,如果有则把该类添加到 cachedWrapperClasses中,否则就开始解析类上的Activate注解,把它保存到cachedActivates中。最后填充成员变量cachedNames(class-name),将name-class保存到入参extensionClasses中返回。整个流程结束!
注意: cachedWrapperClasses类的格式,例如SPI接口Protocol,则Wrapper类格式为:

  //SPI接口Protocl作为构造函数的入参传入
  public XxxProtocl(Protocol protocol){

  }

再回到loadExtensionClasses方法,将上一步中的extensionClasses进行了返回。
回到getAdaptiveExtensionClass方法,如果资源文件中不存在adaptive适配类,则直接拼装.java,然后编译出一个Class文件.

private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        //如果资源文件下没有Adaptive类,则直接创建一个
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }

private Class<?> createAdaptiveExtensionClass() {
        //自己拼装的java类
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        //编译.java文件
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

这里就不展开了,自己查看即可。直接贴出Protocol$Adpative 的代码:

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

再回到createAdaptiveExtension方法.

  private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
        }
    }

(T) getAdaptiveExtensionClass().newInstance()是Adaptive适配类的实例,那injectExtension又是负责处理什么的?我们再进入injectExtension方法。

private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                for (Method method : instance.getClass().getMethods()) {
                    if (method.getName().startsWith("set")
                            && method.getParameterTypes().length == 1
                            && Modifier.isPublic(method.getModifiers())) {
                        Class<?> pt = method.getParameterTypes()[0];
                        try {
                            String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("fail to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }

大体流程就是获取Adaptive类的所以方法,然后进行循环。如果方法名以set开头,并且方法的类型是public、参数个数是1时,通过setXXX方法,将objectFactory创建的对象设置到对应的Adaptive类中。

总结

今天大致介绍了一下SPI的机制和一些基本概念,对ExtensionLoader的Adaptive机制和loadFile进行了详细说明,其它内容准备在第二小节里详细说明,敬请期待~

上一篇 下一篇

猜你喜欢

热点阅读