Dubbo Source Code Analysis: Server-Side Netty Startup (Part 4)

2019-03-19  binecy

This analysis is based on dubbo 2.7.1.

As covered earlier, RegistryProtocol.doLocalExport exports the service:

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
        });
    }

Here providerUrl has the form dubbo://(server ip):(server port)/com.dubbo.start.service.HelloService?... (it is derived from the registryUrl registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?&export=dubbo...).
Because its protocol is dubbo, the call reaches DubboProtocol, but only after passing through the wrapper classes ProtocolListenerWrapper and ProtocolFilterWrapper.

First, look at DubboProtocol.export:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        
        String key = serviceKey(url);
        // Create the DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        ...

        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

Note that a DubboExporter is built here and cached in exporterMap. (It is used later.)
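The caching step can be sketched in isolation. This is only an illustration: the Exporter class is a hypothetical stand-in for DubboExporter, the key layout ([group/]interface[:version]:port) is an assumption about what serviceKey produces, and the port 20880 and version 1.0.0 are made-up values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExporterCacheSketch {
    // Hypothetical stand-in for DubboExporter: it only remembers its key.
    static final class Exporter {
        final String key;
        Exporter(String key) { this.key = key; }
    }

    // One exporter per service key, shared by the whole protocol instance.
    static final Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();

    // Assumed key layout: [group/]interface[:version]:port
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(path);
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            buf.append(':').append(version);
        }
        return buf.append(':').append(port).toString();
    }

    // Build the exporter and cache it under its service key.
    static Exporter export(int port, String path, String version, String group) {
        String key = serviceKey(port, path, version, group);
        Exporter exporter = new Exporter(key);
        exporterMap.put(key, exporter);
        return exporter;
    }

    public static void main(String[] args) {
        export(20880, "com.dubbo.start.service.HelloService", "1.0.0", null);
        System.out.println(exporterMap.keySet());
    }
}
```

When a request arrives, the same key can be rebuilt from the request and used to look up the exporter, which is why the map is filled at export time.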

The key step is openServer:

    private void openServer(URL url) {
        String key = url.getAddress();
        // A client can also export a service that only the server invokes
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
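The idea here is one server per address: the first export on an address creates the server, later exports only reset it. The sketch below shows that idea with ConcurrentHashMap.computeIfAbsent, which is a swapped-in technique and not the code dubbo uses; the Server class and the address string are hypothetical stand-ins.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServerCacheSketch {
    // Hypothetical stand-in for ExchangeServer.
    static final class Server {
        final String address;
        int resetCount = 0;
        Server(String address) { this.address = address; }
        void reset() { resetCount++; }
    }

    // Servers keyed by "host:port", like serverMap in the excerpt above.
    static final Map<String, Server> serverMap = new ConcurrentHashMap<>();

    static Server openServer(String address) {
        boolean[] created = {false};
        // computeIfAbsent makes "create only if missing" a single atomic step.
        Server server = serverMap.computeIfAbsent(address, a -> {
            created[0] = true;
            return new Server(a);
        });
        if (!created[0]) {
            // The server already existed: refresh it instead of binding again.
            server.reset();
        }
        return server;
    }

    public static void main(String[] args) {
        Server first = openServer("127.0.0.1:20880");
        Server second = openServer("127.0.0.1:20880");
        System.out.println(first == second);
    }
}
```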

If no ExchangeServer exists for the address yet, createServer is called:

    private ExchangeServer createServer(URL url) {
        // Process the url parameters
        ...
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        ...

        return server;
    }

Exchange

In Exchangers.bind(url, requestHandler), the second argument, requestHandler, is an inner class of DubboProtocol. It is a key class and is discussed later. The call ends up in HeaderExchanger.bind:

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

Transporter

The Transporter interface likewise provides bind/connect methods.

Transporters uses the server/transporter URL parameters to look up a Transporter implementation through ExtensionLoader; netty3 is used by default.
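The lookup can be pictured as a name-to-implementation table consulted with the URL parameters. The following is a simplified sketch, not ExtensionLoader itself: the EXTENSIONS table, the Transporter interface with a single name() method, and the order "server, then transporter, then default" are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class TransporterLookupSketch {
    // Hypothetical, minimal Transporter: it only reports its name.
    interface Transporter {
        String name();
    }

    // Hypothetical extension table: extension name -> factory.
    static final Map<String, Supplier<Transporter>> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("netty3", () -> named("netty3"));
        EXTENSIONS.put("netty4", () -> named("netty4"));
    }

    static Transporter named(String name) {
        return () -> name;
    }

    // Assumed lookup order: "server" parameter, then "transporter", then the default.
    static Transporter select(Map<String, String> urlParams, String defaultName) {
        String name = urlParams.get("server");
        if (name == null) {
            name = urlParams.get("transporter");
        }
        if (name == null) {
            name = defaultName;
        }
        Supplier<Transporter> factory = EXTENSIONS.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("server", "netty4");
        System.out.println(select(params, "netty3").name());
    }
}
```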

I configured netty4, so the netty code in the org.apache.dubbo.remoting.transport.netty4 package is used.

NettyTransporter

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

NettyServer implements the Server interface, which provides methods such as isBound/getChannels/reset.

The NettyServer constructor:

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

First, look at ChannelHandlers.wrap, which delegates to wrapInternal:

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }

Dispatcher

A new concept appears here: Dispatcher, the dispatcher in dubbo. The default is AllDispatcher:

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
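As its name suggests, AllChannelHandler passes all events on to a thread pool instead of handling them on the I/O thread. The sketch below shows only that hand-off for received messages; the Handler interface, the AllHandler class and the thread name biz-worker are hypothetical simplifications, not dubbo's types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatcherSketch {
    // Hypothetical, reduced ChannelHandler with a single callback.
    interface Handler {
        void received(String message);
    }

    // Wraps a handler so that every message is processed on the pool.
    static final class AllHandler implements Handler {
        private final Handler delegate;
        private final ExecutorService executor;

        AllHandler(Handler delegate, ExecutorService executor) {
            this.delegate = delegate;
            this.executor = executor;
        }

        @Override
        public void received(String message) {
            // The calling (I/O) thread returns immediately after queueing the task.
            executor.execute(() -> delegate.received(message));
        }
    }

    // Sends one message and returns the name of the thread that processed it.
    static String runOnce() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-worker"));
        CompletableFuture<String> threadName = new CompletableFuture<>();
        Handler business = msg -> threadName.complete(Thread.currentThread().getName());
        Handler dispatched = new AllHandler(business, pool);
        try {
            dispatched.received("hello");
            return threadName.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

The business handler runs on the pool thread, so a slow service method does not block the thread that reads from the socket.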

NettyServer extends AbstractServer, and the AbstractServer constructor calls NettyServer.doOpen:

    protected void doOpen() throws Throwable {
    
        bootstrap = new ServerBootstrap();
        // Build bossGroup with a single thread
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));
        // Build the NettyServerHandler
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }
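Having the base-class constructor call a doOpen hook implemented by the subclass is the template method pattern. A minimal sketch follows, assuming hypothetical class names and a string in place of a real socket binding:

```java
class TemplateMethodSketch {
    // Hypothetical stand-in for AbstractServer.
    abstract static class AbstractServerSketch {
        final String bindAddress;

        AbstractServerSketch(String bindAddress) {
            this.bindAddress = bindAddress;
            // The subclass hook runs here, before the subclass constructor body.
            doOpen();
        }

        protected abstract void doOpen();
    }

    // Hypothetical stand-in for NettyServer.
    static final class NettyServerSketch extends AbstractServerSketch {
        // Deliberately no initializer: a field initializer would run after
        // super(...) returns and overwrite the value assigned in doOpen().
        String boundTo;

        NettyServerSketch(String bindAddress) {
            super(bindAddress);
        }

        @Override
        protected void doOpen() {
            // A real server would create the bootstrap and bind the port here.
            boundTo = bindAddress;
        }
    }

    public static void main(String[] args) {
        System.out.println(new NettyServerSketch("0.0.0.0:20880").boundTo);
    }
}
```

This is why simply constructing the server object is enough to have it listening: the bind happens inside the constructor chain.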

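.addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder()) adds the decoder and encoder, which convert between raw bytes and dubbo objects. The dubbo protocol uses DubboCountCodec.

.addLast("handler", nettyServerHandler) adds nettyServerHandler to process requests.

When NettyServerHandler is built with new NettyServerHandler(getUrl(), this), NettyServer passes itself (this) as an argument, because it also implements ChannelHandler. Constructing a NettyServer, in turn, requires a ChannelHandler argument.

ChannelHandler is the key interface for processing requests; it provides methods such as received/sent.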

dubbo uses the decorator pattern to layer different capabilities onto ChannelHandler (the name in parentheses is where each wrapper is created):
NettyServer ---> MultiMessageHandler(NettyServer) ---> HeartbeatHandler(NettyServer) ---> AllChannelHandler(AllDispatcher) ---> DecodeHandler(HeaderExchanger) ---> HeaderExchangeHandler(HeaderExchanger) ---> DubboProtocol.requestHandler
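The order in which a message travels through this chain can be sketched with a toy decorator. The Handler interface and the Named class are hypothetical; each layer only records its name before delegating, whereas the real handlers do actual work (heartbeats, thread dispatch, decoding and so on).

```java
import java.util.ArrayList;
import java.util.List;

class HandlerChainSketch {
    // Hypothetical, reduced ChannelHandler that also carries a trace list.
    interface Handler {
        void received(String message, List<String> trace);
    }

    // A decorator that records its own name, then delegates to the next handler.
    static final class Named implements Handler {
        private final String name;
        private final Handler next;

        Named(String name, Handler next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void received(String message, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.received(message, trace);
            }
        }
    }

    // Builds the chain inside-out, the same way the wrappers are nested.
    static List<String> trace() {
        Handler requestHandler = new Named("requestHandler", null);
        Handler exchange = new Named("HeaderExchangeHandler", requestHandler);
        Handler decode = new Named("DecodeHandler", exchange);
        Handler all = new Named("AllChannelHandler", decode);
        Handler heartbeat = new Named("HeartbeatHandler", all);
        Handler multi = new Named("MultiMessageHandler", heartbeat);

        List<String> trace = new ArrayList<>();
        multi.received("request", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

The innermost handler is constructed first and the outermost last, so the construction order is the reverse of the order in which a message passes through the layers.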
