Dubbo

3-dubbo源码分析之服务引用

2018-08-30  本文已影响0人  致虑
image.png image.png

官网说明:


一.概览
二.容器初始化
 import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
     public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.cluster.Directory {
       if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
       if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
       String extName = url.getParameter("cluster", "failover");
       if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
       com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
       return extension.join(arg0);
     }
}

看到“failover”就知道后面使用的是FailoverCluster就好,具体的分析后面再讲。


二.服务引用

    import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
  public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
      }
      
      public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();  
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
      }
      
      public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
      }
      
      public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
      }
} 

这里如果url里面没有指定protocol,便取dubbo,那么看看url中的protocol此时是啥:


image.png

很明显是Registry,那么此时跟踪到RegistryProtocol中的refer,那么继续跟踪会发现:


image.png
image.png
这两个wrapper在provider中就遇见了,此处的协议是register,所以直接绕过,进入到RegisterProtocol:
    package com.alibaba.dubbo.remoting.zookeeper;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ZookeeperTransporter$Adpative implements com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter {
    public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "zkclient"));
        
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
        
        com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
        
        return extension.connect(arg0);
    }
}

很明显是zkClient,此处还有个点:

 @SPI("curator")
 public interface ZookeeperTransporter {
     @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
     ZookeeperClient connect(URL url);
 }

方法级别的Adaptive,动态寻找自适应扩展点中的一种机制,这里很简单不多说。

image.png
看到核心逻辑,无非就是触发下相关缓存的更新。也就是获取注册中心过程中所隐藏的动作已经被指明了,就等着后面触发了。

image.png
image.png
回答上面问题最好的出发点就是这里:
directory通用认知就是consumer中所维护的invokers的一个List,但这个invokers列表是实时变化的。也就是:

- 1.如果是consumer先初始化,这个directory中的methodInvokerMap和urlInvokerMap为空,待provider初始化OK 之后,借助注册中心进行notify,就会触发consumer linstener 刷新directory中的invokers列表了。
- 2.如果是provider先初始化,consumer本身初始化是就会订阅注册中心,在订阅的同时就会触发注册中心进行一次notify,此时就会触发consumer linstener 刷新directory中的invokers列表了。

因此consumer不管初始化顺序如何,相关的服务信息更新都是交由cluster中的directory去做的。而过程就是通过监听方式异步去实施。很好理解这里的directory中的invokers缓存
这里很清楚了,zkListener就是对path的一个自定义监听器,将监听器转换成zk自己的监听器对象然后实现具体路径节点的监听,进而触发后续回调到自定义监听器的逻辑,也就是这里的ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
image.png
image.png
image.png

这里补充一个核心点:


image.png
image.png
整体代码如下:
```
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {

        // If protocol is configured at the reference side, only the matching protocol is selected
        // 从注册中心获取到的携带提供者信息的url
        // 如果reference端配置了protocol,则只选择匹配的protocol
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue;
            }
        }
        if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                    + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        URL url = mergeUrl(providerUrl);

        String key = url.toFullString(); // The parameter urls are sorted
        if (keys.contains(key)) { // Repeated url
            continue;
        }
        keys.add(key);
        // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
        // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
        /**
         * urlInvokerMap缓存详解:
         *
         * A: 消费端先启动,服务端后启动:
         * 1.消费端启动时,先从本地或者注册中心拉取 XXX./dubbo.cache文件中取一份provider列表
         * 2.如果本地缓存不存在,直接绕过远端provider的服务信息;
         * 3.若本地缓存存在,生成相关的远端provider的服务信息,置入directory中
         * 4.启动服务端,notify到消费端,从缓存中按服务端url信息取出远端相关provider信息,若相同则不从新置入信息,否则重新置入【多个服务端时,若此时只启动一个服务端,则只需要置入当前服务端信息,其他信息保留】
         *
         * A: 服务端先启动,消费端后启动:
         * 1.消费端启动时,先从本地或者注册中心拉取 XXX./dubbo.cache文件中取一份provider列表
         * 2.将服务端信息从注册中心或者本地拉取生成相关的远端provider的服务信息,置入directory中
         *
         */
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = true;
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {

                    /**
                     * 先使用DubboProtocol的refer方法,这一步会依次调用ProtocolFIlterListenerWrapper,ProtocolFilterWrapper,DubboProtocol中的refer方法。
                     * 经过两个Wrapper中,会添加对应的InvokerListener并构建Invoker Filter链,在DubboProtocol中会创建一个DubboInvoker对象,该Invoker对象持有服务Class,providerUrl,负责和服务提供端通信的ExchangeClient
                     * 接着使用得到的Invoker创建一个InvokerDelegete
                     */
                    invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}
```

最终调用到DubboProtocol中的refer了:
image.png
image.png
在进入DubboProtocol的refer方法之前,会依次调用ProtocolFIlterListenerWrapper,ProtocolFilterWrapper。两个wrapper会添加对应的InvokerListener并构建Invoker Filter链,在DubboProtocol中会创建一个DubboInvoker对象,从上面可以看出该Invoker对象持有服务Class,providerUrl,负责和服务端通信的ExchangeClient。然后使用该Invoker创建一个InvokerDelegete并返回

在创建ExchangeClient时有个配置是开头就提到的:延迟加载:
image.png
至于具体的连接也是交给netty去做的,这里不细说,继续看主流程:

三.发起远程调用
image.png

继续


image.png
image.png

继续往下看,前面一系列容错及负载均衡后面一章详细细说


image.png image.png

恩 这里就是真正发起远程调用了,timeout配置也出现了,但此处还不是真正使用timeout的时候,继续往下:


image.png
image.png

到这里很明显是发送一个request出去,具体发送流程如下:
AbstractPeer->AbstractClient->AbstractChannel->NettyChannel->AbstractChannel


image.png

到这里,请求已经通过netty发出去了,那么怎么做到请求结果返回,并且做到超时检测的呢;
其实这里dubbo我认为它的思想是异步转同步的一个思想,也就是发出请求后,服务端异步执行,但消费端会用Future去同步轮训服务端返回结果,并且在轮训中做到超时检查的。此处继续看下代码就清楚了。

上一篇 下一篇

猜你喜欢

热点阅读