Netty 源码解析 ——— writeAndFlush流程分析
本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。
源码解析
本文主要对Netty的写数据流程进行分析。代码调用仅一句:
ctx.writeAndFlush("from server : " + UUID.randomUUID());
变量 ctx 指的是 ChannelHandlerContext对象,我们跟进ChannelHandlerContext的writeAndFlush方法:
public ChannelFuture writeAndFlush(Object msg) {
return write AndFlush(msg, newPromise());
}
因为写是异步操作,所以如果我们没有自定义一个ChannelPromise的话,就会构建一个默认的ChannelPromise(即,DefaultChannelPromise)来表示该异步操作。我们可以通过往ChannelPromise中注册listener来得到该异步操作的结果(成功 or 失败),listener会在异步操作完成后得到回调。
往下跟,我们会到👇流程
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
这里会完成两个重要的步骤:
① invokeWrite0(msg, promise);将消息放入输出缓冲区中(ChannelOutboundBuffer)
② invokeFlush0(); 将输出缓冲区中的数据通过socket发送到网络中
下面我们来详细展开这两步骤
invokeWrite0(msg, promise)
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
write是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
unsafe是我们构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。NioServerSocketChannel中持有的unsafe成员变量是NioMessageUnsafe对象,而NioSocketChannel中持有的unsafe成员变量是NioSocketChannelUnsafe对象。这里我们要看的是NioSocketChannel的write流程:
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
① 获取该NioSocketChannel的ChannelOutboundBuffer成员属性。(确切地来说ChannelOutboundBuffer是NioSocketChannelUnsafe对象中的成员属性,而NioSocketChannelUnsafe才是NioSocketChannel的成员属性。)每一个NioSocketChannel会维护一个它们自己的ChannelOutboundBuffer,用于存储待出站写请求。
判断该outboundBuffer是否为null,如果为null则说明该NioSocketChannel已经关闭了,那么就会标志该异步写操作为失败完成,并释放写消息后返回。
② 『msg = filterOutboundMessage(msg);』过滤待发送的消息:
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
过滤待发送的消息,只有ByteBuf(堆 or 非堆)以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。
『size = pipeline.estimatorHandle().size(msg);』估计待发送数据的大小:
public int size(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
if (msg instanceof FileRegion) {
return 0;
}
return unknownSize;
}
估计待发送消息数据的大小,如果是FileRegion的话直接饭0,否则返回ByteBuf中可读取字节数。
③ 『outboundBuffer.addMessage(msg, size, promise)』将消息加入outboundBuffer中等待发送。
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
首先对ChannelOutboundBuffer、Entry做个简单介绍
ChannelOutboundBuffer
一个内部的数据结构,被AbstractChannel用于存储它的待出站写请求。
ChannelOutboundBuffer中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。以及一个private Entry tailEntry;对象表示始终指向最后一个Entry对象(即,最后加入到该ChannelOutboundBuffer中的写请求的数据消息)
unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。
Entry
Entry是ChannelOutboundBunffer的一个内部类,它是对真实的写消息数据以及其相关信息的一个封装。大致封装了如下信息:
a) pendingSize:记录有该ByteBuf or ByteBufs 中待发送数据大小 和 对象本身内存大小 的累加和;
b) promise:该异步写操作的ChannelPromise(用于在完成真是的网络层write后去标识异步操作的完成以及回调已经注册到该promise上的listeners);
c) total:待发送数据包的总大小(该属性与pendingSize的区别在于,如果是待发送的是FileRegion数据对象,则pengdingSize中只有对象内存的大小,即真实的数据大小被记录为0;但total属性则是会记录FileRegion中数据大小,并且total属性是不包含对象内存大小,仅仅是对数据本身大小的记录);
e) msg:原始消息对象的引用;
f) count:写消息数据个数的记录(如果写消息数据是个数组的话,该值会大于1)
这里说明下,pendingSize属性记录的不单单是写请求数据的大小,记录的是这个写请求对象的大小。这是什么意思了?这里做个简单的介绍:
一个对象占用的内存大小除了实例数据(instance data),还包括对象头(header)以及对齐填充(padding)。所以一个对象所占的内存大小为『对象头 + 实例数据 + 对齐填充』,即
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 8 reference fields
// - 2 long fields
// - 2 int fields
// - 1 boolean field
// - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
👆假设的是64位操作系统下,且没有使用各种压缩选项的情况。对象头的长度占16字节;引用属性占8字节;long类型占8字节;int类型占4字节;boolean类型占1字节。同时,由于HotSpot VM的自动内存管理系统要求对象起始地址必须是8字节的整数倍,也就是说对象的大小必须是8字节的整数倍,如果最终字节数不为8的倍数,则padding会补足至8的倍数。
static final class Entry {
private final Handle<Entry> handle; // reference field ( 8 bytes)
Entry next; // reference field ( 8 bytes)
Object msg; // reference field ( 8 bytes)
ByteBuffer[] bufs; // reference field ( 8 bytes)
ByteBuffer buf; // reference field ( 8 bytes)
ChannelPromise promise; // reference field ( 8 bytes)
long progress; // long field ( 8 bytes)
long total; // long field ( 8 bytes)
int pendingSize; // int field ( 4 bytes)
int count = -1; // int field ( 4 bytes)
boolean cancelled; // boolean field ( 1 bytes)
我们根据上面的理论来计算下Entry对象占用内存的大小:
header (16 bytes) + 6 * reference fields(8 bytes)+ 2 * long fields(8 bytes)+ 2 * int fields(4 bytes)+ 1 * boolean field(1 byte)= 89 ——> 加上7bytes的padding = 96 bytes
这就是CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD默认值 96 的由来。(关于JVM中对象的内存大小的详细分析,欢迎参阅JVM中 对象的内存布局 以及 实例分析)
addMessage方法主要就是将请求写出的数据封装为Entry对象,然后加入到tailEntry和unflushedEntry中。
然后调用『incrementPendingOutboundBytes(entry.pendingSize, false);』对totalPendingSize属性以及unwritable字段做调整。
totalPendingSize字段记录了该ChannelOutboundBuffer中所有带发送Entry对象的占的总内存大小和所有带发送数据的大小。unwritable用来标示当前该Channel要发送的数据是否已经超过了设定 or 默认的WriteBufferWaterMark的high值。如果当前操作导致了待写出的数据(包括Entry对象大小以及真实需要传输数据的大小)超过了设置写缓冲区的高水位,那么将会触发fireChannelWritabilityChanged事件。
WriteBufferWaterMark
WriteBufferWaterMark用于设置写缓存的高水位标志和低水位标志。
如果写缓冲区队列中字节的数量超过了设置的高水位标志,那么Channel#isWritable()方法将开始返回false。然后当写缓冲区中的字节数量减少至小于了低水位标志,Channel#isWritable()方法会重新开始返回true。关于Channel#isWritable()方法目前主要用在ChunkedWriteHandler以及HTTP2的Handler中。因此,如果你想在程序中通过设置WriteBufferWaterMark来控制数据的写出,但你在程序中并没有使用ChunkedWriteHandler或HTTP2,那么这就需要我们自己通过『Channel#isWritable()』来实现是否可用继续写出数据。 比如:
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// ......
if( ctx.channel().isWritable() )
{
ctx.writeAndFlush(...)
}
}
总的来说,在write的操作最终会将ByteBuf封装为一个Entry对象放到unflushedEntry单向链表的尾部(通过修改tailEntry来实现的),并修改用于记录有该ChannelOutboundBuffer中待发送Entry对象总内存大小的属性totalPendingSize字段。
好了,但目前为止write操作就讲完了。接下来我们来看下flush操作:
invokeFlush0()
flush也是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
这里的unsafe成员变量依旧是NioSocketChannelUnsafe对象,跟进去:
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
这里主要完成两个操作:
① outboundBuffer.addFlush();
添加一个flush到这个ChannelOutboundBuffer,这意味着,将在此之前添加的消息标记为flushed,你将可以处理这些消息。
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
a) 如我们前面所说,write操作最终会将包含有待发送消息的ByteBuf封装成Entry对象放入unflushedEntry单向链表的尾部。而这里就会先判断unflushedEntry是否为null,如果为null则说明所有的entries已经被flush了,并在此期间没有新的消息被添加进ChannelOutboundBuffer中。所有直接返回就好。
b) 如果unflushedEntry非空,则说明有待发送的entries等待被发送。那么将unflushedEntry赋值给flushedEntry(调用flush()操作时就是将该flushedEntry单向链表中的entries的数据发到网络),并将unflushedEntry置为null,表示没有待发送的entries了。并通过flushed成员属性记录待发送entries的个数。
② flush0();
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
return;
}
super.flush0();
}
a) 首先通过isFlushPending()方法来判断flush操作是否需要被挂起:
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
也就是说,首先会判断当前NioSocketChannel的SelectionKey.OP_WRITE事件是否有被注册到对应的Selector上,如果有,则说明当前写缓冲区已经满了(这里指是socket的写缓冲区满了,并且socket并没有被关闭,那么write操作将返回0。这是如果还有未写出的数据待被发送,那么就会注册SelectionKey.OP_WRITE事件)。等写缓冲区有空间时,SelectionKey.OP_WRITE事件就会被触发,到时NioEventLoop的事件循环就会调用forceFlush()方法来继续将为写出的数据写出,所以这里直接返回就好。
b) 当socket写缓冲区未满,那么就执行flush0()
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
- 判断Channel的输出缓冲区是否为null或待发送的数据个数为0,如果是则直接返回,因为此时并没有数据需要发送。
- 判断当前的NioSocketChannel是否是Inactive状态,如果是,则会标识所有等待写请求为失败(即所有的write操作的promise都会是失败完成),并且如果NioSocketChannel已经关闭了,失败的原因是“FLUSH0_CLOSED_CHANNEL_EXCEPTION”且不会回调注册到promise上的listeners;但如果NioSocketChannel还是open的,则失败的原始是“FLUSH0_NOT_YET_CONNECTED_EXCEPTION”并且会回调注册到promise上的listeners。
-
调用doWrite(outboundBuffer);方法将Channel输出缓冲区中的数据通过socket传输给对端:
doWrite是一个写循环操作,当满足一定条件时会结束循环。每一次循环会完成的操作:
- 判断当前ChannelOutboundBuffer中的数据都已经被传输完了,如果已经传输完了,并且发现NioSocketChannel还注册有SelectionKey.OP_WRITE事件,则将SelectionKey.OP_WRITE从感兴趣的事件中移除,即,Selector不在监听该NioSocketChannel的可写事件了。然后跳出循环,方法返回。
- 初始化writtenBytes = 0、done = false、setOpWrite = false三个属性,它们分别表示本次循环已经写出的字节数、本次循环是否写出了所有待写出的数据、是否需要设置SelectionKey.OP_WRITE事件的标志为。
- 『ByteBuffer[] nioBuffers = in.nioBuffers()』
获取所有待写出的ByteBuffer,它会将ChannelOutboundBuffer中所有待写出的ByteBuf转换成JDK Bytebuffer(因为,底层依旧是基于JDK NIO的网络传输,所有最终传输的还是JDK 的ByteBuffer对象)。它依次出去每个待写的ByteBuf,然后根据ByteBuf的信息构建一个ByteBuffer(这里的ByteBuf是一个堆外ByteBuf,因此构建出来的ByteBuffer也是一个堆外的ByteBuffer),并设置该ByteBuffer的readerIndex、readableBytes的值为ByteBuf对应的值。然后返回构建好的ByteBuffer[]数组。 - 获取本次循环需要写出的ByteBuffer个数
- 获取本次循环总共需要写出的数据的字节总数
- 根据nioBufferCnt值的不同执行不同的传输流程:
[1] nioBufferCnt == 0 :对非ByteBuffer对象的数据进行普通的写操作。
上面我们说了in.nioBuffers()会将ChannelOutboundBuffer中所有待发送的ByteBuf转换成Bytebuffer后返回一个ByteBuffer[]数组,以便后面进行ByteBuffer的传输,而nioBufferCnt则表示待发送ByteBuffer的个数,即ByteBuffer[]数组的长度。注意,这里nioBuffers()仅仅是对ByteBuf对象进行了操作,但是我们从前面的流程可以得知,除了ByteBuf外FileRegion对象也是可以进行底层的网络传输的。因此当待传输的对象是FileRegion时“nioBufferCnt == 0”,那么这是就会调用『AbstractNioByteChannel#doWrite(ChannelOutboundBuffer in)』方法来完成数据的传输。实际上底层就是依靠JDK NIO 的 FileChannel来实现零拷贝的数据传输。
[2] nioBufferCnt == 1 :说明只有一个ByteBuffer等待被传输,那么不使用gather的write操作来传输数据(JDK NIO 支持一次写单个ByteBuffer 以及 一次写多个ByteBuffer的聚集写模式)
[3] nioBufferCnt > 1 :说明有多个ByteBuffer等待被传输,那么使用JDK NIO的聚集写操作,一次性传输多个ByteBuffer到NioSocketChannel中。
[2]、[3] 中写操作的逻辑是一样的:
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(...);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
config().getWriteSpinCount()为16,也就是一次写操作会最多执行16次的SocketChannel.write操作来将数据写到网络中。每次ch.write完都会进行相应的『expectedWrittenBytes -= localWrittenBytes;』操作,将expectedWrittenBytes期待被写的字节数减去已经写出的字节数。如果在最后expectedWrittenBytes依旧大于0,则说明在这16次的socket写操作后依旧还有未写完的数据等待被继续写,那么done就会为false;否则若所有的数据都写完了,done会被置为true。注意,ch.write操作会返回本次写操作写出的字节数,但该方法返回0时,即localWrittenBytes为0,则说明底层的写缓冲区已经满了(这里应该指的是linux底层的写缓冲区满了),这是就会将setOpWrite置为true,此时因为数据还没写完done还是false。那么这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。
- 『in.removeBytes(writtenBytes)』:释放所有已经写出去的缓存对象,并修改部分写缓冲的索引。
public void removeBytes(long writtenBytes) {
for (;;) {
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}
通过已经写出数据的字节数来清理或修改ByteBuf。也就是说writtenBytes的大小可能是包含了多个ByteBuf以及某个ByteBuf的部分数据(因为一个ByteBuf可能只写出了部分数据,还未完成被写出到网络层中)。
a) 『 if (readableBytes <= writtenBytes) 』这个if判断表示:本次socket的write操作(这里是真的是网络通信写操作了)已经写出去的字节数”大于"了当前ByteBuf包可读取的字节数。 这说明,当前这个包中所有的可写的数据都已经写完了,既然当前这个ByteBuf的数据都写完了,那么久可以将其删除了。即,调用『remove()』操作,这个操作就会标识异步write操作为成功完成,并且会回调已经注册到ByteBuf的promise上的所有listeners。同时会原子的修改ChannelOutboundBuffer的totalPendingSize属性值,减少已经写出的数据大小(包括Entry对象内存大小和真实数据的大小),并且如果减少后totalPendingSize小于设置 or 默认的WriteBufferWaterMark的low值,并且再次之前totalPendingSize超过了WriteBufferWaterMark的high值,那么将触发fireChannelWritabilityChanged事件。『remove()』操作还会将当前的ByteBuf指向下一个待处理的ByteBuf,最后释放这个已经被写出去的ByteBuf对象资源。
b) 通过上面的分析,我们知道大数据包走的是else流程。也就是说,本次真实写出去的数据 比 当前这个ByteBuf的可读取数据要小(也就说明,当前这个ByteBuf还没有被完全的写完。因此并不会通过调用『remove()』操作。直到整个大数据包所有的内容都写出去了,那么这是if(readableBytes <= writtenBytes)才会为真执行『remove()』完成相关后续的操作)。那么此时,会根据已经写出的字节数大小修改该ByteBuf的readerIndex索引值。并且,如果该异步写操作的ChannelPromise是ChannelProgressivePromise对象并且注册了相应的progressiveListeners事件,则该listener会得到回调。你可以通过该listener来观察到大数据包写出去的进度。
- done表示本次写操作是否完成,。有两种情况下done为false:
[1] 还有未写完的数据待发送,并且写缓冲区已经满了(这里指的是linux底层的写缓冲区满了),无法再继续写出,那么此时setOpWrite标识为true。这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。接着退出doWrite()循环写操作。
[2] 执行了config().getWriteSpinCount()次(默认16次)socket写操作后,数据仍旧未写完,那么此时会将flush()操作封装成一个task提交至NioEventLoop的taskQueue中,这样在NioEventLoop的下一次事件循环时会就会取出该任务并执行,也就会继续写出未写完的任务了。这也说明了,如果发送的是很大的数据包的话,可能一次写循环操作是无法将数据全部发送出去的,也不会为了发送该大数据包的数据而导致NioEventLoop线程的阻塞以至于影响NioEventLoop上其他Channel的操作和响应。接着退出doWrite()循环写操作。
好了,到目前为止,Netty整个的写流程就分析完了。本文主要专注于写操作的流程,而并未到Netty的内存模式进行展开。
后记
若文章有任何错误,望大家不吝指教:)