Reading the Netty Source: The Server-Side Startup Process

2018-12-27  曾泽浩

This is my personal understanding. If you find any mistakes, please point them out.

Here is a simple example. It will not run as-is, because a few classes (such as EchoServerHandler) are omitted.

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

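Before analyzing the source, it helps to pose a few questions. Reading with questions in mind gives the reading a purpose, and finding the answers feels like coming out of a maze into the open.

  1. First, Netty is a layer over the JDK's native NIO. How does Netty wrap native NIO?
  2. Second, how does the server keep handling client requests?
  3. Third, how do bossGroup and workerGroup divide the work?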

The entry point is ServerBootstrap's bind()

AbstractBootstrap#bind()

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

AbstractBootstrap#doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    // Initialize and register.
    // The key call here is initAndRegister().
    final ChannelFuture regFuture = initAndRegister();
    // Get the Channel.
    final Channel channel = regFuture.channel();
    // A non-null cause() means registration failed.
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // Registration has already completed (and did not fail).
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

AbstractBootstrap#initAndRegister()

final ChannelFuture initAndRegister() {
    // For ServerBootstrap, this is a NioServerSocketChannel.
    Channel channel = null;
    try {
        // Instantiate the Channel.
        channel = channelFactory.newChannel();
        // Initialize it.
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register the Channel.
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}

One line in the code above is critical:

// Instantiate the Channel.
channel = channelFactory.newChannel();

Start with channelFactory. It is a factory for Channels, so when is it assigned?

AbstractBootstrap#channel()

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

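This method is called when we specify the Channel type. At server startup, the argument passed in is NioServerSocketChannel.class.

ReflectiveChannelFactory is a reflection-based Channel factory: it creates the Channel from the Class it was given, by calling that class's public no-argument constructor. It is fairly easy to follow.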

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}
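The reflective-factory idea can be tried without Netty on the classpath. The following is a minimal sketch using only JDK types; ReflectiveFactorySketch is a hypothetical stand-in written for this article, not a Netty class.

```java
// Hypothetical stand-in for ReflectiveChannelFactory, using only JDK types.
final class ReflectiveFactorySketch<T> {

    private final Class<? extends T> clazz;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    // Requires a public no-argument constructor on the target class.
    T newInstance() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<CharSequence> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        // Every call creates a fresh instance through reflection.
        CharSequence first = factory.newInstance();
        CharSequence second = factory.newInstance();
        System.out.println(first.getClass().getSimpleName());
        System.out.println(first != second);
    }
}
```

Each call to newInstance() builds a new object, which is why one factory is enough for a bootstrap to create its channel when bind() is called.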

Now back to the channelFactory() method:

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }
    // Assign the ChannelFactory.
    this.channelFactory = channelFactory;
    return self();
}

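That completes the assignment of channelFactory, and this channelFactory is what creates the NioServerSocketChannel instance.

Now that we know how channelFactory is assigned and what it is for, let's go back.

// Instantiate the Channel.
channel = channelFactory.newChannel();

This line creates a NioServerSocketChannel instance, which naturally invokes its constructor. So next, look at the NioServerSocketChannel constructors.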

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
 * Creates a new ServerSocketChannel.
 * @param provider the SelectorProvider used to open the channel
 * @return the newly opened ServerSocketChannel
 */
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

This step is the same as with plain JDK NIO: a ServerSocketChannel has to be created.

public NioServerSocketChannel(ServerSocketChannel channel) {
    // Interested in client connections (OP_ACCEPT).
    super(null, channel, SelectionKey.OP_ACCEPT);
    // Channel configuration.
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

Next, the parent class constructors.

The AbstractNioMessageChannel constructor:

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

And one level further up, the AbstractNioChannel constructor:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
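    // parent is null here.
    super(parent);
    // ch is the JDK ServerSocketChannel.
    this.ch = ch;
    // readInterestOp is the SelectionKey.OP_ACCEPT passed up from the subclass;
    // it means this channel is interested in incoming client connections.
    this.readInterestOp = readInterestOp;
    try {
        // Switch to non-blocking mode.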
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

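To summarize: channelFactory.newChannel() creates a ServerSocketChannel (the very same ServerSocketChannel that JDK NIO provides), switches it to non-blocking mode, and records SelectionKey.OP_ACCEPT as the event it intends to listen for. Judging from the constructors, however, nothing has been registered with a selector yet.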
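The state of the underlying JDK channel right after construction can be checked with plain NIO. This is a minimal sketch that mirrors the two calls seen above (opening through the SelectorProvider, then configureBlocking(false)); OpenServerChannelSketch is a hypothetical helper class, not part of Netty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical helper: repeats what the Netty constructors do to the JDK channel.
final class OpenServerChannelSketch {

    static ServerSocketChannel open() {
        try {
            // Same call as newSocket(...) in NioServerSocketChannel.
            ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel();
            // Same call as in the AbstractNioChannel constructor.
            ch.configureBlocking(false);
            return ch;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel ch = open()) {
            System.out.println("blocking=" + ch.isBlocking());
            System.out.println("registered=" + ch.isRegistered());
            System.out.println("bound=" + ch.socket().isBound());
        }
    }
}
```

At this point the channel is open and non-blocking, but it is neither registered with a selector nor bound to a port. Those two steps are what the rest of the startup path performs.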

Now go back to AbstractBootstrap#initAndRegister() and look at this call:

init(channel);

It initializes the channel. init() is an abstract method implemented by subclasses.

The subclass in this case is ServerBootstrap.

@Override
void init(Channel channel) throws Exception {
    // All configured options
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // All configured attributes
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    // The channel's pipeline
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

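    // Add a handler to the pipeline.
    // Its initChannel() does not run immediately:
    // addLast() checks whether the Channel has been registered yet,
    // and if it has not, the callback is deferred until registration.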
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // The handler set via ServerBootstrap.handler(...)
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // Add a ServerBootstrapAcceptor,
            // which handles newly accepted client connections.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

The method above introduces the ServerBootstrapAcceptor handler class. We will come back to it later.
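The deferred addLast() behavior described in the comments of init() can be illustrated with a toy model. This is only a sketch of the idea, assuming a single-threaded caller; DeferredInitSketch and its methods are hypothetical names, and the real logic lives in DefaultChannelPipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a pipeline that delays handlerAdded callbacks until registration.
final class DeferredInitSketch {

    private final List<Runnable> pending = new ArrayList<>();
    private final List<String> log = new ArrayList<>();
    private boolean registered;

    void addLast(String handlerName) {
        Runnable callback = () -> log.add("handlerAdded:" + handlerName);
        if (registered) {
            callback.run();          // already registered: call back right away
        } else {
            pending.add(callback);   // not registered yet: remember it for later
        }
    }

    // Same order as register0(): pending callbacks first, then channelRegistered.
    void register() {
        registered = true;
        for (Runnable callback : pending) {
            callback.run();
        }
        pending.clear();
        log.add("channelRegistered");
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        DeferredInitSketch pipeline = new DeferredInitSketch();
        pipeline.addLast("initializer");
        System.out.println(pipeline.log());   // nothing has run yet
        pipeline.register();
        System.out.println(pipeline.log());
    }
}
```

In the sketch, a handler added before registration produces no callback until register() is called, which matches why the ChannelInitializer in init() only takes effect once the channel is registered.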

Now back to initAndRegister() once more:

final ChannelFuture initAndRegister() {
    // For ServerBootstrap, this is a NioServerSocketChannel.
    Channel channel = null;
    try {
        // Instantiate the Channel.
        channel = channelFactory.newChannel();
        // Initialize it.
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register the Channel.
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

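Execution now reaches this step:

// Register the Channel.
ChannelFuture regFuture = config().group().register(channel);

config().group() is simply the bossGroup passed in at the start. Now look at the register() method.

The call ends up in SingleThreadEventLoop's register() method: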

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

Finally, the call reaches register() in the AbstractUnsafe class:

// Register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // The EventLoop must not be null.
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    // Fail if the channel has already been registered.
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    // For NioServerSocketChannel, this is the point where eventLoop is assigned:
    // it binds this channel to its EventLoop.
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // execute() queues the task; if the event loop's thread has not
            // been started yet, this call also starts it.
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // Continue reading from here.
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

register0()

// Register
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        // Whether this is the first registration
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        // Mark as registered
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        
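        // This step runs the handlerAdded callbacks for handlers that were passed to
        // addLast() earlier but could not be fully added before registration.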
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);

        // Fire channelRegistered: the Channel is now registered.
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // Not entered during server startup: the channel is not bound yet, so isActive() is false.
        if (isActive()) {
            // If this is the first registration
            if (firstRegistration) {
                // Fire the channelActive callback.
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

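The code above registers the Channel. This is also where a thread gets started: eventLoop here is a NioEventLoop (one of the event loops inside the bossGroup NioEventLoopGroup), and the first call to execute() starts its thread, which then runs NioEventLoop's run() method.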
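The "start the thread on the first execute()" behavior can be sketched with JDK concurrency classes. This is a simplified model, not Netty's implementation: LazyEventLoopSketch and all of its members are hypothetical, and the real logic (in SingleThreadEventExecutor) also handles shutdown, rejection, and wake-ups.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified single-threaded event loop whose thread starts lazily.
final class LazyEventLoopSketch {

    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        tasks.add(task);
        // Only a caller outside the loop can be the one that starts the thread.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "sketch-event-loop");
            t.setDaemon(true);
            thread = t;
            t.start();
        }
    }

    // The loop body: take tasks one by one and run them on the loop thread.
    private void run() {
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits a task and reports whether it observed inEventLoop() == true.
    static boolean runsOnLoopThread(LazyEventLoopSketch loop) {
        CountDownLatch done = new CountDownLatch(1);
        boolean[] seen = new boolean[1];
        loop.execute(() -> {
            seen[0] = loop.inEventLoop();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

In register(), the calling thread is the main thread, so inEventLoop() is false and the registration task goes through execute(). In this sketch, as in the walkthrough above, that first execute() is what brings the loop thread to life.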

/**
 * This method is executed when the event loop thread starts.
 */
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // Process the selected keys.
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // Run all queued tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    // Process the selected keys.
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

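I won't go through run() in detail here. It deserves a closer look another time.

run() mainly does three things:

  1. Poll for I/O events on all channels registered with the selector

    select();

  2. Process the channels that have pending I/O events

    processSelectedKeys();

  3. Run all queued tasks

    runAllTasks();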
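The time budget handed to runAllTasks(...) in the else branch of run() is plain arithmetic and can be worked through in isolation. This is a small sketch of that one expression; IoRatioSketch is a hypothetical helper, and the remark about the default value is from my recollection of Netty 4.1 rather than from the listing above.

```java
// Worked example of: ioTime * (100 - ioRatio) / ioRatio
final class IoRatioSketch {

    // ioRatio is the percentage of loop time meant for I/O; 100 is handled by a
    // separate branch in run(), so this helper only accepts 1..99.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 1..99)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000; // nanoseconds spent in processSelectedKeys()
        System.out.println(taskBudgetNanos(ioTime, 50)); // 1000
        System.out.println(taskBudgetNanos(ioTime, 80)); // 250
        System.out.println(taskBudgetNanos(ioTime, 20)); // 4000
    }
}
```

With ioRatio at 50 (which I believe is the default), tasks get as much time as the I/O processing just took. A higher ioRatio shrinks the task budget, and a lower one grows it.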

That completes the analysis of initAndRegister(). Now let's return to the doBind() method.

private ChannelFuture doBind(final SocketAddress localAddress) {
    // Initialize and register.
    final ChannelFuture regFuture = initAndRegister();
    // Get the Channel.
    final Channel channel = regFuture.channel();
    // A non-null cause() means registration failed.
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // Registration has already completed (and did not fail).
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

Next, the doBind0() method:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    //
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            //
            if (regFuture.isSuccess()) {
                // Bind, and add a listener that closes the channel on failure.
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

This eventually calls bind() in the AbstractUnsafe class. In fact, all network I/O operations are ultimately carried out by the Unsafe classes.

// Bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {

    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    // Remember whether the channel was already active before binding.
    boolean wasActive = isActive();
    try {
        // Perform the actual bind.
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

Next comes NioServerSocketChannel's doBind() method, which performs the bind on the underlying JDK ServerSocketChannel.
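What the bind step changes can be observed with plain JDK NIO. The sketch below assumes that "active" for a server channel means "open and bound", which is how I recall NioServerSocketChannel.isActive() being written; treat that as an assumption. BindSketch is a hypothetical helper, and it binds to an ephemeral loopback port rather than the 8007 of the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Shows the wasActive / isActive transition around a JDK-level bind.
final class BindSketch {

    // Assumed definition of "active" for a server channel: open and bound.
    static boolean isActive(ServerSocketChannel ch) {
        return ch.isOpen() && ch.socket().isBound();
    }

    // Returns { activeBeforeBind, activeAfterBind }.
    static boolean[] bindAndReport(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            boolean wasActive = isActive(ch);
            // Port 0 asks the OS for any free port.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return new boolean[] { wasActive, isActive(ch) };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = bindAndReport(100);
        System.out.println("wasActive=" + result[0] + ", isActive=" + result[1]);
    }
}
```

This is the transition that the check !wasActive && isActive() in bind() looks for before scheduling fireChannelActive(). The backlog argument plays the role of the SO_BACKLOG option set in the EchoServer example.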

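The bind() method above also contains the call pipeline.fireChannelActive(), which notifies the pipeline that the channel is now active.

Let's trace into it.

DefaultChannelPipeline's fireChannelActive(), followed by the two invokeChannelActive() methods of AbstractChannelHandlerContext that it reaches: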

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}
private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}

channelActive() of HeadContext (an inner class of DefaultChannelPipeline)

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

After tracing this far, we find something useful:

readIfIsAutoRead();

Let's step into it.

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

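Auto-read is enabled by default. Once again the channel invokes a network I/O operation, so in the end the work is handed to the Unsafe class.

Sure enough, the call goes through DefaultChannelPipeline's read() method

and then reaches the beginRead() method of AbstractUnsafe: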

@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // Propagate the exception through the pipeline.
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

That in turn calls AbstractNioChannel's doBeginRead() method:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // Zero means the read interest has not been set on the key yet.
    if ((interestOps & readInterestOp) == 0) {
        // Start listening for the event.
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

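Remember readInterestOp? At the very beginning, when the NioServerSocketChannel instance was created, SelectionKey.OP_ACCEPT was passed in. So from this point on, the channel listens for client connection events.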

selectionKey.interestOps(interestOps | readInterestOp);
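The two-phase pattern (register first, set the interest later) also works with plain JDK NIO. The sketch below assumes the channel is first registered with an interest set of 0, which is what I recall Netty's doRegister() doing, although that method is not shown in this article. InterestOpsSketch is a hypothetical helper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Registers with no interest, then ORs in OP_ACCEPT like doBeginRead() does.
final class InterestOpsSketch {

    // Returns { interestOpsAfterRegister, interestOpsAfterBeginRead }.
    static int[] registerThenBeginRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            int readInterestOp = SelectionKey.OP_ACCEPT;

            // Phase 1: register with the selector, interested in nothing yet.
            SelectionKey key = ch.register(selector, 0);
            int afterRegister = key.interestOps();

            // Phase 2: same check-and-set as in doBeginRead().
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] { afterRegister, key.interestOps() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenBeginRead();
        System.out.println("after register: " + ops[0]);
        System.out.println("after beginRead: " + ops[1]);
    }
}
```

Using a bitwise OR keeps any interest bits that are already set, so the same doBeginRead() code works for a server channel (OP_ACCEPT) and for a client channel (OP_READ).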

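To summarize: the analysis above covers every step of using native NIO, namely creating the ServerSocketChannel, binding the port, registering interest in events, polling for I/O events, and processing I/O events.

Remember the ServerBootstrapAcceptor class mentioned earlier? Its channelRead() method is where the accepted connection is handed from bossGroup to workerGroup: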

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // The Channel here is a NioSocketChannel (the accepted client connection).
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // childGroup is workerGroup:
        // the accepted channel is handed over to workerGroup here.
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
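The boss/worker split can be reproduced in miniature with two plain JDK selectors. This is a rough sketch under simplifying assumptions (one thread, one loopback connection, a 5 second select timeout chosen arbitrarily); AcceptHandOffSketch is a hypothetical class and leaves out everything Netty adds on top, such as pipelines, options, and event loop threads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// "boss" selector accepts; the accepted child is registered with "worker".
final class AcceptHandOffSketch {

    // Returns how many channels ended up registered with the worker selector.
    static int acceptAndHandOff() {
        try (Selector boss = Selector.open();
             Selector worker = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(boss, SelectionKey.OP_ACCEPT);

            // A blocking client connect on loopback stands in for a real client.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                boss.select(5000);
                for (SelectionKey key : boss.selectedKeys()) {
                    if (!key.isAcceptable()) {
                        continue;
                    }
                    SocketChannel child = ((ServerSocketChannel) key.channel()).accept();
                    if (child != null) {
                        // The hand-off: the child is watched by the worker, not the boss.
                        child.configureBlocking(false);
                        child.register(worker, SelectionKey.OP_READ);
                    }
                }
                boss.selectedKeys().clear();

                int registered = worker.keys().size();
                for (SelectionKey key : worker.keys()) {
                    key.channel().close();
                }
                return registered;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("channels handed to worker: " + acceptAndHandOff());
    }
}
```

The boss selector only ever sees OP_ACCEPT on the listening channel, while every accepted connection is watched for OP_READ by the worker. That is the same division of labor that childGroup.register(child) sets up in ServerBootstrapAcceptor.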

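Hmm, I feel I did not explain this very well, and some details were left out.

Sometimes I understand the code but cannot explain it clearly. I need to keep working at it.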
