Netty4源码深入学习1-服务启动

2018-04-26  本文已影响51人  未名枯草

1.源码准备

试用版本为


引用包版本

Netty 服务端创建的时序图,如下:


服务端创建的时序图

2.服务端启动代码说明:

举例:服务端启动代码:


服务端启动代码

开启一个服务端,端口绑定在8888,使用nio模式,下面讲下每一个步骤的处理细节

1)EventLoopGroup
 是一个死循环,不停地检测IO事件,处理IO事件,执行任务,后面详细描述;
 初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池";
2)ServerBootstrap
初始化ServerBootstrap实例, 此实例是netty服务端应用开发的入口是服务端的一个启动辅助类;
通过给它设置一系列参数来绑定端口启动服务;
3).channel(NioServerSocketChannel.class)
指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel;
表示服务端启动的是nio相关的channel;
channel在netty里面是一大核心概念,可以理解为一条channel就是一个连接或者一个服务端bind动作;
4)b.childHandler(new NettyServerFilter())
 表示一条新的连接进来之后,该怎么处理,

NettyServerFilter代码如图中所示:


NettyServerFilter代码
5)ChannelFuturef =b.bind(port).sync()
这里就是真正的启动过程了,绑定6789端口,等待服务器启动完毕,才会进入下行代码。

3. 详细描述

跳入bind()方法:
bind()方法
通过端口号创建一个 InetSocketAddress,然后继续step,进入下一步:
image.png
其中validate()方法用于验证服务启动需要的必要参数是否合格
ivalidate()
跳入上一层校验,校验内容如下,group及channelFactory是否为空;
image.png
完成校验以后进行doBind()方法:
doBind()
dobind()内部实现,主要有两个核心内容。两大核心一个是 initAndRegister(),以及doBind0();
a)首先看initAndRegister() 方法:
initAndRegister() 方法
核心代码如图中箭头部分;
1. new 一个channel,
2. init这个channel,即调用init(channel)初始化通道信息
3. 将这个channel register到某个对象。
<1> new 一个channel,step进入
channel = channelFactory().newChannel();此处相比以前版本去掉final并放入try-catch中
调用channelFactory生成通道channel实例,
NioServerSocketChannel 作为clazz,是通过serverbootstrap的channel方法来指定通道类型。
进一步查看channleFactory的初始化,

此时回到NettyServer, Netty实现初始化AbstractBootstrap的位置,此时,在代码中存在

 b.channel(NioServerSocketChannel.class); // 设置nio类型的channel

查找此方法,发现在AbstractBootstrap类中存在channel方法,在方法channel中初始化new一个factory:

channelFactory初始化
ReflectiveChannelFactory继承了ChannelFactory工厂方法newChannel
所以,initAndRegister中的channelFactory.newChannel()方法就是生成了一个NioServerSocketChannel的实例。
image.png
  clazz.newInstance() 是通过反射的方式来创建一个对象,
 而这个clazz就是我们在ServerBootstrap中传入的NioServerSocketChannel.class

进一步step,会初始化一系列变量,最终调用反射的clazz类,即NioServerSocketChannel,进行初始化,


NioServerSocketChannel初始化
  private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
  //DEFAULT_SELECTOR_PROVIDER 为一个selector,

逐渐step,会跳入到NioServerSocketChannel构造方法中,


NioServerSocketChannel构造方法
[NIO SelectionKey中定义的4种事件]
  *   SelectionKey.OP_ACCEPT —— 接收连接继续事件,**表示服务器监听到了客户连接,服务器可以接收这个连接了**

  *   SelectionKey.OP_CONNECT —— 连接就绪事件,**表示客户与服务器的连接已经建立成功**

  *   SelectionKey.OP_READ —— 读**就绪**事件,**表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)**

  *   SelectionKey.OP_WRITE —— 写**就绪**事件,**表示已经可以向通道写数据了(通道目前可以用于写操作)**

   这里 注意,下面两种,SelectionKey.OP_READ ,SelectionKey.OP_WRITE ,

  1.当向通道中注册SelectionKey.OP_READ事件后,如果客户端有向缓存中write数据,下次轮询时,则会 isReadable()=true;

  2.当向通道中注册SelectionKey.OP_WRITE事件后,这时你会发现当前轮询线程中isWritable()一直为ture,如果不设置为其他事件

进一步往构造方法super上层查看,最后跳入:


image.png
  将前面 provider.openServerSocketChannel(); 创建出来的 ServerSocketChannel保存到成员变量ch
  然后调用ch.configureBlocking(false);设置该channel为非阻塞模式
  这里的 readInterestOp 即前面层层传入的 SelectionKey.OP_ACCEPT,

接下来重点分析 super(parent);(这里的parent其实是null,由前面写死传入);
在AbstractNioChannel中做了下面几件事:

  1、继续调用父类AbstractChannel(Channel parent)构造方法;
  此构造方法中,主要做了三件事:
  1)、给channel生成一个新的id
  2)、通过newUnsafe初始化channel的unsafe属性
  3)、pipeline =new DefaultChannelPipeline(this) 初始化channel的pipeline属性
image.png
  2)在AbstractChannel类中,newUnsafe()是一个抽象方法
image.png

最终实现来自于AbstractNioMessageChannel类中有newUnsafe()的实现


image.png
       此方法返回一个NioMessageUnsafe实例对象,
       而NioMessageUnsafe是AbstractNioMessageChannel的内部类
       NioMessageUnsafe 只覆盖了 父类AbstractNioUnsafe中的read方法,如下图
       通过NioMessageUnsafe 及其父类的代码便可以知道,
       其实unsafe对象是真正的负责底层channel的连接/读/写等操作的,
       unsafe就好比一个底层channel操作的代理对象
NioMessageUnsafe
  OP_ACCEPT都已经注册上了,当接收到新用户连接时就会触发unsafe.read()方法。
 read()会不断调用doReadMessages(),将产生的readBuf逐一发送给Pipeline.fireChannelRead()去处理。

    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

unsafe内容后续学习;

      3)  pipeline =new DefaultChannelPipeline(this);
      # step之后发现,最后实现如下图代码所示:
      # 初始化了HeadContext及TailContext对象。
      # head及tail初始化完成后,它们会相互连接。
      # 通过上面的代码可以得出,pipeline就是一个双向链表。
DefaultChannelPipeline构造函数
<1>部分总结:
用户调用方法 Bootstrap.bind(port) 第一步就是通过反射的方式new一个NioServerSocketChannel对象,
并且在new的过程中创建了一系列的核心组件,进一步研究:

1、NioServerSocketChannel对象内部绑定了Java NIO创建的ServerSocketChannel对象;

2、Netty中,每个channel都有一个unsafe对象,此对象封装了Java NIO底层channel的操作细节;

3、Netty中,每个channel都有一个pipeline对象,此对象就是一个双向链表;
NioServerSocketChannel的类继承结构图:
NioServerSocketChannel的类继承结构图
<2> init这个channel
image.png
上面代码前几行主要进行配置,
重要内容为最后一行
获取当前通道的pipeline,然后为 NioServerSocketChanne l绑定的 pipeline 添加 Handler;
此pipeline即为上面<2>中生成的pipeline
上一篇下一篇

猜你喜欢

热点阅读