dubbo

Dubbo服务启动过程(二)

2017-12-01  本文已影响134人  此鱼不得水

上一章中我只是简单论述了一下服务的启动过程的前置内容,主要是做一些参数的检查和URL的封装,然后在文章末尾引出了本章介绍的重点内容,下面我们真正来看一下服务的暴露过程都做了哪些操作。

其实服务暴露的过程只要是完成了两件事情:

  • 将服务注册到注册中心,供调用方发现
  • 监听指定的端口,等待服务调用方进行调用

将服务注册到注册中心,供调用方发现

这个过程如何产生,还是困扰了我一天的。最初我以为直接通过Protocol获得Extension的时候会将RegistryProtocol作为Wrapper进行包装(原因是从一个阿里云的博客上看到上面这么写到),后来因为先入为主的原因导致我也一直被文章的思路限制,但是我自己看了相关的加载逻辑并且通过单元测试去验证这个,发现不是这样子的。RegistryProtocol虽然作为Protocol的实现类,但是却不是他的包装类。所以并不是在加载包装类的时候加载到的。这里的说法其实是有问题的,最后通过相关的学习发现其实是这么回事:

其实在ServiceConfig中有这么一段代码:

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

然后在export()最后面是这么调用的:

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

//将invoker转化为exporter对象
Exporter<?> exporter = protocol.export(invoker);

exporters.add(exporter); //将exporter添加到需要暴露的列表中取

那么这里的protocol究竟是哪个Protocol的实现呢:

这时候的URL对象大概是这样子:

==registry==://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&==export=dubbo%3A%2F%2F192.168.153.1%3A20880%2Fcom.alibaba.dubbo.demo.bid.BidService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.bid.BidService%26methods%3DthrowNPE%2Cbid%26optimizer%3Dcom.alibaba.dubbo.demo.SerializationOptimizerImpl%26organization%3Ddubbox%26owner%3Dprogrammer%26pid%3D3872%26serialization%3Dkryo%26side%3Dprovider%26timestamp%3D1422241023451==&organization=dubbox&owner=programmer&pid=3872&registry=zookeeper&timestamp=1422240274186

所以说这里的protocol应该是RegistryProtocol
下面我们看那RegistryProtocol的export具体操作:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //这一步进行真正的暴露操作,下面都是注册服务的代码
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //这里就是根据invoker持有的URL获得Register对象,鉴于使用的都ZookeeperRegister,所以这里就以ZookeeperRegister作为研究对象
        final Registry registry = getRegistry(originInvoker);
        //获得URL中黄色的部分,然后去除部分不需要的参数
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }
    
    

先看一下暴露服务的时候都做了什么:

//这个操作本质上就是将上述的黄色部分对应的值与exporter作为一个映射存储下来
private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        //获得上文中黄色部分的URL
        String key = getCacheKey(originInvoker);
        //exporter代理,建立返回的exporter与protocol export出的exporter的对应关系,在override时可以进行关系修改.本质上就是维护exporter和invoker映射关系的类
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    //InvokerDelegete简单的Invoker代理
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    //将原先的url替换为上面黄色部分,重新暴露
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }
    
    private String getCacheKey(final Invoker<?> originInvoker){
        URL providerUrl = getProviderUrl(originInvoker);
        String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
        return key;
    }
    
    /**
     * 通过invoker的url 获取 providerUrl的地址
     * @param origininvoker
     * @return
     
     首先说明一点:仔细看上面的URL的话会发现为什么会有%26和%3D这种编码符号,看起来也不利于阅读。
     %26 = &,%3D = ‘=’,经过URL编码的含义就在于将很多key,value的件值对作为一个整体封装成一个大的value,含义就相当于URL里面可以嵌套URL。下面这段代码就是取出export对应的URL,这个URL嵌套在原始的invoker持有的URL中,是用来进一步暴露服务需要的URL
     */
    private URL getProviderUrl(final Invoker<?> origininvoker){
        String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
        if (export == null || export.length() == 0) {
            throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
        }
        
        URL providerUrl = URL.valueOf(export);
        return providerUrl;
    }

其次的话就是将服务注册到注册中心上去,这里的核心是:
registry.register(registedProviderUrl);
这里以ZookeeperRegistry为准来介绍:

    因为ZookeeperRegistry还是继承了FailBackRegistry的,所以入口在这里面:
    @Override
    public void register(URL url) {
        //在内部的已经注册的服务列表加上要注册的url
        super.register(url);
        //首先尝试从两个失败的列表里面移除这个url,如果该次重试的时候依然失败的话就再次从这里面移出来
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // 向服务器端发送注册请求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // 如果开启了启动时检测,则直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if(skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // 将失败的注册请求记录到失败列表,定时重试
            failedRegistered.add(url);
        }
    }
    
    
    //失败后会将失败的url记录到失败列表中,然后定时重试,下面是重试的逻辑,是无限次的重试噢。
    protected void retry() {
        //看看有没有注册失败的URL
        if (! failedRegistered.isEmpty()) {
            Set<URL> failed = new HashSet<URL>(failedRegistered);
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry register " + failed);
                }
                try {
                    for (URL url : failed) {
                        try {
                            doRegister(url);
                            //不报错就代表注册成功,然后从失败的列表中移除改url即可 failedRegistered.remove(url);//在这里就移除那些已经尝试注册成功的url
                        } catch (Throwable t) { // 忽略所有异常,等待下次重试
                            logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                } catch (Throwable t) { // 忽略所有异常,等待下次重试
                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    
    然后我们看一下ZooKeeperRegistry中的注册方法:
    protected void doRegister(URL url) {
        try {
            //这里就要理解一下这个toUrlPath的返回结果了,我本地测试了一下这个方法的返回结果是这样的:
            // /dubbo/com.alibaba.dubbo.demo.bid.BidService/providers/dubbo%3A%2F%2F192.168.153.1%3A20880%2Fcom.alibaba.dubbo.demo.bid.BidService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.bid.BidService%26methods%3DthrowNPE%2Cbid%26optimizer%3Dcom.alibaba.dubbo.demo.SerializationOptimizerImpl%26organization%3Ddubbox%26owner%3Dprogrammer%26pid%3D3872%26serialization%3Dkryo%26side%3Dprovider%26timestamp%3D1422241023451
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    
image

从这个图中对比一下就能清楚的看到具体注册到zookeeper中的结构,图中的URL层面就是实际上provider端的参数汇总URL。
因为服务端就把整个URL都注册到注册中心中去,所以服务调用者在调用的时候就能方便的拿到服务端配置的参数,后面的话再次进行覆盖或者变更也十分方便。所以注册中心不仅仅是发现服务的一个途径,更是参数沟通的一个渠道。

其实在服务的注册过程中有更多的细节值得探究,比如服务的变更通知,本地存储的缓存文件等等。但这些我认为这些都不影响对于主流程的分析,在第一遍读源码的时候还是以主流程为主,以后如果需要了在进行二次分析。

服务启动过程的第一步服务注册,基本在这里就是这个内容了。之后会讲解如何监听端口进行下一步的。

上一篇下一篇

猜你喜欢

热点阅读