Netty技术深入浅出Netty源码剖析

netty源码解析之IO读写(三)

2019-03-23  本文已影响3人  binecy

第二次看netty源码,对netty的理解也更深入了点,修改了不少文章内容。
后面有时间再分析一下netty的buffer,codec等内容。

源码分析基于netty 4.1

前面已经说过netty对accept事件的处理,现在来讲讲netty中的read/write过程。

Pipeline

DefaultChannelPipeline是一个netty处理io事件的默认通道,通道中的每个节点都是AbstractChannelHandlerContext,
AbstractChannelHandlerContext.next指向下一个AbstractChannelHandlerContext,prev指向前一个AbstractChannelHandlerContext。
Pipeline是标准的责任链。
AbstractChannelHandlerContext.handler()方法返回一个ChannelHandler,ChannelInboundHandler/ChannelOutboundHandler都继承自这个接口,
我们继承这两个接口的适配类ChannelInboundHandlerAdapter/ChannelInboundHandlerAdapter,编写具体的业务逻辑。
DefaultChannelPipeline固定有两个节点head/tail,addLast会把节点添加到tail前。

read

回顾一下, NioEventLoop中对read事件的处理

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    
    ...
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {
            return;
        }
    }
}

ch是NioSocketChannel对象, ch.unsafe()返回NioSocketChannel.NioSocketChannelUnsafe,
unsafe.read() 会调用到NioSocketChannelUnsafe父类NioByteUnsafe.read():

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;
boolean close = false;

    do {
        // 申请缓存区空间
        byteBuf = allocHandle.allocate(allocator);
        // 从socket读取数据到缓存区
        allocHandle.lastBytesRead(doReadBytes(byteBuf));

        allocHandle.incMessagesRead(1);
        
        // 触发ChannelRead事件
        pipeline.fireChannelRead(byteBuf);
        byteBuf = null;
    } while (allocHandle.continueReading());

    allocHandle.readComplete();
    // 触发ChannelReadComplete事件
    pipeline.fireChannelReadComplete();

从socket读取数据到byteBuf中,再调用DefaultChannelPipeline.fireChannelRead。

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

调用AbstractChannelHandlerContext静态方法invokeChannelRead,参数是head和msg

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        ...
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {  // 检查handler的状态
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}    

invokeHandler()会检查ChannelHandler是否已经调用了handlerAdded

handler()返回一个ChannelHandler,这里再转化为ChannelInboundHandler,并调用它的channelRead。(HeadContext.handler返回this,HeadContext同时实现了ChannelOutboundHandler/ChannelInboundHandler)。

看看HeadContext.channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

ctx.fireChannelRead调用的是AbstractChannelHandlerContext.fireChannelRead:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

findContextInbound会找到下一个ChannelInboundHandler

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

ctx.fireChannelRead(msg);的作用就是找到下一个ChannelInboundHandler,并调用它的fireChannelRead方法, 这里会调用到我们实现的ChannelInboundHandler接口,并调用我们重写的fireChannelRead方法,进行逻辑处理。

我们重写的fireChannelRead方法最后要调用ctx.fireChannelRead(msg),这样会调用到AbstractChannelHandlerContext.fireChannelRead, 它会找到下一个InboundHandler并调用fireChannelRead方法,这个数据才能在通道中继续流转(除非调用write相关方法)。

write

下面看看write过程
我们可以通过ChannelHandlerContext .writeAndFlush写入结果给客户, 它会调用AbstractChannelHandlerContext.write:

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            ...
        }
    }

findContextOutbound会找到当前节点前一个OutboundHandler(write和read的方向相反,这里向前找OutboundHandler)

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

next.invokeWriteAndFlush还是调用到AbstractChannelHandlerContext.invokeWriteAndFlush

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

invokeWrite0也比较简单, 就是调用handler的处理

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

这里会调用到我们实现的ChannelOutboundHandler,并调用我们重写的write方法,实现业务逻辑。

最后会调用到HeadContext.write, 注意, HeadContext既是InboundHandler, 也是OutboundHandler

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

这里调用了AbstractUnsafe.write, 将数据write到socket中,具体过程这里不再描述。

invokeFlush0();也是类似的流程, 这里不再复述。

那么ChannelOutboundHandler的read事件, 是在哪里触发的呢? 其实是在fireChannelReadComplete中
pipeline.fireChannelReadComplete(); 会调用到DefaultChannelPipeline.channelReadComplete

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelReadComplete();

    readIfIsAutoRead();
}


private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

如果配置为AutoRead, 就会调用channel.read(), 进而调用 pipeline.read(), 最终就会触发ChannelOutboundHandler.read方法。

到这里, netty启动, accept, read/write的一个完整流程都讲完了。
netty是非常优秀的框架, 模块化做到很好, 对jdk的future, buffer这些模块都做了扩展,还自行进行了内存管理。
对netty流程熟悉后, 就可以继续学习netty的这些闪光点, 网上也有很多优秀的教程了。

下面是一些非常优秀的netty博客:
Netty源码分析-占小狼
Netty那点事-黄亿华
Netty系列之Netty线程模型-李林锋
Netty系列之Netty高性能之道-李林锋
Netty_in_Action-译文
Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)

上一篇下一篇

猜你喜欢

热点阅读