2-dubbo源码分析之服务暴露

2018-08-30  本文已影响0人  致虑
image.png
暴露服务的入口自然就清楚了

官网说明:

一.概览

二.容器初始化

   package com.alibaba.dubbo.rpc;
   import com.alibaba.dubbo.common.extension.ExtensionLoader;
   
   public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
       public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
           if (arg1 == null) throw new IllegalArgumentException("url == null");
           com.alibaba.dubbo.common.URL url = arg1;
           String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
           if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
           com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
           return extension.refer(arg0, arg1);
       }
   
       public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
           if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
   
           if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
           //根据URL配置信息获取Protocol协议,默认是dubbo
           String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
           if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
               //根据协议名,获取Protocol的实现
               //获得Protocol的实现过程中,会对Protocol先进行依赖注入,然后进行Wrapper包装,最后返回被修改过的Protocol
               //包装经过了ProtocolFilterWrapper,ProtocolListenerWrapper,RegistryProtocol
           com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
           return extension.export(arg0);
       }
   
       public void destroy() {
           throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
       }
   
       public int getDefaultPort() {
           throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
       }
   }

二.服务暴露

1.暴露原理

按照上面的步骤详细笔录一下

   /***
    * 就比如监听spring容器初始化完成
    * 没有设置延迟或者延迟为-1,dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent事件的时候,通知实现了ApplicationListener的类进行回调onApplicationEvent,dubbo会在这个方法中发布服务。
    */
   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
       if (isDelay() && !isExported() && !isUnexported()) {
           if (logger.isInfoEnabled()) {
               logger.info("The service ready on spring started. service: " + getInterface());
           }
           export();
       }
   }

显然有一个延迟配置选项,延迟是否配置也会影响这里的入口,没有延迟时的入口就是这个监听,设置了延迟时入口就是afterPropertySet();其实最后的重点都是export();

因为我们显示指明了registry这一配置,跟下代码如何处理:

```
protected List<URL> loadRegistries(boolean provider) {
    checkRegistry();
    List<URL> registryList = new ArrayList<URL>();
    if (registries != null && !registries.isEmpty()) {
        for (RegistryConfig config : registries) {
            String address = config.getAddress();
            if (address == null || address.length() == 0) {
                address = Constants.ANYHOST_VALUE;
            }
            String sysaddress = System.getProperty("dubbo.registry.address");
            if (sysaddress != null && sysaddress.length() > 0) {
                address = sysaddress;
            }
            if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                Map<String, String> map = new HashMap<String, String>();
                appendParameters(map, application);
                appendParameters(map, config);
                map.put("path", RegistryService.class.getName());
                map.put("dubbo", Version.getVersion());
                map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                if (ConfigUtils.getPid() > 0) {
                    map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                }
                if (!map.containsKey("protocol")) {
                    if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                        map.put("protocol", "remote");
                    } else {
                        map.put("protocol", "dubbo");
                    }
                }
                List<URL> urls = UrlUtils.parseURLs(address, map);
                for (URL url : urls) {
                    url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                    url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                    if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                            || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}
```
image.png

直接进来就是这样的效果,很显然对于在配置中指定了registry属性的,会在加载spring BeanDefinition的时候就加载了注册中心。
返回的注册中心URL是这样的:


image.png

后面重点来了:doExportUrlsFor1Protocol 这里就区分协议进行服务的暴露了


2.本地暴露

3.远程暴露
    package com.foo;
public class BarServiceStub implements BarService { 
    private final BarService barService;

    // 构造函数传入真正的远程代理对象
    public (BarService barService) {
        this.barService = barService;
    }

    public String sayHello(String name) {
        // 此代码在客户端执行, 你可以在客户端做ThreadLocal本地缓存,或预先验证参数是否合法,等等
        try {
            return barService.sayHello(name);
        } catch (Exception e) {
            // 你可以容错,可以做任何AOP拦截事项
            return "容错数据";
        }
    }
}

还是回到暴露的主体:
- 1.获得Exporter并缓存
- 2.打开socket,供调用方调用,即:根据URL绑定IP与端口,建立NIO框架的Server

看第二点:

 /** 创建NIO Server进行监听 */
 private void openServer(URL url) {
     // find server.
     // key是IP:PORT
     // 192.168.110.197:20880
     String key = url.getAddress();
     // client 也可以暴露一个只有server可以调用的服务
     // client can export a service which's only for server to invoke
     boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
     if (isServer) {
         ExchangeServer server = serverMap.get(key);

         //同一JVM中,同协议的服务,共享同一个Server,
         //第一个暴露服务的时候创建server,
         //以后相同协议的服务都使用同一个server
         if (server == null) {
             serverMap.put(key, createServer(url));
         } else {
             // server支持reset,配合override功能使用
             // server supports reset, use together with override
             //同协议的服务后来暴露服务的则使用第一次创建的同一Server
             //server支持reset,配合override功能使用
             //accept、idleTimeout、threads、heartbeat参数的变化会引起Server的属性发生变化
             //这时需要重新设置Server
             server.reset(url);
         }
     }
 }

划重点:createServer(URL url)

 private ExchangeServer createServer(URL url) {
     // 默认开启server关闭时发送readonly事件
     // send readonly event when server closes, it's enabled by default
     url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
     // 开启默认的heartbeat
     // enable heartbeat by default
     url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
     String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

     if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
         throw new RpcException("Unsupported server type: " + str + ", url: " + url);

     url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
     ExchangeServer server;
     try {
         //Exchangers是门面类,里面封装的是Exchanger的逻辑。
         //Exchanger默认只有一个实现HeaderExchanger.
         //Exchanger负责数据交换和网络通信。
         //从Protocol进入Exchanger,标志着程序进入了remote层。
         //这里requestHandler是ExchangeHandlerAdapter
         // 封装信息转换,Dubbo的Exchanger层
         server = Exchangers.bind(url, requestHandler);
     } catch (RemotingException e) {
         throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
     }
     str = url.getParameter(Constants.CLIENT_KEY);
     if (str != null && str.length() > 0) {
         Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
         if (!supportedTypes.contains(str)) {
             throw new RpcException("Unsupported client type: " + str);
         }
     }
     return server;
 }
 /**
  * 封装请求响应模式,同步转异步
  * getExchanger方法根据url获取到一个默认的实现HeaderExchanger
  * 调用HeaderExchanger的bind方法
  */
 public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
     if (url == null) {
         throw new IllegalArgumentException("url == null");
     }
     if (handler == null) {
         throw new IllegalArgumentException("handler == null");
     }
     url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
     //getExchanger方法根据url获取到一个默认的实现HeaderExchanger
     //调用HeaderExchanger的bind方法
     return getExchanger(url).bind(url, handler);
 }
image.png
image.png
image.png
image.png
image.png
最后进入到这里
 /**
  * doOpen方法创建Netty的Server端并打开,具体的事情就交给Netty去处理了
  */
 @Override
 protected void doOpen() throws Throwable {
     NettyHelper.setNettyLoggerFactory();

     //boss线程池
     ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));

     //worker线程池
     ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));

     //ChannelFactory,没有指定工作者线程数量,就使用cpu+1
     ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
     bootstrap = new ServerBootstrap(channelFactory);

     final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
     channels = nettyHandler.getChannels();
     // https://issues.jboss.org/browse/NETTY-365
     // https://issues.jboss.org/browse/NETTY-379
     // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
     bootstrap.setOption("child.tcpNoDelay", true);
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
         @Override
         public ChannelPipeline getPipeline() {
             NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
             ChannelPipeline pipeline = Channels.pipeline();
             /*int idleTimeout = getIdleTimeout();
             if (idleTimeout > 10000) {
                 pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
             }*/
             pipeline.addLast("decoder", adapter.getDecoder());
             pipeline.addLast("encoder", adapter.getEncoder());
             pipeline.addLast("handler", nettyHandler);
             return pipeline;
         }
     });
     // bind
     channel = bootstrap.bind(getBindAddress());
 }

到这里服务暴露逻辑就完成了,完了就可以接受请求并进行处理了。下面是注册到注册中心了

上一篇 下一篇

猜你喜欢

热点阅读