消息队列MQ(Kafka&RabbitMQ)程序员首页投稿(暂停使用,暂停投稿)

Netty 源码解析 ——— writeAndFlush流程分析

2017-12-15  本文已影响441人  tomas家的小拨浪鼓

本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。

源码解析

本文主要对Netty的写数据流程进行分析。代码调用仅一句:

ctx.writeAndFlush("from server : " + UUID.randomUUID());

变量 ctx 指的是 ChannelHandlerContext对象,我们跟进ChannelHandlerContext的writeAndFlush方法:

public ChannelFuture writeAndFlush(Object msg) {
    return write    AndFlush(msg, newPromise());
}

因为写是异步操作,所以如果我们没有自定义一个ChannelPromise的话,就会构建一个默认的ChannelPromise(即,DefaultChannelPromise)来表示该异步操作。我们可以通过往ChannelPromise中注册listener来得到该异步操作的结果(成功 or 失败),listener会在异步操作完成后得到回调。

往下跟,我们会到👇流程

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

这里会完成两个重要的步骤:
① invokeWrite0(msg, promise);将消息放入输出缓冲区中(ChannelOutboundBuffer)
② invokeFlush0(); 将输出缓冲区中的数据通过socket发送到网络中

下面我们来详细展开这两步骤

invokeWrite0(msg, promise)
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

write是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

unsafe是我们构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。NioServerSocketChannel中持有的unsafe成员变量是NioMessageUnsafe对象,而NioSocketChannel中持有的unsafe成员变量是NioSocketChannelUnsafe对象。这里我们要看的是NioSocketChannel的write流程:

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

① 获取该NioSocketChannel的ChannelOutboundBuffer成员属性。(确切地来说ChannelOutboundBuffer是NioSocketChannelUnsafe对象中的成员属性,而NioSocketChannelUnsafe才是NioSocketChannel的成员属性。)每一个NioSocketChannel会维护一个它们自己的ChannelOutboundBuffer,用于存储待出站写请求。
判断该outboundBuffer是否为null,如果为null则说明该NioSocketChannel已经关闭了,那么就会标志该异步写操作为失败完成,并释放写消息后返回。
② 『msg = filterOutboundMessage(msg);』过滤待发送的消息:

    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }

            return newDirectBuffer(buf);
        }

        if (msg instanceof FileRegion) {
            return msg;
        }

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }

过滤待发送的消息,只有ByteBuf(堆 or 非堆)以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。
『size = pipeline.estimatorHandle().size(msg);』估计待发送数据的大小:

public int size(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    if (msg instanceof FileRegion) {
        return 0;
    }
    return unknownSize;
}

估计待发送消息数据的大小,如果是FileRegion的话直接饭0,否则返回ByteBuf中可读取字节数。
③ 『outboundBuffer.addMessage(msg, size, promise)』将消息加入outboundBuffer中等待发送。

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

首先对ChannelOutboundBuffer、Entry做个简单介绍

ChannelOutboundBuffer
一个内部的数据结构,被AbstractChannel用于存储它的待出站写请求。
ChannelOutboundBuffer中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。以及一个private Entry tailEntry;对象表示始终指向最后一个Entry对象(即,最后加入到该ChannelOutboundBuffer中的写请求的数据消息)
unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。

Entry
Entry是ChannelOutboundBunffer的一个内部类,它是对真实的写消息数据以及其相关信息的一个封装。大致封装了如下信息:
a) pendingSize:记录有该ByteBuf or ByteBufs 中待发送数据大小 和 对象本身内存大小 的累加和;
b) promise:该异步写操作的ChannelPromise(用于在完成真是的网络层write后去标识异步操作的完成以及回调已经注册到该promise上的listeners);
c) total:待发送数据包的总大小(该属性与pendingSize的区别在于,如果是待发送的是FileRegion数据对象,则pengdingSize中只有对象内存的大小,即真实的数据大小被记录为0;但total属性则是会记录FileRegion中数据大小,并且total属性是不包含对象内存大小,仅仅是对数据本身大小的记录);
e) msg:原始消息对象的引用;
f) count:写消息数据个数的记录(如果写消息数据是个数组的话,该值会大于1)
这里说明下,pendingSize属性记录的不单单是写请求数据的大小,记录的是这个写请求对象的大小。这是什么意思了?这里做个简单的介绍:
一个对象占用的内存大小除了实例数据(instance data),还包括对象头(header)以及对齐填充(padding)。所以一个对象所占的内存大小为『对象头 + 实例数据 + 对齐填充』,即

entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
// Assuming a 64-bit JVM:
//  - 16 bytes object header
//  - 8 reference fields
//  - 2 long fields
//  - 2 int fields
//  - 1 boolean field
//  - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

👆假设的是64位操作系统下,且没有使用各种压缩选项的情况。对象头的长度占16字节;引用属性占8字节;long类型占8字节;int类型占4字节;boolean类型占1字节。同时,由于HotSpot VM的自动内存管理系统要求对象起始地址必须是8字节的整数倍,也就是说对象的大小必须是8字节的整数倍,如果最终字节数不为8的倍数,则padding会补足至8的倍数。

static final class Entry {
    private final Handle<Entry> handle;     // reference field ( 8 bytes)
    Entry next;     // reference field ( 8 bytes)
    Object msg;     // reference field ( 8 bytes)
    ByteBuffer[] bufs;     // reference field ( 8 bytes)
    ByteBuffer buf;     // reference field ( 8 bytes)
    ChannelPromise promise;     // reference field ( 8 bytes)
    long progress;     // long field ( 8 bytes)
    long total;     // long field ( 8 bytes)
    int pendingSize;     // int field ( 4 bytes)
    int count = -1;     // int field ( 4 bytes)
    boolean cancelled;     // boolean field ( 1 bytes)

我们根据上面的理论来计算下Entry对象占用内存的大小:
header (16 bytes) + 6 * reference fields(8 bytes)+ 2 * long fields(8 bytes)+ 2 * int fields(4 bytes)+ 1 * boolean field(1 byte)= 89 ——> 加上7bytes的padding = 96 bytes
这就是CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD默认值 96 的由来。(关于JVM中对象的内存大小的详细分析,欢迎参阅JVM中 对象的内存布局 以及 实例分析)

addMessage方法主要就是将请求写出的数据封装为Entry对象,然后加入到tailEntry和unflushedEntry中。
然后调用『incrementPendingOutboundBytes(entry.pendingSize, false);』对totalPendingSize属性以及unwritable字段做调整。
totalPendingSize字段记录了该ChannelOutboundBuffer中所有带发送Entry对象的占的总内存大小和所有带发送数据的大小。unwritable用来标示当前该Channel要发送的数据是否已经超过了设定 or 默认的WriteBufferWaterMark的high值。如果当前操作导致了待写出的数据(包括Entry对象大小以及真实需要传输数据的大小)超过了设置写缓冲区的高水位,那么将会触发fireChannelWritabilityChanged事件。

WriteBufferWaterMark
WriteBufferWaterMark用于设置写缓存的高水位标志和低水位标志。
如果写缓冲区队列中字节的数量超过了设置的高水位标志,那么Channel#isWritable()方法将开始返回false。然后当写缓冲区中的字节数量减少至小于了低水位标志,Channel#isWritable()方法会重新开始返回true。关于Channel#isWritable()方法目前主要用在ChunkedWriteHandler以及HTTP2的Handler中。因此,如果你想在程序中通过设置WriteBufferWaterMark来控制数据的写出,但你在程序中并没有使用ChunkedWriteHandler或HTTP2,那么这就需要我们自己通过『Channel#isWritable()』来实现是否可用继续写出数据。 比如:

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    // ......
    if( ctx.channel().isWritable() ) 
    { 
        ctx.writeAndFlush(...) 
    }
}

总的来说,在write的操作最终会将ByteBuf封装为一个Entry对象放到unflushedEntry单向链表的尾部(通过修改tailEntry来实现的),并修改用于记录有该ChannelOutboundBuffer中待发送Entry对象总内存大小的属性totalPendingSize字段。

好了,但目前为止write操作就讲完了。接下来我们来看下flush操作:

invokeFlush0()

flush也是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:

public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

这里的unsafe成员变量依旧是NioSocketChannelUnsafe对象,跟进去:

public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

这里主要完成两个操作:
① outboundBuffer.addFlush();
添加一个flush到这个ChannelOutboundBuffer,这意味着,将在此之前添加的消息标记为flushed,你将可以处理这些消息。

    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

a) 如我们前面所说,write操作最终会将包含有待发送消息的ByteBuf封装成Entry对象放入unflushedEntry单向链表的尾部。而这里就会先判断unflushedEntry是否为null,如果为null则说明所有的entries已经被flush了,并在此期间没有新的消息被添加进ChannelOutboundBuffer中。所有直接返回就好。
b) 如果unflushedEntry非空,则说明有待发送的entries等待被发送。那么将unflushedEntry赋值给flushedEntry(调用flush()操作时就是将该flushedEntry单向链表中的entries的数据发到网络),并将unflushedEntry置为null,表示没有待发送的entries了。并通过flushed成员属性记录待发送entries的个数。

② flush0();

protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}

a) 首先通过isFlushPending()方法来判断flush操作是否需要被挂起:

private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

也就是说,首先会判断当前NioSocketChannel的SelectionKey.OP_WRITE事件是否有被注册到对应的Selector上,如果有,则说明当前写缓冲区已经满了(这里指是socket的写缓冲区满了,并且socket并没有被关闭,那么write操作将返回0。这是如果还有未写出的数据待被发送,那么就会注册SelectionKey.OP_WRITE事件)。等写缓冲区有空间时,SelectionKey.OP_WRITE事件就会被触发,到时NioEventLoop的事件循环就会调用forceFlush()方法来继续将为写出的数据写出,所以这里直接返回就好。
b) 当socket写缓冲区未满,那么就执行flush0()

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}
  1. 判断Channel的输出缓冲区是否为null或待发送的数据个数为0,如果是则直接返回,因为此时并没有数据需要发送。
  2. 判断当前的NioSocketChannel是否是Inactive状态,如果是,则会标识所有等待写请求为失败(即所有的write操作的promise都会是失败完成),并且如果NioSocketChannel已经关闭了,失败的原因是“FLUSH0_CLOSED_CHANNEL_EXCEPTION”且不会回调注册到promise上的listeners;但如果NioSocketChannel还是open的,则失败的原始是“FLUSH0_NOT_YET_CONNECTED_EXCEPTION”并且会回调注册到promise上的listeners。
  3. 调用doWrite(outboundBuffer);方法将Channel输出缓冲区中的数据通过socket传输给对端:


    doWrite是一个写循环操作,当满足一定条件时会结束循环。每一次循环会完成的操作:

  1. 判断当前ChannelOutboundBuffer中的数据都已经被传输完了,如果已经传输完了,并且发现NioSocketChannel还注册有SelectionKey.OP_WRITE事件,则将SelectionKey.OP_WRITE从感兴趣的事件中移除,即,Selector不在监听该NioSocketChannel的可写事件了。然后跳出循环,方法返回。
  2. 初始化writtenBytes = 0、done = false、setOpWrite = false三个属性,它们分别表示本次循环已经写出的字节数、本次循环是否写出了所有待写出的数据、是否需要设置SelectionKey.OP_WRITE事件的标志为。
  3. 『ByteBuffer[] nioBuffers = in.nioBuffers()』
    获取所有待写出的ByteBuffer,它会将ChannelOutboundBuffer中所有待写出的ByteBuf转换成JDK Bytebuffer(因为,底层依旧是基于JDK NIO的网络传输,所有最终传输的还是JDK 的ByteBuffer对象)。它依次出去每个待写的ByteBuf,然后根据ByteBuf的信息构建一个ByteBuffer(这里的ByteBuf是一个堆外ByteBuf,因此构建出来的ByteBuffer也是一个堆外的ByteBuffer),并设置该ByteBuffer的readerIndex、readableBytes的值为ByteBuf对应的值。然后返回构建好的ByteBuffer[]数组。
  4. 获取本次循环需要写出的ByteBuffer个数
  5. 获取本次循环总共需要写出的数据的字节总数
  6. 根据nioBufferCnt值的不同执行不同的传输流程:
    [1] nioBufferCnt == 0 :对非ByteBuffer对象的数据进行普通的写操作。
    上面我们说了in.nioBuffers()会将ChannelOutboundBuffer中所有待发送的ByteBuf转换成Bytebuffer后返回一个ByteBuffer[]数组,以便后面进行ByteBuffer的传输,而nioBufferCnt则表示待发送ByteBuffer的个数,即ByteBuffer[]数组的长度。注意,这里nioBuffers()仅仅是对ByteBuf对象进行了操作,但是我们从前面的流程可以得知,除了ByteBuf外FileRegion对象也是可以进行底层的网络传输的。因此当待传输的对象是FileRegion时“nioBufferCnt == 0”,那么这是就会调用『AbstractNioByteChannel#doWrite(ChannelOutboundBuffer in)』方法来完成数据的传输。实际上底层就是依靠JDK NIO 的 FileChannel来实现零拷贝的数据传输。
    [2] nioBufferCnt == 1 :说明只有一个ByteBuffer等待被传输,那么不使用gather的write操作来传输数据(JDK NIO 支持一次写单个ByteBuffer 以及 一次写多个ByteBuffer的聚集写模式)
    [3] nioBufferCnt > 1 :说明有多个ByteBuffer等待被传输,那么使用JDK NIO的聚集写操作,一次性传输多个ByteBuffer到NioSocketChannel中。

[2]、[3] 中写操作的逻辑是一样的:

for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
    final long localWrittenBytes = ch.write(...);
    if (localWrittenBytes == 0) {
        setOpWrite = true;
        break;
    }
    expectedWrittenBytes -= localWrittenBytes;
    writtenBytes += localWrittenBytes;
    if (expectedWrittenBytes == 0) {
        done = true;
        break;
    }
}

config().getWriteSpinCount()为16,也就是一次写操作会最多执行16次的SocketChannel.write操作来将数据写到网络中。每次ch.write完都会进行相应的『expectedWrittenBytes -= localWrittenBytes;』操作,将expectedWrittenBytes期待被写的字节数减去已经写出的字节数。如果在最后expectedWrittenBytes依旧大于0,则说明在这16次的socket写操作后依旧还有未写完的数据等待被继续写,那么done就会为false;否则若所有的数据都写完了,done会被置为true。注意,ch.write操作会返回本次写操作写出的字节数,但该方法返回0时,即localWrittenBytes为0,则说明底层的写缓冲区已经满了(这里应该指的是linux底层的写缓冲区满了),这是就会将setOpWrite置为true,此时因为数据还没写完done还是false。那么这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。

  1. 『in.removeBytes(writtenBytes)』:释放所有已经写出去的缓存对象,并修改部分写缓冲的索引。
public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;

        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}

通过已经写出数据的字节数来清理或修改ByteBuf。也就是说writtenBytes的大小可能是包含了多个ByteBuf以及某个ByteBuf的部分数据(因为一个ByteBuf可能只写出了部分数据,还未完成被写出到网络层中)。
a) 『 if (readableBytes <= writtenBytes) 』这个if判断表示:本次socket的write操作(这里是真的是网络通信写操作了)已经写出去的字节数”大于"了当前ByteBuf包可读取的字节数。 这说明,当前这个包中所有的可写的数据都已经写完了,既然当前这个ByteBuf的数据都写完了,那么久可以将其删除了。即,调用『remove()』操作,这个操作就会标识异步write操作为成功完成,并且会回调已经注册到ByteBuf的promise上的所有listeners。同时会原子的修改ChannelOutboundBuffer的totalPendingSize属性值,减少已经写出的数据大小(包括Entry对象内存大小和真实数据的大小),并且如果减少后totalPendingSize小于设置 or 默认的WriteBufferWaterMark的low值,并且再次之前totalPendingSize超过了WriteBufferWaterMark的high值,那么将触发fireChannelWritabilityChanged事件。『remove()』操作还会将当前的ByteBuf指向下一个待处理的ByteBuf,最后释放这个已经被写出去的ByteBuf对象资源。
b) 通过上面的分析,我们知道大数据包走的是else流程。也就是说,本次真实写出去的数据 比 当前这个ByteBuf的可读取数据要小(也就说明,当前这个ByteBuf还没有被完全的写完。因此并不会通过调用『remove()』操作。直到整个大数据包所有的内容都写出去了,那么这是if(readableBytes <= writtenBytes)才会为真执行『remove()』完成相关后续的操作)。那么此时,会根据已经写出的字节数大小修改该ByteBuf的readerIndex索引值。并且,如果该异步写操作的ChannelPromise是ChannelProgressivePromise对象并且注册了相应的progressiveListeners事件,则该listener会得到回调。你可以通过该listener来观察到大数据包写出去的进度。

  1. done表示本次写操作是否完成,。有两种情况下done为false:
    [1] 还有未写完的数据待发送,并且写缓冲区已经满了(这里指的是linux底层的写缓冲区满了),无法再继续写出,那么此时setOpWrite标识为true。这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。接着退出doWrite()循环写操作。
    [2] 执行了config().getWriteSpinCount()次(默认16次)socket写操作后,数据仍旧未写完,那么此时会将flush()操作封装成一个task提交至NioEventLoop的taskQueue中,这样在NioEventLoop的下一次事件循环时会就会取出该任务并执行,也就会继续写出未写完的任务了。这也说明了,如果发送的是很大的数据包的话,可能一次写循环操作是无法将数据全部发送出去的,也不会为了发送该大数据包的数据而导致NioEventLoop线程的阻塞以至于影响NioEventLoop上其他Channel的操作和响应。接着退出doWrite()循环写操作。

好了,到目前为止,Netty整个的写流程就分析完了。本文主要专注于写操作的流程,而并未到Netty的内存模式进行展开。

后记

若文章有任何错误,望大家不吝指教:)

上一篇下一篇

猜你喜欢

热点阅读