源码修炼笔记之Dubbo扩展点机制详解

2020-06-14  本文已影响0人  花醉霜寒

Dubbo具有良好的扩展性,其扩展性依赖于dubbo使用的各种设计模式和扩展点加载机制,本文对Dubbo SPI加载机制进行探讨。

Dubbo SPI概要

Dubbo SPI在Java SPI的基础上进行了扩展,SPI即Service Provider Interface,期初提供给厂商用作插件开发,只是申明了接口,具体实现不在程序中直接定义,实现Java SPI的步骤大致如下:

Dubbo SPI对Java SPI进行了改进和扩展,包括:

Dubbo SPI核心原理

Dubbo的扩展机制主要依赖于三个核心注解和一个逻辑类,即

\color{green}{扩展点注解SPI}

@SPI可以使用在接口、类和枚举类上,通常情况都是用于接口上,标记当前接口是一个扩展点,可以有多个不同的实现,运行时通过配置找到具体的扩展实现类,其源码如下所示,

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
    String value() default "";
}

其中value代表指定的默认扩展实现类,例如扩展点Compiler指定了默认的扩展实现类为 javassist。

@SPI("javassist")
public interface Compiler {
    Class<?> compile(String code, ClassLoader classLoader);
}

\color{green}{自适应注解Adaptive}

@Adaptive表示自适应,其定义如下

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
    String[] value() default {};
}

该注解可以用在类上和接口上

\color{green}{自动激活注解Activate}

@Activate标示一个扩展点需要自动激活,并指定激活条件,其定义如下

Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
    String[] group() default {};
    String[] value() default {};
    String[] before() default {};
    String[] after() default {};
    int order() default 0;
}

其中group和value标示自动激活条件,before、after和order标示激活的顺序。

\color{green}{ExtensionLoader}

ExtensionLoader类是整个扩展机制的核心类,负责配置的加载,扩展类class文件的创建和缓存以及扩展类的加载和缓存,ExtensionLoader获取扩展类的入口包括三个:

\color{green}{Dubbo扩展点机制的特点}

通过下面的例子来说明Dubbo扩展点的特点

@SPI("defaultBuz")
public interface BusinessService1 {
    
    void doBusiness();
}

@SPI("defaultBuz")
public interface BusinessService2 {
    
    void doBusiness();
}

@SPI("defaultBuz")
public interface BusinessService3 {

    void doBusiness();
}

public class BusinessService1Impl implements BusinessService1 {

    BusinessService2 businessService2;

    BusinessService3 businessService3;

    public BusinessService1Impl(BusinessService2 businessService2) {
        this.businessService2 = businessService2;
    }
    
    public void setBusinessService3(BusinessService3 businessService3) {
        this.businessService2 = businessService2;
    }

    void doBusiness() {
        businessService2.doBusiness();
        businessService3.doBusiness();
    }
}

定义了三个扩展点BusinessService1、BusinessService2和BusinessService3,并提供了BusinessService1的默认实现BusinessService1Impl,该实现中依赖了另外两个扩展点,通过构造方法依赖BusinessService2,通过setter方法依赖BusinessService3,实现了doBusiness方法,即一次调用另外两个扩展点的doBusiness方法。

ExtensionLoader源码解析

\color{green}{获取扩展点ExtensionLoader}

获取某个扩展点的ExtensionLoader的逻辑比较简单,源码如下所示,首先判断传入的class对象是否符合扩展点的要求,即为接口且接口上有@SPI注解,然后尝试从缓存中获取,缓存中不存在,则通过new创建一个,并放入缓存中。

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null)
        throw new IllegalArgumentException("Extension type == null");
    //判断是否为接口
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    }
    //判断接口上是否有@SPI注解
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type(" + type +
                ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    }

    //从缓存中获取,EXTENSION_LOADERS为一个ConcurrentHashMap
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        //缓存中不存在,则创建一个ExtensionLoader,并放入缓存
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

\color{green}{getExtension获取普通扩展类}

getExtension方法执行过程

上图展示了创建普通扩展类实例的过程,不同的方法我用不同的颜色进行了区分,下面从源码层面进行分析,首先是主逻辑方法getExtension,其大致流程为

//获取扩展类
public T getExtension(String name) {
    if (name == null || name.length() == 0)
        throw new IllegalArgumentException("Extension name == null");
    //name为true,返回默认扩展类
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    //从缓存中获取扩展类
    Holder<Object> holder = cachedInstances.get(name);
    if (holder == null) {
        cachedInstances.putIfAbsent(name, new Holder<Object>());
        holder = cachedInstances.get(name);
    }
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                //缓存中不存在,调用createExtension方法创建扩展类
                instance = createExtension(name);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

当ConcurrentMap cachedInstances中不存在扩展类的实例时会通过createExtension创建扩展类实例,方法源码如下所示,其大致过程如下


private T createExtension(String name) {
    //根据name获取class文件
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        //内存中获取扩展类的实例,若不存在则创建
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        //注入依赖的扩展类,此类扩展类是以set方法
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && wrapperClasses.size() > 0) {
            for (Class<?> wrapperClass : wrapperClasses) {
                //注入依赖的扩展类,此类扩展类是以成员变量的身份存在于其他扩展类
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                type + ")  could not be instantiated: " + t.getMessage(), t);
    }
}

创建扩展类的示例对象是通过其class对象,然后用newInstance创建的,获取class对象也是先从缓存中获取,缓存中不存在则通过loadExtensionClasses()方法来加载所有的扩展类。

//获取所有扩展类
private Map<String, Class<?>> getExtensionClasses() {
    //从缓存中获取
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                //缓存中不存在,则加载所有的扩展类class文件
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

加载所有扩展类loadExtensionClasses方法源码如下所示,该方法首先会获取SPI注解上value指定的默认扩展实现名并缓存起来,然后一次通过文件流的方式加载"META-INF/dubbo/internal/"、"META-INF/dubbo/"和"META-INF/services/"路径下制定的扩展类。

//加载所有扩展类
private Map<String, Class<?>> loadExtensionClasses() {
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    if (defaultAnnotation != null) {
        String value = defaultAnnotation.value();
        if (value != null && (value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if (names.length > 1) {
                throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                        + ": " + Arrays.toString(names));
            }
            if (names.length == 1) cachedDefaultName = names[0];
        }
    }

    Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    //加载"META-INF/dubbo/internal/"路径下制定的扩展类
    loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
    //加载"META-INF/dubbo/"路径下制定的扩展类
    loadFile(extensionClasses, DUBBO_DIRECTORY);
    //加载"META-INF/services/"路径下制定的扩展类
    loadFile(extensionClasses, SERVICES_DIRECTORY);
    return extensionClasses;
}

在createExtension创建扩展类的同时会调用injectExtension方法注入依赖的扩展类,injectExtension方法的源码如下所示,首先获取当前扩展类以set开头的setter方法并获取setter方法的参数类型,然后通过ExtensionFactory创建依赖的扩展类的实例,通过反射调用set方法,注入依赖的扩展类。

private T injectExtension(T instance) {
    try {
        if (objectFactory != null) {
            for (Method method : instance.getClass().getMethods()) {
                //判断方法是否为set开头的方法,并且只有一个参数并且是public的,其实就是一个setter方法
                if (method.getName().startsWith("set")
                        && method.getParameterTypes().length == 1
                        && Modifier.isPublic(method.getModifiers())) {
                    Class<?> pt = method.getParameterTypes()[0];
                    try {
                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                        //通过ExtensionFactory创建扩展类
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            //通过反射调用set方法,注入依赖的扩展类
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("fail to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

获取依赖扩展类是通过ExtensionFactory完成的,该工厂提供了一个抽象方法根据type和name获取扩展实现类,其源码如下所示,

@SPI
public interface ExtensionFactory {
    <T> T getExtension(Class<T> type, String name);
}

接口ExtensionFactory本身也是一个扩展点,有三个实现类,分别为

\color{green}{getAdaptiveExtension获取自适应扩展类}

获取自适应扩展类的思路与获取普通扩展类类似,其源码如下所示,具体步骤我在源码的基础上进行了注释,其大致过程为

//获取自适应的扩展类
public T getAdaptiveExtension() {
    //首先尝试从缓存中获取扩展类实例
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        //缓存中不存在,则创建自适应的扩展类
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        } else {
            throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }

    return (T) instance;
}

createAdaptiveExtension创建扩展类,并且通过injectExtension注入其依赖的扩展类,其过程也是获取class对象通过newInstance创建实例对象。

//创建自适应的扩展类
private T createAdaptiveExtension() {
    try {
        //获取自适应的扩展类的class文件实例化,并且注入依赖的其他扩展类
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}

//获取自适应扩展类的class文件
private Class<?> getAdaptiveExtensionClass() {
    //获取所有扩展类class对象
    getExtensionClasses();
    //检查缓存,直接返回
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    //缓存不存在则创建自适应扩展类的class文件
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

同样是先从缓存中获取,缓存中不存在则通过createAdaptiveExtensionClass创建class对象

private Class<?> createAdaptiveExtensionClass() {
    //动态生成@Adaptive结尾的代码
    String code = createAdaptiveExtensionClassCode();
    //获取类加载器
    ClassLoader classLoader = findClassLoader();
    //获取Compiler的自适应扩展类
    com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    //编译得到class文件
    return compiler.compile(code, classLoader);
}

不同于普通扩展类的class文件通过文件流的方式读入内存中,对于应用于接口方法级别的@Adaptive所在的接口会通过createAdaptiveExtensionClassCode方法生成后缀为@Adaptive的源代码,例如Protocol接口,其定义如下

@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();

}

Protocol为一个Dubbo扩展点,export和refer两个方法有@Adaptive注解,表明该方法可以根据参数自适应选择正确的扩展实现,createAdaptiveExtensionClassCode返回的源码如下

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
}

自动生成的代码实现一些基本功能,如异常处理等,对自适应的方法进行了封装,通过URL中的参数创建对应的扩展extension,实际上还是调用具体extension中的方法,相当于装饰者模式。
动态生成的源代码通过Dubbo提供的编译器Compiler编译为class文件,Dubbo编译器Compiler的定义如下

@SPI("javassist")
public interface Compiler {
    Class<?> compile(String code, ClassLoader classLoader);
}

可以看到Compiler也是一个扩展点,不禁感叹Dubbo中一切皆可扩展!Compiler指定了默认扩展实现为"javassist",该接口有四个实现者:

后面三种均继承AbstractCompiler,AbstractCompiler实现了异常处理等基本功能。其中AdaptiveCompiler使用了类级别的@Adaptive,使用在类级别上的@Adaptive不会生成以“@Adaptive”结尾的Java源代码,不会根据URL参数动生成代码,而是使用编码生成,Dubbo中只有两处使用了类级别的@Adaptive,即AdaptiveCompiler和AdaptiveExtensionFactory。

\color{green}{自动激活扩展点getActivateExtension}

自动激活扩展点主要是通过@Activate注解指定激活条件,满足激活条件的扩展点通过调用普通扩展点加载方法getExtension来完成,最后对设置的排序规则进行排序,其源码如下所示

    public List<T> getActivateExtension(URL url, String[] values, String group) {
        List<T> exts = new ArrayList<T>();
        List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
        if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
            getExtensionClasses();
            for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Activate activate = entry.getValue();
                if (isMatchGroup(group, activate.group())) {
                    T ext = getExtension(name);
                    if (!names.contains(name)
                            && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
                            && isActive(activate, url)) {
                        exts.add(ext);
                    }
                }
            }
            Collections.sort(exts, ActivateComparator.COMPARATOR);
        }
        List<T> usrs = new ArrayList<T>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                    && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                if (Constants.DEFAULT_KEY.equals(name)) {
                    if (!usrs.isEmpty()) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    T ext = getExtension(name);
                    usrs.add(ext);
                }
            }
        }
        if (!usrs.isEmpty()) {
            exts.addAll(usrs);
        }
        return exts;
    }

至此三种获取扩展点实现实例的方法介绍完毕,本文并未对自适应扩展实现中源码的生成的过程进行详解,有兴趣的可以研究Dubbo源码。

扩展点使用示例

定义一个扩展点HelloService,通过@SPI指定扩展点的默认实现为"impl2",实现了两个扩展实现HelloServiceImpl1和HelloServiceImpl2如下所示,

@SPI("impl2")
public interface HelloService {

    String sayHello();
}

public class HelloServiceImpl1 implements HelloService{

    public String sayHello() {
        String hello = "hello Impl1";
        System.out.println(hello);
        return hello;
    }
}

public class HelloServiceImpl2 implements HelloService {

    public String sayHello() {
        String hello = "hello Impl2";
        System.out.println(hello);
        return hello;
    }
}

在目录src/main/resources/META-INF/dubbo下创建com.kkssyy.dubbo.common.service.HelloService文件

impl1=com.kkssyy.dubbo.common.service.HelloServiceImpl1
impl2=com.kkssyy.dubbo.common.service.HelloServiceImpl2

创建ExtensionClient来进行验证

public class ExtensionClient {

    public static void main(String[] args) {
        ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(HelloService.class);
        HelloService helloService = (HelloService) extensionLoader.getExtension("impl1");
        HelloService helloService2 = (HelloService) extensionLoader.getDefaultExtension();
        //打印hello Impl1
        helloService.sayHello();
        //打印hello Impl2
        helloService2.sayHello();
    }
}

helloService为指定的“impl1”扩展实现,该实现控制台打印"hello Impl1",helloService2为默认扩展实现,该实现控制台打印"hello Impl2"。

总结

本文对Dubbo的扩展机智进行了详解,对ExtensionLoader的源码进行了梳理,并对相关的知识进行全面的介绍,从源码中可以看出ExtensionLoader全流程使用了线程安全的ConcurrentHashMap来缓存class和实例对象,提高了多次方法调用的效率,使用了装饰者模式和工厂模式等设计模式提高了代码的扩展性,此外Dubbo中几乎所有的实现都是基于扩展点的,比如加载扩展类的ExtensionFactory和编译器Compiler等,大大提高了框架的可扩展性。

写在最后

源码路上,共勉!!!

上一篇 下一篇

猜你喜欢

热点阅读