dubbo

☆聊聊Dubbo(九):核心源码-服务端启动流程2

2018-10-23  本文已影响248人  七寸知架构

3 ServiceConfig#doExportUrlsFor1Protocol 重点分析

3.1 组装URL所需参数

        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);

Step1:用 Map 存储该协议的所有配置参数,包括:协议名称、Dubbo版本、当前系统时间戳、进程ID、application配置、module配置、默认服务提供者参数(ProviderConfig)、协议配置、服务提供 Dubbo:service 的属性。

        if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (arguments != null && !arguments.isEmpty()) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods != null && methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }

Step2:如果 dubbo:servicedubbo:method 子标签,则 dubbo:method 以及其子标签的配置属性,都存入到 Map 中,属性名称加上对应的方法名作为前缀。dubbo:method 的子标签 dubbo:argument,其键为方法名.参数序号。

        if (ProtocolUtils.isGeneric(generic)) {
            map.put(Constants.GENERIC_KEY, generic);
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
                map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }

Step3:添加 methods 键值对,存放 dubbo:service 的所有方法名,多个方法名用 , 隔开,如果是泛化实现,填充 genric=true,methods“*”

        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(Constants.TOKEN_KEY, token);
            }
        }

Step4:根据是否开启令牌机制,如果开启,设置 token 键,值为静态值或 uuid

        if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }

Step5:如果协议为本地协议( injvm ),则设置 protocolConfig#register 属性为 false ,表示不向注册中心注册服务,在 map 中存储键为 notify,值为 false,表示当注册中心监听到服务提供者发生变化(服务提供者增加、服务提供者减少等)事件时不通知。

        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }

Step6:设置协议的 contextPath,如果未配置,默认为 /interfacename

        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);

Step7:解析服务提供者的IP地址与端口。

    private String findConfigedHosts(ProtocolConfig protocolConfig, List<URL> registryURLs, Map<String, String> map) {
        boolean anyhost = false;

        String hostToBind = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_BIND);
        if (hostToBind != null && hostToBind.length() > 0 && isInvalidLocalHost(hostToBind)) {
            throw new IllegalArgumentException("Specified invalid bind ip from property:" + Constants.DUBBO_IP_TO_BIND + ", value:" + hostToBind);
        }

        // if bind ip is not found in environment, keep looking up
        if (hostToBind == null || hostToBind.length() == 0) {
            hostToBind = protocolConfig.getHost();
            if (provider != null && (hostToBind == null || hostToBind.length() == 0)) {
                hostToBind = provider.getHost();
            }
            if (isInvalidLocalHost(hostToBind)) {
                anyhost = true;
                try {
                    hostToBind = InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException e) {
                    logger.warn(e.getMessage(), e);
                }
                if (isInvalidLocalHost(hostToBind)) {
                    if (registryURLs != null && !registryURLs.isEmpty()) {
                        for (URL registryURL : registryURLs) {
                            if (Constants.MULTICAST.equalsIgnoreCase(registryURL.getParameter("registry"))) {
                                // skip multicast registry since we cannot connect to it via Socket
                                continue;
                            }
                            try {
                                Socket socket = new Socket();
                                try {
                                    SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
                                    socket.connect(addr, 1000);
                                    hostToBind = socket.getLocalAddress().getHostAddress();
                                    break;
                                } finally {
                                    try {
                                        socket.close();
                                    } catch (Throwable e) {
                                    }
                                }
                            } catch (Exception e) {
                                logger.warn(e.getMessage(), e);
                            }
                        }
                    }
                    if (isInvalidLocalHost(hostToBind)) {
                        hostToBind = getLocalHost();
                    }
                }
            }
        }

        map.put(Constants.BIND_IP_KEY, hostToBind);

        // registry ip is not used for bind ip by default
        String hostToRegistry = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry != null && hostToRegistry.length() > 0 && isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        } else if (hostToRegistry == null || hostToRegistry.length() == 0) {
            // bind ip is used as registry ip by default
            hostToRegistry = hostToBind;
        }

        map.put(Constants.ANYHOST_KEY, String.valueOf(anyhost));

        return hostToRegistry;
    }

服务IP地址解析顺序:(序号越小越优先)

  1. 系统环境变量,变量名:DUBBO_DUBBO_IP_TO_BIND
  2. 系统属性,变量名:DUBBO_DUBBO_IP_TO_BIND
  3. 系统环境变量,变量名:DUBBO_IP_TO_BIND
  4. 系统属性,变量名:DUBBO_IP_TO_BIND
  5. dubbo:protocol 标签的 host 属性 --> dubbo:provider 标签的 host 属性
  6. 默认网卡IP地址,通过 InetAddress.getLocalHost().getHostAddress() 获取,如果IP地址不符合要求,继续下一个匹配。
   // 判断IP地址是否符合要求的标准
   public static boolean isInvalidLocalHost(String host) {
       return host == null
               || host.length() == 0
               || host.equalsIgnoreCase("localhost")
               || host.equals("0.0.0.0")
               || (LOCAL_IP_PATTERN.matcher(host).matches());
   }
  1. 选择第一个可用网卡,其实现方式是建立 socket,连接注册中心,获取 socket 的IP地址
      Socket socket = new Socket();
       try {
             SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
             socket.connect(addr, 1000);
             hostToBind = socket.getLocalAddress().getHostAddress();
             break;
        } finally {
             try {
                     socket.close();
             } catch (Throwable e) {
             }
       }
    private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map<String, String> map) {
        Integer portToBind = null;

        // parse bind port from environment
        String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND);
        portToBind = parsePort(port);

        // if there's no bind port found from environment, keep looking up.
        if (portToBind == null) {
            portToBind = protocolConfig.getPort();
            if (provider != null && (portToBind == null || portToBind == 0)) {
                portToBind = provider.getPort();
            }
            final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
            if (portToBind == null || portToBind == 0) {
                portToBind = defaultPort;
            }
            if (portToBind == null || portToBind <= 0) {
                portToBind = getRandomPort(name);
                if (portToBind == null || portToBind < 0) {
                    portToBind = getAvailablePort(defaultPort);
                    putRandomPort(name, portToBind);
                }
                logger.warn("Use random available port(" + portToBind + ") for protocol " + name);
            }
        }

        // save bind port, used as url's key later
        map.put(Constants.BIND_PORT_KEY, String.valueOf(portToBind));

        // registry port, not used as bind port by default
        String portToRegistryStr = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_REGISTRY);
        Integer portToRegistry = parsePort(portToRegistryStr);
        if (portToRegistry == null) {
            portToRegistry = portToBind;
        }

        return portToRegistry;
    }

服务提供者端口解析顺序:(序号越小越优先)

  1. 系统环境变量,变量名:DUBBO_DUBBO_PORT_TO_BIND
  2. 系统属性,变量名:DUBBO_DUBBO_PORT_TO_BIND
  3. 系统环境变量,变量名:DUBBO_PORT_TO_BIND
  4. 系统属性,变量名:DUBBO_PORT_TO_BIND
  5. dubbo:protocol 标签 port 属性 --> dubbo:provider 标签的 port 属性。
  6. 随机选择一个端口。

3.2 封装URL实例

        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

Step8:根据协议名称、协议 host、协议端口、contextPath、相关配置属性(applicationmoduleproviderprotocolConfigservice 及其子标签)构建服务提供者URI。

URL运行效果图,如下:

URL运行效果图

3.3 构建Invoker实例

        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { // @ 代码1

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { // @ 代码2
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) { // @ 代码3
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // @ 代码4
                        URL monitorUrl = loadMonitor(registryURL); // @ 代码5
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @ 代码6
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker); // 代码7
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }

Step9:获取 dubbo:service 标签的 scope 属性,其可选值为 none (不暴露)、local (本地)、remote (远程),如果配置为 none,则不暴露。默认为 local

Step10:根据 scope 来暴露服务,如果 scope 不配置,则默认本地与远程都会暴露,如果配置成 localremote,那就只能是二选一。

代码1:如果 scope 不为 remote,则先在本地暴露( injvm ),具体暴露服务的具体实现,将在remote 模式中详细分析。

代码2:如果 scope 不为 local,则将服务暴露在远程。

代码3remote 方式,检测当前配置的所有注册中心,如果注册中心不为空,则遍历注册中心,将服务依次在不同的注册中心进行注册。

代码4:如果 dubbo:servicedynamic 属性未配置, 尝试取 dubbo:registrydynamic 属性,该属性的作用是否启用动态注册,如果设置为 false,服务注册后,其状态显示为 disable,需要人工启用,当服务不可用时,也不会自动移除,同样需要人工处理,此属性不要在生产环境上配置。

代码5:根据注册中心URL,构建监控中心的URL,如果监控中心URL不为空,则在服务提供者URL上追加 monitor,其值为监控中心URL(已编码)。

1)如果dubbo spring xml配置文件中没有配置监控中心(dubbo:monitor),就从系统属性-Ddubbo.monitor.address,-Ddubbo.monitor.protocol构建MonitorConfig对象,否则从dubbo的properties配置文件中寻找这个两个参数,如果没有配置,则返回null。 
2)如果有配置,则追加相关参数,dubbo:monitor标签只有两个属性:address、protocol,其次会追加interface(MonitorService)、协议等。

代码6:通过动态代理机制创建 Invoker,Dubbo的远程调用实现类。

通过动态代理机制创建Invoker

Dubbo远程调用器如何构建,这里不详细深入,重点关注WrapperInvoker的url为:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider
&dubbo=2.0.0
&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6328%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527255510215
&pid=6328
&qos.port=22222
&registry=zookeeper
&timestamp=1527255510202

这里有两个重点值得关注:

  1. path属性com.alibaba.dubbo.registry.RegistryService,注册中心也类似于服务提供者。
  2. export属性:值为服务提供者的URL,为什么需要关注这个URL呢?请看代码7,protocol 属性为 Protocol$Adaptive,Dubbo在加载组件实现类时采用SPI(关于SPI细节,可参阅《☆聊聊Dubbo(五):核心源码-SPI扩展》
    ),在这里我们只需要知道,根据URL冒号之前的协议名将会调用相应的方法。
Protocol 适配实现类实例

其映射关系(列出与服务启动相关协议实现类):

dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol // 文件位于dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol // 文件位于dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 

代码7:根据代码6的分析,将调用 RegistryProtocol#export 方法。

这里很重要的是 Invoker 实例,作为Dubbo的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现

所以,下面重点分析 代码6 & 代码7 两处代码实现,源码如下:

// 使用ProxyFactory将服务实现封装成一个Invoker对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @ 代码6

// 根据指定协议本地暴露和向注册中心注册服务
Exporter<?> exporter = protocol.export(invoker); // @ 代码7

//用于unexport
exporters.add(exporter);

上面 proxyFactoryprotocol 两个变量,具体定义如下:

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();  
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 

Invoker 实例从 proxyFactory 获取,而 proxyFactory 在这里实际是个适配器,通过调用 getAdaptiveExtension() 方法,会以拼接源码的方式动态生成目标ProxyFactory Class,生成的Class方法中会获取 url 中的参数来构建合适的具体实现对象,如果 url 中未配置,则使用 @SPI 配置的默认值。

查看 ProxyFactoryProtocol 接口,默认 ProxyFactory 实现为 JavassistProxyFactory,默认 Protocol 实现为 DubboProtocol。源码如下:

// 默认javassist
@SPI("javassist")
public interface ProxyFactory {  
    ...
}

// 默认dubbo
@SPI("dubbo")
public interface Protocol {  
    ...
}

ExtensionLoader#getAdaptiveExtension() 调用栈,如下:

ExtensionLoader<T>.getAdaptiveExtension()  
    ExtensionLoader<T>.createAdaptiveExtension()
        ExtensionLoader<T>.getAdaptiveExtensionClass()
            ExtensionLoader<T>.createAdaptiveExtensionClass()
                ExtensionLoader<T>.createAdaptiveExtensionClassCode()

最终,生成目标ProxyFactory Class,源码如下:

public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
        if (arg2 == null) 
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

可以看到,在上面的 getInvoker 方法中,会优先获取 proxy 扩展,否则默认获取 javassist 扩展。一般情况下,我们未主动扩展配置代理工厂的话,使用 JavassistProxyFactory,源码如下:

public class JavassistProxyFactory extends AbstractProxyFactory {
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

注意到这里的入参包括 proxy 服务实例和其接口类型,因为需要对服务进行代理封装,最终是生成一个 AbstractProxyInvoker 实例,其 doInvoker 方法成为服务调用的入口。以下是具体的封装过程:

public static Wrapper getWrapper(Class<?> c) {
    while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class.
        c = c.getSuperclass();
    if( c == Object.class )
        return OBJECT_WRAPPER;
    Wrapper ret = WRAPPER_MAP.get(c);
    if( ret == null )
    {
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c,ret);
    }
    return ret;
}

具体的 makeWrapper 方法是利用 javassist 技术动态构造 Wapper 类型并创建实例,源码较长这里不再列出,以下是 Wapper 类型的 invokeMethod 方法源码(注意是 javasssit 语法形式):

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { 
    indi.cesc.inno.learn.dubbo.HelloService w; 
    try{ 
        w = ((indi.cesc.inno.learn.dubbo.HelloService)$1); 
    }catch(Throwable e){ 
        throw new IllegalArgumentException(e); 
    } 
    try{ 
        if( "sayHello".equals( $2 )  &&  $3.length == 1 ) {  
            return ($w)w.sayHello((indi.cesc.inno.learn.dubbo.HelloRequest)$4[0]);  // 真实方法调用
        } 
    } catch(Throwable e) {      
        throw new java.lang.reflect.InvocationTargetException(e);  
    } 
    throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class indi.cesc.inno.learn.dubbo.HelloService."); 
}

可以看到 w.sayHello() 这就是直接通过服务的实现对象调用具体方法,并不是通过反射,效率会高些。默认使用Javassist而不是JDK动态代理也是出于效率的考虑

这里就将真实服务加入到整体调用链条之中,后续再将 Invoker 往上层传递,打通整个链条。

继续上面 代码7 处的代码,protocol 实例调用 export 方法进入后续流程。这里的 protocol 类型实际依旧是个适配器,export 方法源码如下:

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
    if (arg0 == null) 
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) 
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) 
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
}

注意 invokerurl 不是服务暴露的 url,而是协议注册的 url,因此 url 里面的协议是 registry。尝试获取名为 registryProtocol 扩展,但进入 ExtensionLoader 后被拦截,实际拿到了其封装类 ProtocolFilterWrapper,其负责组装过滤器链

/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 中配置有:

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper  
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper  

则获取 RegistryProtocol 实例会被 ProtocolFilterWrapperProtocolListenerWrapper 装饰,分别用来实现拦截器和监听器功能,查看这两个Wrapper的代码可以看出,对于注册url都做了特别处理,向注册中心发布url不会触发拦截器和监听器功能,只有在真正暴露服务时才会注册拦截器,触发监听器

ProtocolFilterWrapper#export 方法,源码如下:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        // 此处,将直接进入 RegistryProtocol 的 export 方法
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

此处,将直接进入 RegistryProtocolexport 方法。

3.4 注册发布服务

依据上面分析,最终注册发布服务调用链:ServiceBean#afterPropertiesSet —> ServiceConfig#export —> ServiceConfig#doExport —> ServiceConfig#doExportUrlsFor1Protocol —> RegistryProtocol#export

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // @ 代码1

        URL registryUrl = getRegistryUrl(originInvoker); // @ 代码2

        //registry provider
        final Registry registry = getRegistry(originInvoker); // @ 代码3
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);  // @ 代码4 start

        //to judge to delay publish whether or not
        boolean register = registeredProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

        if (register) {
            register(registryUrl, registeredProviderUrl); // @ 代码4 end
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // @ 代码5 start
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // @ 代码5 end
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }

代码1:启动服务提供者服务,监听指定端口,准备服务消费者的请求,这里其实就是从 WrapperInvoker 中的 url (注册中心 url )中提取 export 属性,描述服务提供者的 url,然后启动服务提供者。

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
调用DubboProtocol#export完成Dubbo服务的启动

从上图中,可以看出,将调用 DubboProtocol#export 完成Dubbo服务的启动,利用netty构建一个微型服务端,监听端口,准备接受服务消费者的网络请求,然后将 dubbo:service 的服务handler加入到命令处理器中,当有消息消费者连接该端口时,通过网络解包,将需要调用的服务和参数等信息解析处理后,转交给对应的服务实现类处理即可

代码2:获取真实注册中心的URL,例如:zookeeper注册中心的URL。

zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider
&dubbo=2.0.0
&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D10252%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527263060882
&pid=10252
&qos.port=22222
&timestamp=1527263060867

代码3:根据注册中心URL,从注册中心工厂中获取指定的注册中心实现类:zookeeper注册中心的实现类为:ZookeeperRegistry

代码4:获取服务提供者URL中的 register 属性,如果为 true,则调用注册中心的 ZookeeperRegistry#register 方法向注册中心注册服务(实际由其父类 FailbackRegistry 实现)。

RegistryProtocol#register 方法,源码如下:

    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

FailbackRegistry#register 方法,源码如下:

    @Override
    public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            failedRegistered.add(url);
        }
    }

ZookeeperRegistry#doRegister 方法,源码如下:

    @Override
    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

代码5:服务提供者向注册中心订阅自己,主要是为了服务提供者URL发生变化的时候,会触发 overrideSubscribeListenernotify 方法重新暴露服务。当然,会将 dubbo:referencecheck 属性设置为 false

为了感知注册中心的一些配置变化,提供者会监听注册中心路径 /dubbo/${interfaceClass}/configurators 的节点,监听该节点在注册中心的一些配置信息变更。Zookeeper注册中心通过zookeeper框架的监听回调接口进行监听(redis注册中心通过订阅命令(subscribe)监听),服务器缓存注册中心的配置,当配置发生变更时,服务会刷新本地缓存。

FailbackRegistry#subscribe 订阅方法,源码如下:

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && !urls.isEmpty()) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }

ZookeeperRegistry#doSubscribe 订阅方法,源码如下:

    @Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

3.5 打通服务网络

本节是切实最最核心的,重点关注 RegistryProtocol#export 中调用 doLocalExport 方法,其实主要是 根据各自协议,服务提供者建立网络服务器,在特定端口建立监听,监听来自消息消费端服务的请求

RegistryProtocol#doLocalExport,源码如下:

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // @ 代码1
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // @ 代码2
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

代码1:如果服务提供者以 dubbo 协议暴露服务,getProviderUrl(originInvoker)返回的URL将以 dubbo:// 开头。

代码2:根据Dubbo内置的SPI机制,将调用 DubboProtocol#export方法。

DubboProtocol#export,源码如下:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl(); // @ 代码1

        // export service.
        String key = serviceKey(url); // @ 代码2
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); // @ 代码3 start
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        } // @ 代码3 end

        openServer(url); // @ 代码4
        optimizeSerialization(url); // @ 代码5
        return exporter;
    }

代码1:获取服务提供者URL,以协议名称,这里是 dubbo:// 开头。

代码2:从服务提供者URL中获取服务名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880

代码3:是否将转发事件导出成 stub

代码4:根据url打开服务。

代码5:根据url优化器序列化方式。

DubboProtocol#openServer,源码如下:

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress(); // @ 代码1
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key); // @ 代码2
                    if (server == null) {
                        serverMap.put(key, createServer(url)); // @ 代码3
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url); // @代码4
            }
        }
    }

代码1:根据url获取网络地址:ip:port,例如:192.168.56.1:20880,服务提供者IP与暴露服务端口号。

代码2:根据key从服务器缓存中获取,如果存在,则执行代码4,如果不存在,则执行代码3.

代码3:根据URL创建一服务器,Dubbo服务提供者服务器实现类为 ExchangeServer

代码4:如果服务器已经存在,用当前URL重置服务器,这个不难理解,因为一个Dubbo服务中,会存在多个 dubbo:service 标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务

DubboProtocol#createServer,源码如下:

    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // @ 代码1
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @ 代码2
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @ 代码3

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) // @ 代码4
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // @ 代码5
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler); // @ 代码6
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY); // @ 代码7
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

代码1:为服务提供者url增加 channel.readonly.sent 属性,默认为 true,表示在发送请求时,是否等待将字节写入socket后再返回,默认为 true

代码2:为服务提供者url增加 heartbeat 属性,表示心跳间隔时间,默认为 60*1000,表示60s。

代码3:为服务提供者url增加 server 属性,可选值为 netty,mina 等等,默认为 netty

代码4:根据SPI机制,判断 server 属性是否支持。

代码5:为服务提供者url增加 codec 属性,默认值为 dubbo,协议编码方式。

代码6:根据服务提供者URI,服务提供者命令请求处理器 requestHandler 构建 ExchangeServer 实例。requestHandler 的实现具体在以后详细分析Dubbo服务调用时再详细分析。

代码7:验证客户端类型是否可用。

Exchangers#bind方法,根据 URLExchangeHandler 构建服务器,源码如下:

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

上述代码不难看出,首先根据 url 获取 Exchanger 实例,然后调用 bind 方法构建 ExchangeServerExchanger 接口方法如下:

Exchanger接口方法
  1. ExchangeServer bind(URL url, ExchangeHandler handler):服务提供者调用。
  2. ExchangeClient connect(URL url, ExchangeHandler handler):服务消费者调用。

Dubbo提供的实现类为:HeaderExchanger,其 bind 方法如下:

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

从此处可以看到,端口的绑定由 Transportersbind 方法实现。源码如下:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

从这里得知,Dubbo网络传输的接口有 Transporter 接口实现,其继承类图所示:

Transporter继承类图

本文以netty版本来查看一下 Transporter 实现。 NettyTransporter 源码如下:

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty3";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

创建 NettyServer 实例时,其父类构造函数会调用 doOpen() 建立网络连接,源码如下:

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // @ 代码1
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler); // @ 代码2
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

从本方法 代码1 & 代码2 了解,首先创建 NettyServer 必须传入一个服务提供者 URL,但从 DubboProtocol#createServer 中可以看出,Server是基于网络套接字 (ip:port) 缓存的,一个JVM应用中,必然会存在多个 dubbo:service 标签,就会有多个 URL这里为什么可以这样做呢?

DubboProtocol#createServer 中可以看出,在解析第二个 dubbo:service 标签时并不会调用 createServer,而是会调用 Server#reset 方法,是不是这个方法有什么魔法,在reset方法时能将URL也注册到Server上

那接下来分析 NettyServer#reset 方法是如何实现的?DubboProtocol#reset 方法最终将调用 Serverreset 方法,同样还是以netty版本的 NettyServer 为例,查看reset方法的实现原理。 NettyServer#reset—>父类(AbstractServer) AbstractServer#reset,源码如下:

    @Override
    public void reset(URL url) {
        if (url == null) {
            return;
        }
        try {
            if (url.hasParameter(Constants.ACCEPTS_KEY)) {
                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
                if (a > 0) {
                    this.accepts = a;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
                if (t > 0) {
                    this.idleTimeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.THREADS_KEY)
                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { // @ 代码1 start
                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                int threads = url.getParameter(Constants.THREADS_KEY, 0);
                int max = threadPoolExecutor.getMaximumPoolSize();
                int core = threadPoolExecutor.getCorePoolSize();
                if (threads > 0 && (threads != max || threads != core)) {
                    if (threads < core) {
                        threadPoolExecutor.setCorePoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setMaximumPoolSize(threads);
                        }
                    } else {
                        threadPoolExecutor.setMaximumPoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setCorePoolSize(threads);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        } // @ 代码1 end
        super.setUrl(getUrl().addParameters(url.getParameters())); // @ 代码2 
    }

代码1:首先是调整线程池的相关线程数量,这个好理解。

代码2:然后设置调用 setUrl 覆盖原先 NettyServerprivate volatile URL url 的属性,那为什么不会影响原先注册的 dubbo:service 呢?原来 NettyHandler 上加了注解: @Sharable,由该注解去实现线程安全。

扫码入群讨论

Dubbo Geek-1群
上一篇下一篇

猜你喜欢

热点阅读