dubbo服务发布过程浅析

2018-02-01  本文已影响0人  mikewt

 dubbo服务暴露就是一个远程代理,打开网络监听,接受服务调用请求,将服务接口名,IP,port发布到注册中心的过程。通过《dubbo启动过程分析》可以了解到,在spring容器启动时会将容器中所有的bean初始化成单实例(默认),如果bean继承相应的接口,在实例初始化完成后,会调用实现类中某些接口方法。dubbo的初始化也是通过这样一个过程完成的。
ServiceConfig.export->doExport()-> doExportUrls()->doExportUrlsFor1Protocol()

//根据url暴露
 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();//获取服务提供者协议名称
        if (name == null || name.length() == 0) {
            name = "dubbo";//默认是dubbo
        }
        //获取服务主机名,为空则自动查找本机IP
        if (NetUtils.isInvalidLocalHost(host)) {
            anyhost = true;
            try {
                host = InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                logger.warn(e.getMessage(), e);
            }
        -----------------------------------------------
    }
 //SPI加载协议实现类中的默认端口常量 
 final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
        if (port == null || port == 0) {//protocol没有配置port
            port = defaultPort;
        }
        if (port == null || port <= 0) {
            port = getRandomPort(name);//使用本机随机可用端口
            if (port == null || port < 0) {
                port = NetUtils.getAvailablePort(defaultPort);
                putRandomPort(name, port);
            }
            logger.warn("Use random available port(" + port + ") for protocol " + name);
        }
       //组织参数
       Map<String, String> map = new HashMap<String, String>();
       ..............................................................
       //形成类似dubbo://的统一URL
      URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        String scope = url.getParameter(Constants.SCOPE_KEY);//获取是scope属性
        //配置为none不暴露
        if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        //导入监控中心信息
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                         //首先将URL中dubbo替换为registry,创建远程代理Invoker
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                         //此处protocol为spi加载的适配类,会根据invoker中的protocol不同
                         //调用不同的具体实现类,此处在SPI分析中已经说明
                        //Protocol$Adaptor.export()-->dubbofilterwrapper.export()-->dubbolistenerwrapper.export()-->dubboprotocol.export()
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {//本地服务
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
}

接下来将创建接口实现类的包装类,也就是服务的包装类,调用过程如下:
proxyfactory$adpative.getInvoker()--> StubProxyFactoryWrapper.getInvoker()->JavassistProxyFactory.getInvoker()
调用Javaassit创建服务包装类,生成的类如下,重点看invokeMethod()这个方法,后面invoker.invoke()会掉到此方法去执行业务逻辑。

package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.demo.provider.DemoServiceImpl;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

public class Wrapper1
  extends Wrapper
  implements ClassGenerator.DC
{
  ...........................................................
  public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
    throws InvocationTargetException
  {
    DemoServiceImpl localDemoServiceImpl;
    try
    {
      localDemoServiceImpl = (DemoServiceImpl)paramObject;
    }
    catch (Throwable localThrowable1)
    {
      throw new IllegalArgumentException(localThrowable1);
    }
    try
    {
      if ((!"sayHello".equals(paramString)) || (paramArrayOfClass.length == 1)) {
        return localDemoServiceImpl.sayHello((String)paramArrayOfObject[0]);
      }
    }
    catch (Throwable localThrowable2)
    {
      throw new InvocationTargetException(localThrowable2);
    }
  }
}

最后返回AbstractProxyInvoker的实现,此实现类是一个非常重要的类,包含了服务实现类,服务接口,url,还有刚才生成的服务包装类的引用。
然后执行到如下的方法:
Exporter<?> exporter = protocol.export(invoker);
此处调用的过程为protocol$Adaptive.export()-->ProtocolFilterWrapper.export()--->ProtocolListenerWrapper.export(),两个wrapper什么都没有做,直接放行,最终调到RegistryProtocol.export()
参数为刚才封装好的AbstractProxyInvoker实现类,然后:
ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
下面来分析这个方法,这个方法非常重要,就在此方法中调用DubboProtocol完成服务的发布

private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>
        //此处打开网络监听
((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

在doLocalExport()中,将provider url与AbstractProxyInvoker实现类封装到InvokerDelegete对象中,然后执行protocol.export(invokerDelegete)方法,此处后面会重点分析,此方法返回一个DubboExporter对象,将此对象与AbstractProxyInvoker实现类封装在ExporterChangeableWrapper对象中,并存储在RegistryProtocol这个类的bounds属性,这个属性是一个线程安全的map,以便以后服务调用使用。下面分析调用dubbo协议打开网络监听的过程。依然是SPI机制,经过ProtocolFilterWrapper.export(),完成对Invoker的包装,Invoker中加入了Filter调用链。

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
         //SPI机制加载所有Filter扩展
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
         //加入Filter执行链
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                .......................................................
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                .......................................................
                };
            }
        }
        return last;
    }

然后经过ProtocolListenerWrapper.export()调到DubboProtocol.export(),将结果封装成ListenerExporterWrapper返回。

 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        ...................................................
        openServer(url);
        
        return exporter;
    }

将前面传递过来的invoker,key,exporterMap封装到DubboExporter中,打开server此处是后续调用netty打开网络监听,最终返回到DubboProtocol处的是HeaderExchangeServer,此对象持有nettyserver,在nettyserver构造方法中,有两个构造参数一个是url,另一个是DecodeHandler,此对象里又封装了HeaderExchangeHandler,又封装了ExchangeHandler。初始化了nettyserver的基本参数如:ip,port,timeout等等。具体调用流程如下:
openServer(url)-->createServer(url)-->Exchangers.bind(url, requestHandler)-->HeaderExchanger.bind()-->NettyTransporter.bind()-->nettyserver.doopen()
在doopen()方法中,就是对netty的初始化操作,设置线程池,绑定decoder、encoder、handler,然后打开端口,进行监听。

  @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        channel = bootstrap.bind(getBindAddress());
    }

打开网络监听后,再次回到RegistryProtocol类中,下面就开始调用注册中心进行服务注册和订阅。首先调用getRegistry(originInvoker),spi机制,初始化时注入的是registryFactory适配类,根据url中注册中心参数获取具体的实现类,此处是ZookeeperRegistryFactory

private Registry  getRegistry(final Invoker<?> originInvoker){
        URL registryUrl = originInvoker.getUrl();
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
        }
        return registryFactory.getRegistry(registryUrl);
    }

调用到AbstractRegistryFactory.getRegistry()>ZookeeperRegistryFactory.createRegistry(),最终返回一个 ZookeeperRegistry对象

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    //此处是spi机制注入的适配类
    private ZookeeperTransporter zookeeperTransporter;
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
}

返回到RegistryProtocol.export(),获得到了 ZookeeperRegistry,然后调用registry.register(registedProviderUrl)进行注册,进一步跟踪,调用链路为:FailbackRegistry.register()-->AbstractRegistry.register()-->ZookeeperRegistry.doRegister(),在zk上创建一个临时节点,注册完成

protected void doRegister(URL url) {
        try {
             //在zk上创建一个临时节点
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

回到RegistryProtocol,注册完成之后,下面开始订阅,需要感知zk node的变化,此处使用的zk的watcher机制,首先初始化一个NotifyListener,后面监听变化调用到此对象中的notify方法。

 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

调用链路:FailbackRegistry.subscribe()-->AbstractRegistry.subscribe()-->ZookeeperRegistry.doSubscribe(),主要是注册主节点和子节点监听

protected void doSubscribe(final URL url, final NotifyListener listener) {
 .......................
   List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {
                   //zkListeners根据key存储了主节点和子节点监听
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {//没有 初始化
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                   //根据主节点监听获取子节点监听
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {//没有,new一个
                        listeners.putIfAbsent(listener, new ChildListener() {
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                //节点删除,更新会触发notify方法
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);//创造一个永久节点??
                   //添加子节点监听器
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
 .......................
}

notify(url, listener, urls)调用链比较复杂,链路为:
FailbackRegistry.notify()-->FailbackRegistry.doNotify()-->AbstractRegistry.notify()-->listener.notify(),此处的listener为刚才registryProtocol传递过来的OverrideListener,然后调用OverrideListener.notify()-->RegistryProtocol.doChangeLocalExport(),对修改了url的invoker重新export,至此整个发布过程全部完成。

private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl){
        String key = getCacheKey(originInvoker);
        final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null){
            logger.warn(new IllegalStateException("error state, exporter should not be null"));
            return ;//不存在是异常场景 直接返回 
        } else {
            final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
            //调用dubboprotocol.export()重新发布
            exporter.setExporter(protocol.export(invokerDelegete));
        }
    }
上一篇下一篇

猜你喜欢

热点阅读