NettyServer Source Code Analysis

2016-11-20  沧行

You may want to read the NettyServer startup flow analysis first.

Taking EchoServer as the example, the main method is as follows:

    // Configure the server.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(new EchoServerHandler());
             }
         });

        // Start the server.
        ChannelFuture f = b.bind(PORT).sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

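Through its group method, ServerBootstrap holds one bossGroup and one workerGroup, both of which are NioEventLoopGroup instances.
Calling bind on ServerBootstrap actually runs the bind method of its parent class AbstractBootstrap, which in turn calls doBind, shown below: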

  //AbstractBootstrap       
  private ChannelFuture doBind(final SocketAddress localAddress) {
    /**
     * Create and initialize the channel, then register it with the selector
     */
    final ChannelFuture regFuture = initAndRegister();//(1)
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);//(2)
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
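
The branch structure of doBind (bind immediately if registration has already finished, otherwise bind from a listener) can be imitated with the JDK's CompletableFuture. This is only a sketch under assumptions: BindAfterRegisterSketch and the "bound" result string are made up for illustration, and CompletableFuture stands in for Netty's ChannelFuture and ChannelPromise, which behave differently in detail.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the doBind control flow; not Netty code.
final class BindAfterRegisterSketch {

    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isCompletedExceptionally()) {
            // Registration already failed: propagate the cause, never bind.
            regFuture.whenComplete((v, cause) -> promise.completeExceptionally(cause));
            return promise;
        }
        if (regFuture.isDone()) {
            // Registration finished successfully: bind right away.
            promise.complete("bound");
        } else {
            // Registration still pending: bind (or fail) once it completes.
            regFuture.whenComplete((v, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete("bound");
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> promise = doBind(pending);
        System.out.println("done before registration: " + promise.isDone());
        pending.complete(null);
        System.out.println("done after registration: " + promise.isDone());
    }
}
```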

The key code is at (1) and (2). First, the initAndRegister method at (1):

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        /**
         * Create the channel object; on the server side this is a NioServerSocketChannel
         */
        channel = channelFactory().newChannel(); // (3)
        init(channel); // (4)
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    /**
     * Register the channel in the boss EventLoopGroup.
     * One EventLoop is picked from the boss EventLoopGroup to register the channel:
     * SingleThreadEventLoop.register -> channel.unsafe().register
     */
    ChannelFuture regFuture = group().register(channel); // (5)
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

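(3) creates the channel. Because channel(NioServerSocketChannel.class) was set on the ServerBootstrap, the channel created here is a NioServerSocketChannel.
(4) initializes that NioServerSocketChannel by calling init(Channel channel) as implemented in ServerBootstrap, the subclass of AbstractBootstrap: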

/**
 * Initializes the channel object that was created by the channelFactory
 */
@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    /**
     * channel is a NioServerSocketChannel; pipeline is a member of its parent class AbstractChannel
     * and has type DefaultChannelPipeline.
     * A DefaultChannelPipeline contains one HeadContext and one TailContext.
     */
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    /**
     * This childHandler is usually the ChannelInitializer the user passed in when configuring the ServerBootstrap
     */
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    /**
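     * For the NioServerSocketChannel there is initially at most one user handler (a LoggingHandler, or none).
     * An initializer handler (ChannelInitializer) is added here. When this ChannelInitializer is triggered
     * during registration, its initChannel method runs and adds one more handler (ServerBootstrapAcceptor),
     * which registers each accepted NioSocketChannel with a worker event loop.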
     */
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            /**
             * The handler here is the one passed to ServerBootstrap.handler, usually a LoggingHandler or not set at all
             */
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
            // In this case the initChannel(...) method will only be called after this method returns. Because
            // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
            // placed in front of the ServerBootstrapAcceptor.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

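init mainly captures the childHandler and sets up the pipeline of the NioServerSocketChannel. The childHandler is the ChannelInitializer supplied by the user. While the NioServerSocketChannel is being initialized, a separate ChannelInitializer is added to its pipeline, and the initChannel method of that initializer adds a ServerBootstrapAcceptor to the pipeline.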

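At this point the pipeline of the NioServerSocketChannel contains three handlers:
head->ChannelInitializer->tail. In Netty, however, every handler is wrapped in a context: head is a HeadContext, tail is a TailContext, and the other handlers, including user-defined ones, are wrapped in a DefaultChannelHandlerContext.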

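The group().register(channel) call at (5) registers the NioServerSocketChannel with a selector in the bossGroup. A bossGroup can hold several NioEventLoop instances (this example creates it with one), so the group takes the next NioEventLoop in order and registers the NioServerSocketChannel there. The call ends up in the register method of SingleThreadEventLoop: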

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}
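
The "take the next NioEventLoop in order" step described before the listing is a round-robin selection. Here is a minimal sketch of that idea, assuming plain strings as stand-ins for the event loops. RoundRobinChooser is a hypothetical name and this is not Netty's chooser implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an event loop group's next() selection; not Netty code.
final class RoundRobinChooser {
    private final String[] loops;                          // stand-ins for NioEventLoop instances
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooser(String... loops) {
        this.loops = loops.clone();
    }

    // Returns the next loop in order and wraps around at the end.
    // floorMod keeps the index non-negative even after the counter overflows.
    String next() {
        return loops[Math.floorMod(idx.getAndIncrement(), loops.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser("loop-0", "loop-1", "loop-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

With a single loop, as in the bossGroup of this example, every call returns the same loop.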

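What actually runs is channel.unsafe().register(this, promise). In Netty all core channel operations are implemented through unsafe, so every channel has a corresponding unsafe object. For NioServerSocketChannel the unsafe is a NioMessageUnsafe, and its register method is inherited from AbstractUnsafe: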

@Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
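
The inEventLoop check above follows a common pattern: run the work directly when already on the loop thread, otherwise hand it to the loop's task queue. A minimal sketch with a JDK single-thread executor, assuming that executor as a stand-in for the event loop. InEventLoopSketch and its log format are hypothetical and only illustrate the branching.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded "event loop"; not Netty's NioEventLoop.
final class InEventLoopSketch {
    private final ExecutorService executor;
    private volatile Thread loopThread;
    private final List<String> log = Collections.synchronizedList(new ArrayList<String>());

    InEventLoopSketch() {
        // The factory remembers the one thread that will run every task.
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "loop");
            loopThread = t;
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same shape as the register method above: run now on the loop thread, else enqueue.
    void register(String channel) {
        if (inEventLoop()) {
            register0(channel, "direct");
        } else {
            executor.execute(() -> register0(channel, "queued"));
        }
    }

    private void register0(String channel, String how) {
        log.add(channel + ":" + how);
    }

    static List<String> demo() {
        InEventLoopSketch loop = new InEventLoopSketch();
        loop.register("server");                              // called from the caller's thread
        loop.executor.execute(() -> loop.register("child"));  // called from the loop thread
        loop.executor.shutdown();
        try {
            loop.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<String>(loop.log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single thread drains the queue in submission order, a task queued after the registration task always runs after it, which is the same ordering argument the comments in initAndRegister make for bind().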

This ends up calling the register0 method of AbstractUnsafe:

private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            /**
             * Register the channel with the selector
             */
            doRegister(); // (6)
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded(); // (7)

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered(); // (8)
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                /**
                 * firstRegistration marks the first registration; the channel active event is only propagated on the first registration
                 */
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    /**
                     * Netty uses auto read by default, so a read is triggered once the channel is active
                     */
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

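The doRegister method at (6) belongs to the parent class AbstractNioChannel. It calls the JDK NIO method that registers a channel with a selector: it takes the JDK SelectableChannel behind the NioServerSocketChannel and registers it with the selector of the NioEventLoop that owns the channel. The interest set passed at registration is 0, meaning no events are of interest yet: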

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            /**
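             * Every channel is bound to one event loop, and every event loop holds one selector
             * that handles the events of all channels managed by that event loop.
             * This calls the JDK NIO method that registers a channel with a selector.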
             */
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
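
Registering with an interest set of 0 and enabling the real interest later can be reproduced with plain java.nio, without Netty. The sketch below assumes a loopback bind on an ephemeral port and a string attachment in place of the Netty channel. RegisterThenInterestSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Plain JDK NIO illustration of "register with 0, add OP_ACCEPT later"; not Netty code.
final class RegisterThenInterestSketch {

    // Returns {interest ops right after register, interest ops after enabling accept}.
    static int[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);                   // required before register
            Object attachment = "stand-in for the Netty channel";
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();                    // nothing is of interest yet

            // Bind to an ephemeral loopback port with a backlog of 100 (assumed values).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 100);

            // Later, OR the accept bit into the existing interest set.
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = run();
        System.out.println("before=" + ops[0] + " after=" + ops[1]);
    }
}
```

The attachment argument plays the role of `this` in the Netty listing: it lets the selector loop find the owning channel object from a ready SelectionKey.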

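The method at (7) runs a PendingHandlerAddedTask. That task was created when the ChannelInitializer used for initialization was added to the pipeline, and it runs ChannelInitializer.handlerAdded -> the private initChannel method of ChannelInitializer -> the initChannel method implemented by the ChannelInitializer subclass.

The code is as follows: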

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // suprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        initChannel(ctx);
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            // This initChannel is the one implemented by the concrete ChannelInitializer subclass
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

The initChannel method of the concrete ChannelInitializer is defined in the init method of ServerBootstrap, which is called while the NioServerSocketChannel is being initialized:

p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            /**
             * The handler here is the one passed to ServerBootstrap.handler, usually a LoggingHandler or not set at all
             */
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
            // In this case the initChannel(...) method will only be called after this method returns. Because
            // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
            // placed in front of the ServerBootstrapAcceptor.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });

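As covered earlier, this initChannel implementation is defined in the init method of ServerBootstrap, and its main job is to add the actual LoggingHandler and the ServerBootstrapAcceptor to the pipeline of the NioServerSocketChannel.
In the private initChannel method of ChannelInitializer, after the subclass's initChannel has run, the finally block calls remove, which takes the ChannelInitializer used for initialization out of the pipeline. Once initialization is complete, that ChannelInitializer handler is no longer needed.
At this point the pipeline holds four contexts: head context->LoggingHandler context->ServerBootstrapAcceptor context->tail context.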
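
The "add the real handlers, then remove yourself" behaviour can be shown with a toy pipeline. This is a sketch under assumptions: MiniPipeline is a hypothetical list of handler names, not DefaultChannelPipeline, and it ignores the fact that Netty adds the ServerBootstrapAcceptor from a task on the event loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the initializer removing itself after setup; not Netty code.
final class InitializerSketch {

    // Hypothetical miniature pipeline: handler names kept between "head" and "tail".
    static final class MiniPipeline {
        private final List<String> names = new ArrayList<String>(Arrays.asList("head", "tail"));

        void addLast(String name) {
            names.add(names.size() - 1, name);   // "last" means just before tail
        }

        void remove(String name) {
            names.remove(name);
        }

        List<String> names() {
            return new ArrayList<String>(names);
        }
    }

    static List<String> run() {
        MiniPipeline p = new MiniPipeline();
        p.addLast("ChannelInitializer");         // state after ServerBootstrap.init

        // What happens on registration: initChannel adds the real handlers,
        // then the finally block removes the initializer itself.
        try {
            p.addLast("LoggingHandler");
            p.addLast("ServerBootstrapAcceptor");
        } finally {
            p.remove("ChannelInitializer");
        }
        return p.names();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```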
(8) propagates the registered event through the pipeline of the NioServerSocketChannel. It starts with the static method invokeChannelRegistered(final AbstractChannelHandlerContext next) of AbstractChannelHandlerContext:

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

Initially next is the HeadContext. If the caller is on its event loop thread, the following method runs:

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}
private boolean invokeHandler() {
    // Store in local variable to reduce volatile reads.
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}

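At this point the handlerState of HeadContext is ADD_COMPLETE, so ((ChannelInboundHandler) handler()).channelRegistered(this) runs, which is the channelRegistered method of HeadContext. That method propagates the event onward with ctx.fireChannelRegistered(). The findContextInbound method walks from head toward tail looking for the next inbound handler context, so invokeChannelRegistered runs on the HeadContext and then on the LoggingHandler context, each calling channelRegistered on its handler. Nothing important happens in those calls, so they are not covered in detail. The code is as follows: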

@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound());
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

Next, the doBind0() method at (2):

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                /**
                 * Walk from the tail context toward the head context to find the first outbound context.
                 * The call ends up in the bind method of the head context, which finally calls bind on the JDK channel.
                 */
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

The call chain is AbstractBootstrap.doBind0->AbstractChannel.bind->pipeline.bind->(tail.bind, LoggingHandler context.bind, head.bind). The ServerBootstrapAcceptor context is inbound-only, so the outbound lookup skips it. The tail.bind method is as follows:

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }

    /**
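     * Walk from tail toward head to find the first outbound context.
     * head is outbound, LoggingHandler is both inbound and outbound, ChannelInitializer is inbound,
     * and tail is inbound. Each context.invokeBind call runs handler.bind -> context.bind. The context
     * of a user-defined handler is a DefaultChannelHandlerContext, whose bind method is inherited from
     * AbstractChannelHandlerContext, so the call comes back to this method and looks for the next
     * outbound context to run invokeBind on. In the end HeadContext.bind is called.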
     */
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}    
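
Both lookups, findContextInbound for inbound events and findContextOutbound for operations such as bind, are walks over a doubly linked list that skip contexts of the wrong kind. A minimal sketch, assuming the inbound and outbound flags described in this article for the server pipeline. Ctx and ContextChainSketch are hypothetical names, not Netty classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy doubly linked context chain; not Netty's AbstractChannelHandlerContext.
final class ContextChainSketch {

    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }

        // Inbound events travel head -> tail.
        Ctx findContextInbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }

        // Outbound operations travel tail -> head.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    // Flags as described in the article: head is outbound, tail is inbound.
    static Ctx[] serverPipeline() {
        Ctx[] chain = {
            new Ctx("head", false, true),
            new Ctx("LoggingHandler", true, true),
            new Ctx("ServerBootstrapAcceptor", true, false),
            new Ctx("tail", true, false)
        };
        for (int i = 0; i + 1 < chain.length; i++) {
            chain[i].next = chain[i + 1];
            chain[i + 1].prev = chain[i];
        }
        return chain;
    }

    // Contexts visited by an outbound operation that starts at tail.
    static List<String> outboundPath() {
        Ctx[] chain = serverPipeline();
        List<String> visited = new ArrayList<String>();
        Ctx ctx = chain[chain.length - 1];
        while (ctx != chain[0]) {
            ctx = ctx.findContextOutbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    // Contexts visited by an inbound event that starts at head.
    static List<String> inboundPath() {
        Ctx[] chain = serverPipeline();
        List<String> visited = new ArrayList<String>();
        Ctx ctx = chain[0];
        while (ctx != chain[chain.length - 1]) {
            ctx = ctx.findContextInbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("outbound: " + outboundPath());
        System.out.println("inbound:  " + inboundPath());
    }
}
```

In this model the outbound walk never stops at ServerBootstrapAcceptor, because that context is flagged inbound-only.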

The HeadContext.bind method is as follows:

@Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

It calls unsafe.bind. For NioServerSocketChannel the unsafe is a NioMessageUnsafe, and its bind method is inherited from AbstractUnsafe:

@Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();

        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }

        // See: https://github.com/netty/netty/issues/576
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
            !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
            // Warn a user about the fact that a non-root user can't receive a
            // broadcast packet on *nix if the socket is bound on non-wildcard address.
            logger.warn(
                    "A non-root user can't receive a broadcast packet if the socket " +
                    "is not bound to a wildcard address; binding to a non-wildcard " +
                    "address (" + localAddress + ") anyway as requested.");
        }

        boolean wasActive = isActive();
        try {
            doBind(localAddress); // (9)
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }

        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }

        safeSetSuccess(promise);
    }

(9) calls the doBind method of NioServerSocketChannel, which finally binds the JDK channel to the given port. From this point on, the server is listening for client connections:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

In the NioEventLoop that owns the NioServerSocketChannel, an accept event causes the unsafe.read() method of the NioServerSocketChannel to be called, that is, the read method of NioMessageUnsafe in AbstractNioMessageChannel:

private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }

        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        final ChannelPipeline pipeline = pipeline();
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                for (;;) {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    // stop reading and remove op
                    if (!config.isAutoRead()) {
                        break;
                    }

                    if (readBuf.size() >= maxMessagesPerRead) {
                        break;
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }
            setReadPending(false);
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                pipeline.fireChannelRead(readBuf.get(i));
            }

            readBuf.clear();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
}

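It calls the doReadMessages method of NioServerSocketChannel and then runs pipeline.fireChannelRead(readBuf.get(i)) for every element of the readBuf list, so a read event is fired on the pipeline for each element. The doReadMessages method shows what the elements of this list actually are: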

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
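
The null check on ch exists because accept() on a non-blocking ServerSocketChannel returns null when no connection is pending. The sketch below reproduces that with plain java.nio, assuming a loopback connection on an ephemeral port and a 5 second select timeout. AcceptSketch and its simplified doReadMessages are hypothetical, and the accepted channel is stored raw instead of being wrapped in a NioSocketChannel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Plain JDK NIO illustration of the accept step; not Netty code.
final class AcceptSketch {

    // Same shape as doReadMessages: accept once, add to buf, report how many were read.
    static int doReadMessages(ServerSocketChannel server, List<Object> buf) throws IOException {
        SocketChannel ch = server.accept();   // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    // Returns {messages read before a client connects, messages read after, buf size}.
    static int[] run() {
        List<Object> buf = new ArrayList<Object>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 100);
            server.register(selector, SelectionKey.OP_ACCEPT);

            int beforeClient = doReadMessages(server, buf);   // nobody has connected yet
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000);                        // wait until accept is ready
                int afterClient = doReadMessages(server, buf);
                for (Object accepted : buf) {
                    ((SocketChannel) accepted).close();
                }
                return new int[] {beforeClient, afterClient, buf.size()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("before=" + r[0] + " after=" + r[1] + " buffered=" + r[2]);
    }
}
```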

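The method accepts a SocketChannel, wraps it in a NioSocketChannel, and puts that into buf, so each NioSocketChannel is what gets passed to the pipeline's read event. Apart from the LoggingHandler, the only handler left in the pipeline of the NioServerSocketChannel is now ServerBootstrapAcceptor, so let us look at what its channelRead method does: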

@Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

This method takes the NioSocketChannel that was passed in, adds the user-supplied childHandler to that channel's pipeline, and then registers the NioSocketChannel with the worker NioEventLoopGroup.
Before the NioSocketChannel is registered, its pipeline is:
HeadContext->childHandler context (the context of the ChannelInitializer passed in through ServerBootstrap.childHandler)->TailContext.

The call chain of childGroup.register(child) is NioEventLoopGroup.register->MultithreadEventLoopGroup.register->SingleThreadEventLoop.register->NioByteUnsafe.register->AbstractUnsafe.register->AbstractUnsafe.register0. The AbstractUnsafe.register0 method is as follows:

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            /**
             * Register the channel with the selector
             */
            doRegister();
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            /**
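             * 1. For NioServerSocketChannel
             *      After the NioServerSocketChannel is first registered with the selector, the
             *      invokeHandlerAddedIfNeeded method of its pipeline runs.
             *      It wraps the LoggingHandler set through ServerBootstrap.handler and the
             *      ServerBootstrapAcceptor in DefaultChannelHandlerContext instances, adds them to the pipeline,
             *      and removes the ChannelInitializer used for initialization.
             *      What finally runs is the initChannel method of the ChannelInitializer implementation.
             * 2. For NioSocketChannel
             *      After the NioSocketChannel is first registered with the selector, the
             *      invokeHandlerAddedIfNeeded method of its pipeline runs.
             *      It wraps the business handlers set through ServerBootstrap.childHandler in
             *      DefaultChannelHandlerContext instances, adds them to the pipeline,
             *      and removes the ChannelInitializer used for initialization.
             *      What finally runs is the initChannel method of the ChannelInitializer implementation.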
             */
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            /**
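             * On the server side
             *      For NioServerSocketChannel, isActive is false at first registration because it is not bound yet
             *      For NioSocketChannel, first registration means the connection with the client is already established, so isActive is true
             * On the client side
             *      For NioSocketChannel, the connection to the server is not established yet at first registration, so isActive is false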
             */
            if (isActive()) {
                /**
                 * firstRegistration marks the first registration; the channel active event is only propagated on the first registration
                 */
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    /**
                     * Netty uses auto read by default, so a read is triggered once the channel is active
                     */
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

Unlike the NioServerSocketChannel, a NioSocketChannel runs pipeline.fireChannelActive() right after it is first registered with the selector. That call ends up in the channelActive method of HeadContext:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }
    
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

This leads to channel.read, where channel is the NioSocketChannel. The call chain is NioSocketChannel.read->pipeline.read->tail.read, and tail.read is as follows:

@Override
public ChannelHandlerContext read() {
    /**
     * Walking tail (inbound) -> ChannelInitializer (inbound) -> head (outbound), the first outbound context found is the HeadContext
     */
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }

    return this;
}

The read method of HeadContext is as follows:

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

It calls the beginRead method of AbstractUnsafe:

    @Override
    public final void beginRead() {
        assertEventLoop();

        if (!isActive()) {
            return;
        }

        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }

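This finally calls the doBeginRead method of AbstractNioChannel, which adds the read event to the interest set of the NioSocketChannel's selectionKey. From then on, the channel is notified when the client sends data: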

/**
 * Called when the channel becomes active and when a read completes.
 * Registers the events of interest: for NioServerSocketChannel, readInterestOp is the accept event;
 * for NioSocketChannel, readInterestOp is the read event.
 */
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    if (inputShutdown) {
        return;
    }

    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}