微服务架构和实践程序员Dubbo剖析

dubbo剖析:一 服务发布

2018-03-25  本文已影响928人  益文的圈

注:文章中使用的dubbo源码版本为2.5.4

零、服务发布的目的

服务提供者向注册中心注册服务,将服务实现类以服务接口的形式提供出去,以便服务消费者从注册中心查阅并调用服务。

一、服务发布入口

1.1 Spring配置及ServiceBean映射

服务发布方在工程中会有如下Spring配置


服务发布的spring配置

其中demoService为Spirng中配置服务的具体实现,即Spring中的一个Bean

<bean id="demoService"class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />

而对于下方配置,spring容器在启动的过程中会解析自定义的schema元素dubbo:service将其转换为实际的配置实现ServiceBean ,并把服务暴露出去

<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref=”demoService”/>

1.2 ServiceBean

ServiceBean

类结构与onApplicationEvent回调:
ServiceBean除了继承dubbo自己的配置类ServiceConfig以外,还实现了一系列的spring接口用来参与到spring容器的启动以及bean创建过程中。其中包括ApplicationListener。

    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                export();
            }
        }
    }

Spring容器ApplicationContext的启动最后一步会触发ContextRefreshedEvent事件, 而ServiceBean实现了ApplicationListener接口监听此事件,触发onApplicationEvent(ApplicationEvent event)方法,在这个方法中触发export方法来暴露服务。

包含的属性:
ServiceBean的父类ServiceConfig中包含了很多配置属性,这些属性通过Spring配置注入赋值。

private transient String beanName;  //bean名称,对应xml中的id
private String interfaceName;  //接口名称,对应xml中的interface
private Class<?> interfaceClass;  //通过Class.forName(interfaceName)生成
private T ref;  //  接口实现类引用,对应xml中的ref
protected List<ProtocolConfig> protocols;  //协议列表
protected List<RegistryConfig> registries;  //注册中心列表
private final List<Exporter<?>> exporters;  //已发布服务列表
private final List<URL> urls;  //已发布服务地址列表
private static final Protocol protocol;
private static final ProxyFactory proxyFactory;

二、几个关键概念:

2.1 Invoker

public interface Invoker<T> extends Node {

    /**
     * get service interface.
     *
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     *
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;

}

可执行对象的抽象,能够根据方法的名称、参数得到相应的执行结果。
Invoker可分为三类:

Invocation:包含了需要执行的方法和参数等重要信息,他有两个实现类RpcInvocation和MockInvocation。

2.2 ProxyFactory

public interface ProxyFactory {

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create invoker.
     *
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

服务接口代理抽象,用于生成一个接口的代理类。
getInvoker方法:针对Server端,将服务对象(如DemoServiceImpl)包装成一个Invoker对象。
getProxy方法:针对Client端,创建接口(如DemoService)的代理对象。

2.3 Exporter

public interface Exporter<T> {

    /**
     * get invoker.
     *
     * @return invoker
     */
    Invoker<T> getInvoker();

    /**
     * unexport.
     * <p>
     * <code>
     * getInvoker().destroy();
     * </code>
     */
    void unexport();

}

维护Invoker的生命周期,内部包含Invoker或者ExporterMap。

2.4 Protocol

@SPI("dubbo")
public interface Protocol {

    /**
     * 获取缺省端口,当用户没有配置端口时使用。
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露远程服务:
     * @param <T>     服务的类型
     * @param invoker 服务的执行体
     * @return exporter 暴露服务的引用,用于取消暴露
     * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用远程服务:
     * @param <T>  服务的类型
     * @param type 服务的类型
     * @param url  远程服务的URL地址
     * @return invoker 服务的本地代理
     * @throws RpcException 当连接服务提供方失败时抛出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 释放协议
     */
    void destroy();

}

协议抽象接口。封装RPC调用。
exporter方法:暴露远程服务(用于服务端),就是将Invoker对象通过协议暴露给外部。
refer方法:引用远程服务(用于客户端),通过Clazz、url等信息创建远程的动态代理Invoker。

2.5 关系图

服务发布相关接口关系图

1)ServiceConfig包含ProxyFactoryProtocol,通过SPI的方式注入生成;
2)ProxyFactory负责创建Invoker
3)Protocol负责通过Invoker生成Exporter,将服务启动并暴露;

三、服务发布流程详解:

3.1 简洁流程图

服务发布简洁流程图

3.2 发布入口

ServiceBean监听入口:

    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                export();
            }
        }
    }

ServiceBean的export()方法内部最终会执行到ServiceConfig的doExportUrls()方法-->

ServiceConfig执行发布:
1)加载所有注册中心URL
2)遍历所有Protocol,进行发布

    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

doExportUrls()方法内部最终执行下面两个重要步骤,即 “本地发布”“远程发布” -->

3.3 本地发布:

即将服务发布成本地可调用的服务。

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }

重点:Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
代码中的proxyFactory为通过ExtensionLoader动态生成的JavassistProxyFactory
代码中的protocol为通过ExtensionLoader动态生成的InjvmProtocol

...ExtensionLoader相关原理会在后续文章专门讲解...

JavassistProxyFactory创建Invoker:
通过JavassistProxyFactory创建(new)了一个AbstractProxyInvoker的实现,其内部通过Java反射的方式执行原始对象proxy的方法。

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

InjvmProtocol.export:
new了一个InjvmExporter。就是单纯的将url、Exporter放入exporterMap中。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

3.4 远程发布:

遍历所有注册中心URL,进行远程发布:

                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }

代码中的proxyFactory为通过ExtensionLoader动态生成的JavassistProxyFactory
代码中的protocol为通过ExtensionLoader动态生成的RegistryProtocol

JavassistProxyFactory创建Invoker:
同本地发布,不赘述。

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

RegistryProtocol.export:
通过RegistryProtocol将Invoker发布成Dubbo服务。
1)doLocalExport 所做的事情,就是调用DubboProtocol生成DubboExporter,并发布Dubbo服务;
2)后续代码所做的事情,就是创建注册中心,将发布的服务注册到注册中心(zk),并监听注册中心(zk)的变动;

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

doLocalExport:
bounds为providerurl <--> exporter的映射,如果exporter未被创建,则调用DubboProtocol创建exporter。

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

DubboProtocol.export:

DubboProtocol.export

总体做的事情就是:

...对于服务端Server的实现分析请参考文章dubbo剖析:三 网络通信之 -- Server实现 ...

向注册中心注册/监听:

        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

...注册中心相关内容不展开...

四、整体流程图总结

整体流程图总结
总结起来,Dubbo的服务发布过程:
1)通过Spring配置初始化ServiceBean并注入属性;
2)通过监听Spring事件触发服务发布过程;
3)ServiceConfig中的ProxyFactoryProtocol由Dubbo的spi动态生成;
4)对所有的协议,所有的注册中心进行遍历,通过JavassistProxyFactory生成可执行对象Invoker;
5)通过RegistryProtocol将Invoker对象转换为Exporter,同时完成服务的启动监听和注册;
6)最终由ServiceConfig维护所有发布的Exporter与服务URL到本地内存;
上一篇下一篇

猜你喜欢

热点阅读