深入浅出Netty源码剖析

Netty-客户端Read实现

2019-06-05  本文已影响2人  persisting_

1 概述

本文介绍Netty客户端read实现原理(主要介绍NIO通道的Read方法),内容比较简单,因为不涉及Netty内存池等相关知识点,Netty内存池原理后续会有专文介绍,但是本文集已经介绍过了Netty源码-PoolThreadCacheNetty源码-PooledByteBufAllocator静态变量初始化等和Netty内存分配相关内容,可结合着理解本文内容。

2 如何触发Read调用

在NIO编程中,如果应用程序需要和远程交换数据,有读数据的需求,我们一般会将通道注册到Selecotr上,并注册OP_READ事件,然后在Selector.select返回的SelectedKey中有读事件继续时,会从通道读取数据(关于Selector事件的说明可看笔者文章NIO SelectionKey事件理解)。Netty作为NIO编程框架也不例外,在NioEventLoop.run方法中会通过Selector.select方法等待感兴趣的事件就绪,如果通道有数据到来,即读事件继续,会调用Unsafe.read方法。本文主要介绍Netty客户端Read方法实现。

3 具体实现

因为我们主要介绍Java NIO实现,所以我们看Netty基于NIO的客户端read实现,其实现源码在NioByteUnsafe.read方法中:

//NioByteUnsafe
@Override
public final void read() {
    //获取config配置
    final ChannelConfig config = config();
    //判断是否因配置或者异常、错误等中断本次读操作
    if (shouldBreakReadReady(config)) {
        //这个函数会取消该通道对OP_READ事件的注册
        //取消之后即不再响应通道read准备好事件
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    //获取ByteBuf分配器
    final ByteBufAllocator allocator = config.getAllocator();
    //获取RecvByteBuf分配器,这个笔者有文章介绍过,RecvByteBuf分配器
    //主要告诉ByteBuf分配器分配ByteBuf的大小,默认配置的是
    //AdaptiveRecvByteBufAllocator
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    //每次read时,重置RecvByteBuf统计的一些信息
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            //分配本次读取操作的ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            //进行实际读取操作,doReadBytes我们后面列出源码
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            //如果没有读取到数据,直接返回,结束本次读取
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            //触发Pipeline的channelRead方法,让
            //用户定义的handler处理读到的数据
            //比如对数据进行解码,然后交给业务层handler处理等
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
            //判断是否需要继续读
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        //触发channelReadComplete方法
        pipeline.fireChannelReadComplete();

        if (close) {
            //上面如果最后读取的字节数小于0,则关闭读取操作
            //但是不关闭通道
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        //异常处理
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
        RecvByteBufAllocator.Handle allocHandle) {
    if (byteBuf != null) {
        if (byteBuf.isReadable()) {
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
        } else {
            byteBuf.release();
        }
    }
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    //看到了熟悉的方法fireExceptionCaught
    //这个是handler定义的,可供应用处理异常
    pipeline.fireExceptionCaught(cause);
    if (close || cause instanceof IOException) {
        closeOnRead(pipeline);
    }
}

因为我们这里介绍客户端读取操作,所以我们看NioSocketChannel.doReadBytes方法

//NioSocketChannel
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    //其实主要是从java nio channel中读取数据
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

上面关于RecvByteBufAllocator相关内容可参考笔者文章Netty源码-接收缓冲区分配器AdaptiveRecvByteBufAllocator,关于ChannelPipelineChannelHandler的介绍可参考笔者文章Netty源码-ChannelPipeline和ChannelHandler

上一篇下一篇

猜你喜欢

热点阅读