Netty Source Code Analysis Series--10. Channel Registration to Event

2018-11-06  ted005

Registering a Channel with the EventLoop

The previous article showed that when the server-side ServerBootstrap binds a port, one key method is initAndRegister. Once the Channel has been initialized, it is registered:

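ChannelFuture regFuture = config().group().register(channel);

Here group() returns the EventLoopGroup (a NioEventLoopGroup on the NIO transport), whose register is implemented in its parent class MultithreadEventLoopGroup:
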
@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
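
The delegation above (the group picks one loop via next(), then hands the channel to it) can be sketched without Netty. The classes below are hypothetical stand-ins, not Netty types, and the round-robin selection in next() is an assumption made for illustration; the snippet above only shows that next() delegates to super.next().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class GroupRegisterSketch {
    // Hypothetical stand-in for a single EventLoop.
    static final class SketchLoop {
        final String name;
        final List<String> channels = new ArrayList<>();

        SketchLoop(String name) {
            this.name = name;
        }

        // Plays the role of EventLoop.register(channel): the loop now owns the channel.
        String register(String channel) {
            channels.add(channel);
            return name;
        }
    }

    // Hypothetical stand-in for an EventLoopGroup.
    static final class SketchGroup {
        private final SketchLoop[] loops;
        private final AtomicInteger index = new AtomicInteger();

        SketchGroup(int size) {
            loops = new SketchLoop[size];
            for (int i = 0; i < size; i++) {
                loops[i] = new SketchLoop("loop-" + i);
            }
        }

        // Assumption: plain round-robin choice of the next loop.
        SketchLoop next() {
            return loops[Math.abs(index.getAndIncrement() % loops.length)];
        }

        // Same shape as the register method above: pick a loop, then delegate.
        String register(String channel) {
            return next().register(channel);
        }
    }

    public static void main(String[] args) {
        SketchGroup group = new SketchGroup(2);
        System.out.println(group.register("ch-a")); // loop-0
        System.out.println(group.register("ch-b")); // loop-1
        System.out.println(group.register("ch-c")); // loop-0
    }
}
```

Under this sketch each channel ends up on exactly one loop, which is the property the rest of the article relies on.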

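next() returns one EventLoop from the group (a NioEventLoop here), whose register is implemented in its parent class SingleThreadEventLoop: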

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

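As shown, the Channel is wrapped in a ChannelPromise, which holds both the Channel and the EventLoop.
The call finally reaches the register method of AbstractUnsafe, which delegates to register0: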

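private void register0(ChannelPromise promise) {
    try {
        // Abort if the promise was cancelled or the channel was closed in the meantime.
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;

        // 1. Register the underlying JDK channel with the Selector.
        doRegister();
        neverRegistered = false;
        registered = true;

        // 2. Run pending handlerAdded(...) callbacks before the promise is notified.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);

        // 3. Fire channelRegistered through the pipeline.
        pipeline.fireChannelRegistered();

        if (isActive()) {
            if (firstRegistration) {
                // 4. Fire channelActive only on the first registration.
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // Re-registered channel with autoRead enabled: start reading again.
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}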
  1. doRegister() is an empty method in AbstractChannel (the outer class of AbstractUnsafe); the actual implementation is in the subclass AbstractNioChannel:
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // java NIO
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
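  2. Once registration is complete, the Handler callbacks are invoked in order (steps 2 to 4 marked in register0 above): handlerAdded, channelRegistered and, if the Channel is already active on its first registration, channelActive.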
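
The JDK call at the heart of doRegister() can be tried on its own. The following is a minimal sketch using only java.nio, not Netty: it registers a channel with an interest set of 0 and an attachment, as the code above does, and then enables OP_ACCEPT in a separate step. The class and method names are hypothetical, and enabling the interest op right away is a simplification for the demo; in Netty that happens later, once the channel starts reading.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class NioRegisterSketch {
    // Returns {interestOps right after register, interestOps after enabling accept}.
    static int[] registerThenEnableAccept(Object attachment) {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a Selector.
            channel.configureBlocking(false);

            // Interest set 0: the key exists, but no event is selected for yet.
            // The attachment lets the selector loop find its owner object again.
            SelectionKey key = channel.register(selector, 0, attachment);
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment was not stored on the key");
            }
            int before = key.interestOps();

            // Interest in accept events is switched on separately.
            key.interestOps(SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept("owner-object");
        System.out.println("after register: " + ops[0]);      // 0
        System.out.println("after enabling accept: " + ops[1]); // 16 (SelectionKey.OP_ACCEPT)
    }
}
```

Registering with 0 first separates "this channel belongs to this Selector" from "this channel wants events", which matches the split between registration and the later callbacks described above.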

The relationship between Channel and EventLoop

The following code, from AbstractUnsafe.register, runs when a Channel is registered:

if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

When inEventLoop() returns true, register0 is called directly; otherwise it is submitted to the EventLoop as a task to be executed later.

inEventLoop() is implemented in SingleThreadEventExecutor, an ancestor class of NioEventLoop:

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

AbstractEventExecutor, an ancestor class of SingleThreadEventExecutor, passes in the current thread:

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

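In summary, inEventLoop() simply checks whether the current thread is the Thread object held by the NioEventLoop (through SingleThreadEventExecutor). If it is not, the task is submitted to the NioEventLoop so that the held Thread executes it later.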
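
The "run inline if already on the loop thread, otherwise enqueue" pattern can be reproduced with the JDK alone. The sketch below is a hypothetical TinyLoop built on a single-thread executor; it is not how Netty implements its event loop, only an illustration of the thread-identity check and the two branches.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InEventLoopSketch {
    // Hypothetical minimal loop: one worker thread plus the executor's task queue.
    static final class TinyLoop {
        private volatile Thread thread; // null until the worker thread is created
        private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "tiny-loop");
            t.setDaemon(true);
            this.thread = t; // remember the one thread that runs all tasks
            return t;
        });

        // Same idea as inEventLoop(): compare the caller's thread with the held thread.
        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        // Same branch as in the registration code: run now or hand over to the loop thread.
        void runInLoop(Runnable task) {
            if (inEventLoop()) {
                task.run();
            } else {
                worker.execute(task);
            }
        }

        void shutdown() {
            worker.shutdown();
        }
    }

    // Records which thread ran each step and in what order.
    static List<String> demo() {
        TinyLoop loop = new TinyLoop();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Called from the caller's thread: not in the loop, so the task is queued.
        loop.runInLoop(() -> {
            seen.add("outer on " + Thread.currentThread().getName());
            // Called from the loop thread: already in the loop, so this runs inline.
            loop.runInLoop(() -> seen.add("nested on " + Thread.currentThread().getName()));
            seen.add("outer finished");
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        // [outer on tiny-loop, nested on tiny-loop, outer finished]
        System.out.println(demo());
    }
}
```

The nested task appears before "outer finished" because it ran inline on the loop thread; had it been queued, it would have run only after the outer task returned. Either way, all work touching the loop's state stays on one thread.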
