Dubbo服务暴露

2021-02-08  本文已影响0人  爱健身的兔子

1 Dubbo服务暴露介绍

Dubbo服务暴露的整体流程如下:

2 Dubbo服务暴露源码

2.1 延时暴露

ServiceConfig#export

    public synchronized void export() {
        //检查和更新附属的配置文件
        checkAndUpdateSubConfigs();
        // shouldExport 默认值为 true    如果export==null?true:export
        //如果只想本地启动服务,不把服务暴露出去给别人使用,可以配置<dubbo:provider export="false" />禁止服务导出
        if (!shouldExport()) {
            return;
        }
        //如果应该延迟导出,就去延迟导出 ,这里不研究
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            // 立即导出服务
            doExport();
        }
}

如果设置了 delay 参数,Dubbo 的处理方式是启动一个守护线程在 sleep 指定时间后再 doExport。

2.2 暴露检查

ServiceConfig#doExport

 protected synchronized void doExport() {
        // 这个服务不需要暴露  
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        //这个服务已经暴露了
        if (exported) {
            return;
        }
        exported = true;
        //如果path为空,给它赋值
        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        //多协议多注册中心暴露服务
        doExportUrls();
    }

判断服务是否已经暴露或者不需要暴露。

2.3 多协议多注册中心暴露服务

ServiceConfig#doExportUrls

//Dubbo允许我们使用不同的协议暴露服务,也允许我们向多个    注册中心注册服务
    //多协议多注册中心暴露服务
    private void doExportUrls() {
        //通过loadRegistries加载注册中心链接
        List<URL> registryURLs = loadRegistries(true);
        //遍历协议、在每个协议下暴露服务
        for (ProtocolConfig protocolConfig : protocols) {
            // 通过(路径+guoup+version)构建一个string类型的 路径key  从协议配置类中,获取上下文路径.
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            //提供者模型
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            //ApplicationModel持有所有的提供者模型 .init初始化提供者模型 
            ApplicationModel.initProviderModel(pathKey, providerModel);
            //组装URL传入协议配置类 注册集合
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

loadRegistries方法的主要是构造注册配置类的URL。

2.4 组装URL

ServiceConfig#doExportUrlsFor1Protocol

    private void   doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//省略URL 的构造 (协议名、IP、端口、上下文路径、map ),map包含了url的参数(如generic,token)。
//------------------------ 暴露Dubbo服务-----------------------------
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            // 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
        //  作用范围三个取值(none:什么都不做、scope != remote,暴露到本地scope != local,暴露到远程)
        String scope = url.getParameter(SCOPE_KEY);
        //如果scope !=none 则进入方法
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
            // scope != remote 暴露到本地
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                //导出到本地
                exportLocal(url);
            }
            //  scope != local  暴露到远程
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {

                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        // 如果协议只是 injvm , 而不是redister
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        // 加载监视器链接
                        URL monitorUrl = loadMonitor(registryURL);
                        // 如果 monitor不等于null     将监视器链接作为参数添加到URL中
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        // 对于提供程序,这用于启用自定义代理来生成调用程序
                        String proxy = url.getParameter(PROXY_KEY);

                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        // 为服务提供类(ref),生成Imvoker
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        //暴露 服务  并生成exporter
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                    //不存在注册中心 ,仅仅暴露服务
                } else {
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                //发布元数据
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }
    }

注意

如果配置 scope=none, 则不会进行服务暴露;如果没有配置 scope 或者 scope=local,则会进行本地暴露。本地暴露时使用 injvm 协议,injvm 协议是一个伪协议,它不开启端口,不能被远程调用,只在 JVM 内直接关联,但执行 Dubbo 的 Filter 链。

上面远程暴露服务会根据是有有注册中心分两种情况:

  1. 直接暴露服务

  2. 通过RegistryProtocol暴露远程服务后,暴露服务到注册中心。

2.5 创建Invoker

在服务暴露之前需要创建InvokerInvoker是实体域,它是Dubbo的核心模型,其他模型都向它靠拢,或者转换成它。它代表一个可执行体,可向它发起invoke调用,这个调用有可能是一个本地的实现,也有可能是一个远程的实现,也有可能是一个集群的实现。 InvokerProxyFactory创建的,Dubbo的默认代理工厂实现类是JavassistProxyFactory

JavassistProxyFactory#getInvoker

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // 为目标类创建 Wrapper(方法就是,从缓存中获取Wrepper ,如果缓存中没有,创建,写入缓存)
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
2.6 本地暴露

ServiceConfig#exportLocal

 private void exportLocal(URL url) {
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)   //设置协议头为 injvm
                .setHost(LOCALHOST_VALUE)      //设置Host为localhost
                .setPort(0)
                .build();
        // 创建 Invoker,并暴露服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
        Exporter<?> exporter = protocol.export(
                PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

本地暴露是通过InjvmProtocol暴露。

2.7 远程暴露

远程暴露的协议通过ExtensionLoader加载,同时会加载ProtocolFilterWrapperProtocolListenerWrapper作为Protocol的包装类。

2.8 注册服务

如果使用了注册中心,则在通过具体协议(如 Dubbo 协议)暴露服务之后进入服务注册流程,将服务节点注册到注册中心。

RegistryProtocol#export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    //向Zookeeper注册节点
    registry.register(registedProviderUrl);
    // 订阅override数据
    // 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保证每次export都返回一个新的exporter实例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                overrideListeners.remove(overrideSubscribeUrl);
                registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    };
}

private Registry getRegistry(final Invoker<?> originInvoker){
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    return registryFactory.getRegistry(registryUrl);
}

3 DubboProtocol

Dubbo协议的暴露的服务是通过Netty远程通信实现服务调用。

DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispaching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice){
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
            if (logger.isWarnEnabled()){
                logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);

    return exporter;
}

DubboProtocol#openServer

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也可以暴露一个只有server可以调用的服务。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            //server支持reset,配合override功能使用
            server.reset(url);
        }
    }
}

private ExchangeServer createServer(URL url) {
    //默认开启server关闭时发送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默认开启heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

Exchanger (默认 HeaderExchanger)封装请求响应模式,同步转异步,以 Request、Response 为中心。

HeaderExchager#bind

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters#bind

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().bind(url, handler);
}

底层传输默认使用 NettyTransporter,最终是创建 NettyServer。

NettyTransporter#bind

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

至此Netty连接创建。

上一篇下一篇

猜你喜欢

热点阅读