Netty NioSocketChannel读数据源码分析

2020-07-13  本文已影响0人  whateverblake

上一遍文章我们分析了netty NioSocketChannel写数据的过程,接下来我们分析netty NioSocketChannel读数据过程,我们接着上一遍去分析,一端的NioSocketChannel向另一端发送了hello world,那么另一端是如何接受到hello world的呢?

读事件的触发

不论是客户端还是服务端在创建NioSocketChannel实例之后,同样会经历初始化,注册和连接的过程,当连接完成之后NioSocketChannel会向selector注册OP_READ事件,当一端向另一端写数据的时候会触发读事件。NioSocketChannel绑定的NioEventLoop会执行selector.select(),当有读写事件发生的时候,NioEventLoop执行 processSelectedKeys方法来处理这些事件。关于NioEventLoop的解析请参考https://www.jianshu.com/p/732f9dea34d7

processSelectedKeys
//selectedKeys会被netty设置成SelectedSelectionKeySet,所以会执行processSelectedKeysOptimized
private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            //取得每个IO事件对应的SelectionKey
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
           //selectedKeys清空对应的SelectionKey
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();
            //我们基于NIO分析,所以进入processSelectedKey方法
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

processSelectedKey是真正处理IO事件的方法

processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //获取channel的unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
               //如果selectionKey不合法,关闭channel
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }

        try {
            //获取事件类型的编码
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            //如果是OP_CONNECT事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                //OP_CONNECT从感兴趣的事件中清除
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //完成连接
                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                //如果是OP_WRITE事件,那么执行forceFlush,把写缓存刷到网络
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
               //处理读事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

我们分析unsafe.read源代码

unsafe.read()
@Override
        public final void read() {
            final ChannelConfig config = config();
            //shouldBreakReadReady用于判断channel的input是不是关闭了,
            //如果关闭了那么就不应该去读数据了,同时清除OP_READ事件
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            //获取ByteBuffer分配器,这里会涉及到jemalloc的知识,请看我写的另一篇文章[https://www.jianshu.com/p/550704d5a628]
            final ByteBufAllocator allocator = config.getAllocator();
//Handle的默认实现是HandleImpl,它的作用是用来决定每次从channel读取多少数据,每次可以最多读取多少条数据(默认16条)
 //首次默认读取1024个字节,之后根据实际读取到的数据量动态更改每次应该读取的数据量
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    //向netty申请一个ByteByf,这个ByteBuf大小是通过allocHandle计算得到
                    byteBuf = allocHandle.allocate(allocator);
                   //doReadBytes是SocketChannel真正读数据的入口方法
                   //lastBytesRead是记录这次读到的的数据量,如果实际读到的数据量等于byteBuf大小,那么allocHandle会增加下次读取数据用的ByteBuf的大小
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                       //如果读取的数据量是0,那么释放byteBuf
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                   //allocHandle更新本次读取到的数据总数
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //触发fireChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                  //allocHandle.continueReading() 会判断需不需要继续从channel中读数据
                 //如果实际读取数据量等于打算读取的数据量,同时读取的数据条数小于单次读取容许的最大条数那么会继续读取
                } while (allocHandle.continueReading());
               //handle根据本次读取的数据总量,动态调整下次读取数据的ByteBuf大小
                allocHandle.readComplete();
             //触发数据读取完成事件
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

我们分析下doReadBytes方法以及内部的调用链

doReadBytes
  protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
         //设置handle试图读取的数据量
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

//ByteBuf.writeBytes解析
   public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
        ensureWritable(length);
       
        int writtenBytes = setBytes(writerIndex, in, length);
        if (writtenBytes > 0) {
          //更新ByteBuf的writerIndex
            writerIndex += writtenBytes;
        }
        return writtenBytes;
    }

//ByteBuf.setBytes解析
 @Override
    public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
        try {
           //internalNioBuffer返回的是ByteBuf对应的ByteBuffer,之后SocketChannel通过read()把数据读取到ByteBuffer中
            return in.read(internalNioBuffer(index, length));
        } catch (ClosedChannelException ignored) {
            return -1;
        }
    }

netty从channel中读取完一批数据后包装成ByteBuf,接着触发channelRead事件,开发者自定义的inboundHandler的channelRead就会被调用,在这个方法中开发者可以根据业务罗处理接受到的这个网络数据


上面就是netty NioSocketChannel从网络上读取数据的过程


接下来我们看下netty是如何动态修改每次读取数据ByteBuf大小,
这个逻辑在allocHandle.readComplete()实现,我们解析readComplete的源代码

readComplete
   public void readComplete() {
            //totalBytesRead()用于获取这次IO读事件中读取到的数据总量
            record(totalBytesRead());
    }

record方法源代码

record
 private void record(int actualReadBytes) {
            //SIZE_TABLE数组记录了一组数,这些数代表着不同大小的ByteBuf,数组元素按照从小到大的顺序存放在SIZE_TABLE中
           //数组中的存的是哪些数据下面会解析 
         //index就是记录每次从channel读取数据ByteBuf的大小在SIZE_TABLE中的索引
          //INDEX_DECREMENT默认是1
           //如果actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)] 说明这次IO事件读取到的数据量小于SIZE_TABLE[index-1]
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                if (decreaseNow) {
                   //下一次读取channel数据的ByteBuf容量缩减为max (SIZE_TABLE[index -1],minIndex)

                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                   //当actualReadBytes <[index-1]在首次成立的时候,并不会立马减少下次读取ByteBuf的容量,而只是把decreaseNow设置成true,
                   //说明只有连续两次读取到的实际数据量都小于SIZE_TABLE[index -1]才会使得下次读取数据ByteBuf的容量缩减
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

SIZE_TABLE是一个长度为53的int数组
它存的数据分成两个部分


就分析这么多了,谢谢阅读

上一篇下一篇

猜你喜欢

热点阅读