Netty-客户端Read实现
1 概述
本文介绍Netty客户端read实现原理(主要介绍NIO通道的Read方法),内容比较简单,因为不涉及Netty内存池等相关知识点,Netty内存池原理后续会有专文介绍,但是本文集已经介绍过了Netty源码-PoolThreadCache、Netty源码-PooledByteBufAllocator静态变量初始化等和Netty内存分配相关内容,可结合着理解本文内容。
2 如何触发Read调用
在NIO编程中,如果应用程序需要和远程交换数据,有读数据的需求,我们一般会将通道注册到Selecotr
上,并注册OP_READ
事件,然后在Selector.select
返回的SelectedKey
中有读事件继续时,会从通道读取数据(关于Selector
事件的说明可看笔者文章NIO SelectionKey事件理解)。Netty作为NIO编程框架也不例外,在NioEventLoop.run
方法中会通过Selector.select
方法等待感兴趣的事件就绪,如果通道有数据到来,即读事件继续,会调用Unsafe.read
方法。本文主要介绍Netty客户端Read方法实现。
3 具体实现
因为我们主要介绍Java NIO实现,所以我们看Netty基于NIO的客户端read实现,其实现源码在NioByteUnsafe.read
方法中:
//NioByteUnsafe
@Override
public final void read() {
//获取config配置
final ChannelConfig config = config();
//判断是否因配置或者异常、错误等中断本次读操作
if (shouldBreakReadReady(config)) {
//这个函数会取消该通道对OP_READ事件的注册
//取消之后即不再响应通道read准备好事件
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
//获取ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
//获取RecvByteBuf分配器,这个笔者有文章介绍过,RecvByteBuf分配器
//主要告诉ByteBuf分配器分配ByteBuf的大小,默认配置的是
//AdaptiveRecvByteBufAllocator
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
//每次read时,重置RecvByteBuf统计的一些信息
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//分配本次读取操作的ByteBuf
byteBuf = allocHandle.allocate(allocator);
//进行实际读取操作,doReadBytes我们后面列出源码
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//如果没有读取到数据,直接返回,结束本次读取
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
//触发Pipeline的channelRead方法,让
//用户定义的handler处理读到的数据
//比如对数据进行解码,然后交给业务层handler处理等
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//判断是否需要继续读
} while (allocHandle.continueReading());
allocHandle.readComplete();
//触发channelReadComplete方法
pipeline.fireChannelReadComplete();
if (close) {
//上面如果最后读取的字节数小于0,则关闭读取操作
//但是不关闭通道
closeOnRead(pipeline);
}
} catch (Throwable t) {
//异常处理
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//看到了熟悉的方法fireExceptionCaught
//这个是handler定义的,可供应用处理异常
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
因为我们这里介绍客户端读取操作,所以我们看NioSocketChannel.doReadBytes
方法
//NioSocketChannel
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
//其实主要是从java nio channel中读取数据
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
上面关于RecvByteBufAllocator
相关内容可参考笔者文章Netty源码-接收缓冲区分配器AdaptiveRecvByteBufAllocator,关于ChannelPipeline
、ChannelHandler
的介绍可参考笔者文章Netty源码-ChannelPipeline和ChannelHandler。