Netty源码分析——服务端channel的创建

2018-12-02  本文已影响49人  编程易行

工作时候,有用过Netty写过网络库。最近想研究下RPC框架,就想着写几篇博客,梳理下Netty的源码。(研究的源码版本是4.1.x)

最简单的示例

首先还是拿Netty 官网的UserGuide做例子

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}

Netty的源码还是很复杂的,所以研究时候有种无处下手的感觉。但是Netty作为一个NIO的网络框架,其底层肯定是使用了java jdk 的nio编程。所以我们可以通过对比java nio编程 与 Netty源码,来了解Netty究竟对java 的nio做了什么样的封装。

一个java nio编程的例子

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    if(socketChannel != null){
        //do something with socketChannel...
    }
}

例子是网上随便粘的,当然简化了很多很多东西,这篇博客只会分析服务端channel的创建,所以这些暂时也够了。

总结下:
1)创建ServerSocketChannel
2)绑定某个端口
3)设置非阻塞
4)监听新的连接

后面分析源码也会试着找下Netty对应的源码在哪

创建一个NioServerSocketChannel

具体的调用流程如下图

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

     //省略....
}

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
       }
      //省略...
}

ReflectiveChannelFactory

public T newChannel() {
    try {
        return clazz.newInstance();  //这里的clazz就是当时从ServerBootstrap.channel方法里传进去的NioServerSocketChannel
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}

代码流程还是很清晰的,总结下就是 Netty会在ServerBootstrap里,通过工厂创建一个ServerChannel

在本例中,工厂是默认的 ReflectiveChannelFactory(通过反射创建Channel),创造出的Channel类型是NioServerSocketChannel

NioServerSocketChannel 的初始化

分析完了NioServerSocketChannel接下来看看NioServerSocketChannel究竟在初始化时候做了啥。

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}   

1、创建一个对应的jdk channel

通过 SelectorProvider.openServerSocketChannel() 创建了一个ServerSocketChannel 这个ServerSocketChannel就是 上文,nio编程的例子里的jdk channel

2、AbstractNioChannel构造方法

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

1)NioServerSocketChannel最终会调用父类AbstractNioChannel的构造函数
2)绑定一个 NioServerSocketChannelConfig

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

1)保存所创建的jdk channel
2)配置configureBlocking 为false

调用父类 AbstractChannel的构造方法

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId(); //绑定一个id
    unsafe = newUnsafe();  //绑定一个NioSocketChannelUnsafe
    pipeline = newChannelPipeline(); //绑定一个DefaultChannelPipeline
}

这一步非常重要:
1)绑定一个 id
2)绑定一个 NioSocketChannelUnsafe
3)绑定一个DefaultChannelPipeline
这几个组件,在Netty中起着非常重要的作用,后面博客里讲到了会详细分析下

总结下,ServerSocketChannel做了这样几件事
1)创建一个jdk channel,并记录下来
2)配置jdk channel 的 configureBlocking 为false
3)绑定一个NioServerSocketChannelConfig
4)绑定id、NioSocketChannelUnsafe、DefaultChannelPipeline 三个组件

初始化ServerChannel

再回到ServerBootStrap里,在创建了ServerChannel(即我们这的NioServerSocketChannel)后,下面一步,是对其进行初始化

@Override
void init(Channel channel) throws Exception {
    //获取我们在client代码里配置的channel
    final Map<ChannelOption<?>, Object> options = options0();
    //最终调用 channel.config().setOption((ChannelOption<Object>) option, value) 即把配置设置到ChannelConfig里
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    //获取client里配置的attrs
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;

    //设置childOptions和childAttrs,最终传到 ServerBootstrapAcceptor里
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();  //client里设置的handler被传进来了
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
            // In this case the initChannel(...) method will only be called after this method returns. Because
            // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
            // placed in front of the ServerBootstrapAcceptor.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    //currentChildGroup、currentChildHandler、currentChildOptions、currentChildAttrs
                    // 对应childGroup      我们配置的childHandler        配置的childOptions    配置的childAttrs
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

流程还挺长的,总结下:
1)把client端设置的option缓存到NioServerSocketChannelConfig里
2)把client端设置的attr设置到channel里



3)保存配置的childOptions,配置的childAttrs 到ServerBootstrapAcceptor里
4)往NioSocketChannel的pipeline中添加一个ServerBootstrapAcceptor

注册 ServerChannel

ServerChannel的注册流程比较长,下面一篇博客再单独介绍吧。

总结

这篇博客分析了下Netty的服务端启动流程,总结下:
1)Netty会在ServerBootstrap中创建一个ServerChannel
2)ServerChannel底层会绑定一个jdk 的channel
3)ServerChannel创建时,会绑定一个id、NioSocketChannelUnsafe、DefaultChannelPipeline、NioServerSocketChannelConfig
4)初始化ServerChannel时,会把我们自定义配置的option、attr设置进去
5)最终会往 NioSocketChannel 的pipeline里添加一个 ServerBootstrapAcceptor
6)注册ServerChannel(下面一篇博客介绍)

上一篇下一篇

猜你喜欢

热点阅读