Dubbo服务暴露原理

2020-10-19  本文已影响0人  prozombie

1.概述

RPC作为分布式系统中不可或缺的中间件,在业界已经具有相当成熟的技术实现,其中Dubbo应用得特别广泛,本文将对Dubbo服务暴露的流程进行介绍。在正式进入Dubbo原理探究之前,需要先弄清楚RPC的基本模型:

RPC原理.png
consumer代表服务调用方,provider代表服务提供方,registry代表注册中心。当服务提供方启动时会将自己的信息(服务ip,port等)记录在注册中心,这样在调用方调用的时候,会先从注册中心获取到提供方的基本信息,然后发送网络请求给provider完成调用;同时consumer在启动的时候,会向注册中心订阅消息,这样就能在provider发生变更的时候获取到最新的信息,保证请求路由到正确的provider

2.Dubbo服务暴露概览

Dubbo服务暴露就是指providerregistry注册的过程,以zookeeper作为注册中心,netty作为通讯框架,本文基于2.6.x版本代码分析。使用Dubbo官方提供的demo运行,dubbo-demo-provider配置文件如下:

    <!-- provider's application name, used for tracing dependency relationship -->
    <dubbo:application name="demo-provider"/>

    <!-- use multicast registry center to export service -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <!-- use dubbo protocol to export service on port 20880 -->
    <dubbo:protocol name="dubbo" port="20880"/>

    <!-- service implementation, as same as regular local bean -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>

    <!-- declare the service interface to be exported -->
    <dubbo:service proxy="jdk" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

服务暴露的整个过程大致可以分为四个阶段:

3.创建invoker对象并对其加工

ServiceConfig类中会先调用exportLocal完成本地暴露,然后调用proxyFactory.getInvoker获取invoker对象,先关注下该类中的静态变量proxyFactory,它会调用ExtensionLoader中的getAdaptiveExtension方法完成初始化,该类是Dubbo框架自身实现的一种SPI机制,能够根据配置文件灵活选择接口的具体实现

package com.alibaba.dubbo.config;


/**
 * ServiceConfig
 *
 * @export
 */
public class ServiceConfig<T> extends AbstractServiceConfig {

    private static final long serialVersionUID = 3033787999037024738L;

    /**根据dubbo spi机制加载**/
    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    /**根据dubbo spi机制加载**/
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

    private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();

    private Class<?> interfaceClass;

    // don't export when none is configured
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            /**本地暴露**/
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    /**获取invoker对象**/
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this)
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
    }
}

proxyFactory在初始化的时候会被赋值为ProxyFactory$Adaptive,该类是通过字节码增强实现的

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        /**获取url中的proxy参数,默认为javassist**/
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        /**根据proxy参数获取对应的ProxyFactory接口的实现**/
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

Dubbo针对每一个需要根据配置决定具体实现的接口都会在运行时生成以$Adaptive后缀结尾的类,目的是能够根据配置文件,在运行时灵活地选择接口的具体实现,例如:String extName = url.getParameter("proxy", "javassist")这行代码,如果url中的proxy属性为空即没有在配置文件中指定,则默认使用javassist作为extName的值,加载出JavassistProxyFactory类作为ProxyFactory接口的缺省实现。因为这里配置<dubbo:service proxy="jdk" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>,所以会加载JdkProxyFactory。但通过debug发现,com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName)返回的是StubProxyFactoryWrapper,原因是ProxyFactory接口默认有一个包装类实现,无论是JavassistProxyFactory还是JdkProxyFactory都是在StubProxyFactoryWrapper中通过构造函数完成proxyFactory属性的设置,当ProxyFactory$Adaptive中执行extension.getInvoker(arg0, arg1, arg2)的时候,先会调用到StubProxyFactoryWrappergetInvoker方法,该方法的实现为proxyFactory.getInvoker(proxy, type, url),所以最后会调用到JdkProxyFactorygetInvoker方法。对于这种有包装类实现的接口,$Adaptive会优先加载出包装类,而根据配置所对应的具体实现则是通过构造函数的形式作为包装类的属性被注入,在调用的时候先调用包装类从而间接调用到配置所对应的实现,这里使用了装饰器模式,可以在调用之间增加额外的处理。整个流程可以归纳如下:

public class StubProxyFactoryWrapper implements ProxyFactory {

    private final ProxyFactory proxyFactory;

   /**构造函数中设置proxyFactory值为配置所对应的接口实现**/
    public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return proxyFactory.getInvoker(proxy, type, url);
    }

}

对应时序图如下所示:


getInvoker.png

返回invoker对象后,进入到protocol.export方法,protocol初始化为Protocol$AdaptiveProtocol接口有两个包装类ProtocolListenerWrapperProtocolFilterWrapper,顾名思义监听器与过滤器,该过程会先调用ProtocolListenerWrapper,该类的protocol属性在初始化的时候会被设置为ProtocolFilterWrapper,在ServiceConfig中调用protocol.export时会直接进入到ProtocolListenerWrapper中的protocol.export(invoker)方法,接着进入ProtocolFilterWrapper中的protocol.export(invoker)方法,ProtocolFilterWrapper初始化时会设置protocol属性为RegistryProtocol,因此该过程最终会调用到RegistryProtocolexport方法


/**
 * ListenerProtocol
 */
public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            /**ServiceConfig调用protocol.export直接进入到该方法**/
            return protocol.export(invoker);
        }
        /**RegistryProtocol调用protocol.export时候进入**/
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

}

根据ProtocolFilterWrapper类的命名,可以看出它拥有与Spring Filter类似的功能,本质上是一个过滤器

  /**
 * ListenerProtocol
 */
public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }


    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
           /**ServiceConfig调用protocol.export直接进入到该方法**/
            return protocol.export(invoker);
        }
       /**RegistryProtocol调用protocol.export时候进入**/
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

}

因为设置<dubbo:protocol name="dubbo" port="20880"/>RegistryProtocol中的protocol会被设置为DubboRegistry,当调用protocol.export(invokerDelegete)的时候,会再次进入到ProtocolListenerWrapper包装类中,这次会执行new ListenerExporterWrapper<T>


/**
 * ListenerExporter
 */
public class ListenerExporterWrapper<T> implements Exporter<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);

    private final Exporter<T> exporter;

    private final List<ExporterListener> listeners;

    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
        if (exporter == null) {
            throw new IllegalArgumentException("exporter == null");
        }
        this.exporter = exporter;
        this.listeners = listeners;
        if (listeners != null && !listeners.isEmpty()) {
            RuntimeException exception = null;
            for (ExporterListener listener : listeners) {
                if (listener != null) {
                    try {
                        /**调用listener的exported方法,作为hook函数**/
                        listener.exported(this);
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }

    @Override
    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }

    @Override
    public void unexport() {
        try {
            exporter.unexport();
        } finally {
            if (listeners != null && !listeners.isEmpty()) {
                RuntimeException exception = null;
                for (ExporterListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.unexported(this);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }

}

重点关注ListenerExporterWrapper的构造函数,该函数主要完成两件事,一是赋值exporter变量,将外部传入的Exporter对象保存,二是遍历外部传入的ExporterListener列表并调用其exported方法。此处为扩展点Dubbo对于ExporterListener接口只给出了ExporterListenerAdapter这一实现且exported方法实现为空,开发者可以自己实现该方法以达到扩展的目的,当然也可以自己实现ExporterListener接口,依据Dubbo SPI机制加载执行自定义的业务逻辑。除构造函数以外,unexport也拥有类似的功能,当调用unexport的时候会遍历之前保存的listeners并调用其unexport方法。
继续回到ProtocolFilterWrapper,当调用return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER))时,会触发buildInvokerChain

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    /**包装invoker对象**/
                    return filter.invoke(next, invocation);
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

遍历通过SPI加载出来的Filter来包装invoker对象,当invoker调用invoke方法时,就会触发Filter中的实现逻辑,此处使用责任链设计模式,当然开发者也可以实现自己的Filter来对此处逻辑进行扩展。整个流程时序图如下所示:

ServiceConfig -_ RegistryProtocol -_ DubboProtocol.png

4.开启netty服务

RegistryProtocol中调用exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)进入到DubboProtocol类中的export方法,

/**
 * dubbo protocol support.
 */
public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        /**获取key,com.alibaba.dubbo.demo.DemoService:20880**/
        String key = serviceKey(url);
        /**创建exporter对象**/
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        /**保存exporter对象**/
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        /**创建netty server**/
        openServer(url);
        /**初始化序列化器**/
        optimizeSerialization(url);
        return exporter;
    }

    private void openServer(URL url) {
        // find server.
        /**ip+port**/
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            /**从缓存中获取server对象**/
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                /**缓存中没有则创建server**/
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
       /**设置传输协议为dubbo**/
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            /**netty Server初始化**/
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
}

首先会创建DubboExporter对象,并把该对象保存到exporterMap中,其中类:端口会被作为key,例如:com.alibaba.dubbo.demo.DemoService:20880openServer中是获取netty Server的具体逻辑,serverMap中会存放创建好的server,key是ip:port,如果在serverMap中没有存储server,则调用createServer方法创建。

5.获取zookeeper连接并将服务注册成为zookeeper上的节点

完成Netty Server启动后,通过RegistryProtocol中的getRegistry方法创建ZookeeperRegistry对象,该对象的构造函数会进行zookeeper的连接。

private Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return registryFactory.getRegistry(registryUrl);
}

通过返回的ZookeeperRegistry对象,调用subscribe方法便将服务注册成为了zookeeper的节点,/dubbo/com.alibaba.dubbo.demo.DemoService/providers,值为dubbo://ip:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1644&proxy=jdk&side=provider&timestamp=1603026176171,里面包含服务的ip、端口、接口名、接口中包含的方法、代理模式等等。

6.总结

整个服务暴露的过程就是服务向注册中心注册的过程,除了基本的实现以外,Dubbo在该过程中还提供了ListenerFilter这两个扩展点方便开发者进行定制化的实现。

上一篇下一篇

猜你喜欢

热点阅读