NettyServer Source Code Analysis
It may help to read the NettyServer startup flow analysis first.
Taking EchoServer as an example, the main method is as follows:
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
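Through the group method, ServerBootstrap aggregates a bossGroup and a workerGroup, both of which are NioEventLoopGroup instances.
Calling bind on ServerBootstrap actually runs the bind method of its parent class AbstractBootstrap, which in turn calls doBind: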
//AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
/**
* Create and initialize the channel, then register it with a selector
*/
final ChannelFuture regFuture = initAndRegister();//(1)
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);//(2)
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
The key code is at (1) and (2). We start with the initAndRegister method at (1):
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/**
* Create the channel object; on the server side this is a NioServerSocketChannel
*/
channel = channelFactory().newChannel(); // (3)
init(channel); // (4)
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
/**
* Register the channel with the boss EventLoopGroup:
* pick one EventLoop from the boss EventLoopGroup and register the channel with it -> SingleThreadEventLoop.register -> channel.unsafe().register
*
*/
ChannelFuture regFuture = group().register(channel); // (5)
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
(3) creates the channel. Because channel(NioServerSocketChannel.class) was set on the ServerBootstrap, the channel created here is a NioServerSocketChannel.
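To make the role of channel(NioServerSocketChannel.class) more concrete, here is a minimal sketch of a reflective factory: it remembers a class and instantiates it through its no-arg constructor on every call. ReflectiveFactorySketch, FakeChannel and FakeServerChannel are hypothetical names used only for illustration; Netty's own factory class is not reproduced here.

```java
import java.lang.reflect.Constructor;

// Hypothetical, simplified model of a "channel factory" that stores a class
// and creates a new instance through its no-arg constructor on every call.
final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> type) {
        try {
            this.constructor = type.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(type + " has no no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create an instance", e);
        }
    }

    // Stand-ins for a channel interface and a server channel implementation.
    interface FakeChannel {
        String kind();
    }

    static final class FakeServerChannel implements FakeChannel {
        FakeServerChannel() { }

        @Override
        public String kind() {
            return "server";
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<FakeChannel> factory =
                new ReflectiveFactorySketch<>(FakeServerChannel.class);
        System.out.println(factory.newInstance().kind());
    }
}
```

The point of the design is that the bootstrap only needs to keep a class token at configuration time; the actual channel object is created later, when bind is called.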
(4) initializes this NioServerSocketChannel by calling init(Channel channel) as implemented in the subclass ServerBootstrap:
/**
* Initializes the channel object created by the channel factory
*/
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
/**
* channel is a NioServerSocketChannel; pipeline is a field of its superclass AbstractChannel and is a DefaultChannelPipeline.
* A DefaultChannelPipeline contains a HeadContext and a TailContext
*/
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
/**
* childHandler is usually the ChannelInitializer the user passed in when configuring the ServerBootstrap
*/
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
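/**
* The NioServerSocketChannel pipeline initially holds no user handler; an initializing handler (ChannelInitializer) is added here.
* When the handler-added callback of this ChannelInitializer fires, initChannel runs: it adds the handler passed to ServerBootstrap.handler (a LoggingHandler, if one was set) and then a ServerBootstrapAcceptor, which registers each accepted NioSocketChannel with the worker event loop group.
*/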
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
/**
* handler here is the one passed to ServerBootstrap.handler, typically a LoggingHandler, or null if none was set
*/
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init mainly sets up the options, attributes and pipeline of the NioServerSocketChannel and captures the childHandler, which is the ChannelInitializer passed in by the user. While initializing the NioServerSocketChannel it adds a ChannelInitializer to the pipeline, and that initializer's initChannel method adds a ServerBootstrapAcceptor to the pipeline.
At this point the NioServerSocketChannel pipeline contains three contexts:
head->ChannelInitializer->tail. In Netty every handler is wrapped in a context: head is a HeadContext, tail is a TailContext, and user-defined handlers are wrapped in a DefaultChannelHandlerContext.
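The head -> ChannelInitializer -> tail layout can be pictured as a doubly linked list with two fixed sentinels. The sketch below is a simplified model under that assumption; PipelineSketch and Ctx are hypothetical names, and none of DefaultChannelPipeline's real bookkeeping (executors, handler state, name checks) is modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a pipeline: a doubly linked list of
// contexts with fixed head and tail sentinels.
final class PipelineSketch {
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");

    PipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // addLast links the new context immediately before the tail sentinel.
    PipelineSketch addLast(String name) {
        Ctx ctx = new Ctx(name);
        Ctx last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
        return this;
    }

    // Lists context names from head to tail.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            out.add(c.name);
        }
        return out;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch().addLast("ChannelInitializer");
        System.out.println(pipeline.names()); // [head, ChannelInitializer, tail]
    }
}
```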
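The group().register(channel) call at (5) registers the NioServerSocketChannel with a selector in bossGroup. A NioEventLoopGroup can aggregate several NioEventLoops, and register picks the next one in order (in this example bossGroup was created with a single thread, so there is only one to pick). The call ends up in SingleThreadEventLoop.register: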
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
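Picking "the next NioEventLoop in order" is a round-robin choice. The sketch below shows one common way to write such a chooser; RoundRobinChooserSketch is a hypothetical name, the loops are represented by plain strings, and this is not a copy of Netty's chooser.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser: each call to next() returns the following
// element and wraps around at the end of the array.
final class RoundRobinChooserSketch {
    private final AtomicInteger counter = new AtomicInteger();
    private final String[] loops;

    RoundRobinChooserSketch(String... loops) {
        if (loops.length == 0) {
            throw new IllegalArgumentException("at least one loop is required");
        }
        this.loops = loops.clone();
    }

    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return loops[Math.floorMod(counter.getAndIncrement(), loops.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch chooser = new RoundRobinChooserSketch("loop-0", "loop-1");
        System.out.println(chooser.next()); // loop-0
        System.out.println(chooser.next()); // loop-1
        System.out.println(chooser.next()); // loop-0
    }
}
```

With a group of size one, as with the bossGroup in the EchoServer example, every call returns the same loop.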
The work is done by channel.unsafe().register(this, promise). In Netty all core channel operations are implemented through unsafe, so every channel has a corresponding unsafe object. For NioServerSocketChannel the unsafe is a NioMessageUnsafe, and its register method is inherited from AbstractUnsafe:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
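The inEventLoop() branch above follows a general pattern: run the task inline when the caller is already on the loop thread, otherwise hand it to the loop's task queue. The following is a minimal sketch of that pattern using only the JDK; InEventLoopSketch is a hypothetical name and a single-thread executor stands in for a NioEventLoop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded "event loop": tasks run inline when the caller
// is already on the loop thread and are queued otherwise.
final class InEventLoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-loop");
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    static List<String> demo() {
        InEventLoopSketch loop = new InEventLoopSketch();
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        // Called from the caller's thread, so this task is queued.
        loop.runOnLoop(() -> {
            trace.add("register0 on " + Thread.currentThread().getName());
            // Called from the loop thread, so this runs inline right away.
            loop.runOnLoop(() -> trace.add("nested task ran inline"));
            trace.add("after nested");
        });
        loop.executor.shutdown();
        try {
            loop.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping every state change of a channel on one thread is what lets register0 run without locks.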
This eventually calls AbstractUnsafe.register0:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
/**
* Register the channel with the selector
*/
doRegister(); // (6)
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded(); // (7)
safeSetSuccess(promise);
pipeline.fireChannelRegistered(); // (8)
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
/**
* firstRegistration marks the first registration; the channel active event is only propagated on the first registration
*/
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
/**
* Netty uses auto read by default, so a read is triggered once the channel is active
*/
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
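The doRegister method at (6) is implemented in the parent class AbstractNioChannel. It calls the JDK NIO method that registers a channel with a selector: it takes the JDK SelectableChannel behind the NioServerSocketChannel and registers it with the selector of the NioEventLoop that owns this channel. The interest set passed at registration is 0, meaning no event is of interest yet: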
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
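/**
* Every channel is bound to one event loop, and every event loop owns one selector that handles the events of all channels managed by that event loop.
* This calls the JDK NIO method that registers a channel with a selector
*/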
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
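Registering with an interest set of 0 and an attachment can be tried directly against the JDK API. The sketch below uses only java.nio; RegisterWithZeroOpsSketch is a hypothetical name, and the string attachment merely stands in for the Netty channel object that doRegister passes as this.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// JDK-only sketch: register a server channel with a selector using an
// interest set of 0 and an attachment, then read both back from the key.
final class RegisterWithZeroOpsSketch {
    // Returns the interest ops reported by the key right after registration.
    static int demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // required before register()
            Object attachment = "netty-channel-stand-in";
            SelectionKey key = server.register(selector, 0, attachment);
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment was not stored on the key");
            }
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 0
    }
}
```

The attachment is what later allows the event loop to map a ready SelectionKey back to the channel object it belongs to.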
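The method at (7) runs a PendingHandlerAddedTask. This task was created when the initializing ChannelInitializer was added to the pipeline, and it executes ChannelInitializer.handlerAdded -> the private ChannelInitializer.initChannel(ChannelHandlerContext) method -> the initChannel method implemented by the ChannelInitializer subclass.
The code is as follows: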
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// suprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
initChannel(ctx);
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
// initChannel here is the one implemented by the concrete ChannelInitializer subclass
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
The initChannel method of the concrete ChannelInitializer implementation is the one defined inside ServerBootstrap.init when the NioServerSocketChannel was initialized:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
/**
* handler here is the one passed to ServerBootstrap.handler, typically a LoggingHandler, or null if none was set
*/
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
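As described earlier, this initChannel implementation is defined in ServerBootstrap.init, and its main job is to add the actual LoggingHandler and the ServerBootstrapAcceptor to the NioServerSocketChannel pipeline.
In the private ChannelInitializer.initChannel method, after the subclass's initChannel has run, the finally block calls remove, which takes the initializing ChannelInitializer out of the pipeline. Once initialization is complete, the ChannelInitializer handler is no longer needed.
At this point (once the task queued through ch.eventLoop().execute has run) the pipeline holds four contexts: head context->LoggingHandler context->ServerBootstrapAcceptor context->tail context.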
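The "run once, then remove yourself" behaviour of the initializer can be modeled in a few lines. This is a simplified sketch, not Netty's code: OneShotInitializerSketch is a hypothetical name, the pipeline is just a list of handler names, and the acceptor is added synchronously here although ServerBootstrap queues that step on the event loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of a one-shot initializer: putIfAbsent guards against
// re-entrance, and the finally block removes the initializer afterwards.
final class OneShotInitializerSketch {
    private final ConcurrentMap<Object, Boolean> initMap = new ConcurrentHashMap<>();

    // The pipeline is modeled as a mutable list of handler names.
    boolean initChannel(Object ctx, List<String> pipeline) {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // first call for this ctx
            try {
                pipeline.add("LoggingHandler");
                pipeline.add("ServerBootstrapAcceptor");
            } finally {
                pipeline.remove("ChannelInitializer");
            }
            return true;
        }
        return false;
    }

    static List<String> demo() {
        List<String> pipeline = new ArrayList<>();
        pipeline.add("ChannelInitializer");
        OneShotInitializerSketch initializer = new OneShotInitializerSketch();
        Object ctx = new Object();
        boolean first = initializer.initChannel(ctx, pipeline);
        boolean second = initializer.initChannel(ctx, pipeline);
        if (!first || second) {
            throw new IllegalStateException("initializer must run exactly once");
        }
        return pipeline;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [LoggingHandler, ServerBootstrapAcceptor]
    }
}
```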
(8) propagates the registered event through the NioServerSocketChannel pipeline. It first calls the static method invokeChannelRegistered(final AbstractChannelHandlerContext next) of AbstractChannelHandlerContext:
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
Initially next is the HeadContext. If the caller is on its event loop thread, the following method is executed:
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
private boolean invokeHandler() {
// Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
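At this point the handlerState of HeadContext is ADD_COMPLETE, so ((ChannelInboundHandler) handler()).channelRegistered(this) is executed, that is, HeadContext.channelRegistered, which propagates the event further with ctx.fireChannelRegistered(). findContextInbound walks from head toward tail looking for the next inbound handler context, so invokeChannelRegistered is executed on the HeadContext and then on the LoggingHandler context in turn, which calls channelRegistered on the corresponding handlers. Nothing important happens there, so it is not covered in detail: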
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
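The skipping behaviour of findContextInbound is easier to see on a toy chain. The sketch below models only the inbound flag and the next pointer; InboundPropagationSketch and Ctx are hypothetical names, and HypotheticalEncoder is a made-up outbound-only handler added purely to show a context being skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of inbound event propagation. Only the
// "skip contexts that are not inbound" rule is modeled.
final class InboundPropagationSketch {
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx next;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    // Same loop shape as findContextInbound: advance until an inbound context is found.
    static Ctx findContextInbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

    // Builds head -> HypotheticalEncoder -> LoggingHandler -> ServerBootstrapAcceptor -> tail
    // and returns the names of the contexts an inbound event reaches after head.
    static List<String> demo() {
        Ctx head = new Ctx("head", false);
        Ctx encoder = new Ctx("HypotheticalEncoder", false); // outbound-only, made up
        Ctx logging = new Ctx("LoggingHandler", true);
        Ctx acceptor = new Ctx("ServerBootstrapAcceptor", true);
        Ctx tail = new Ctx("tail", true);
        head.next = encoder;
        encoder.next = logging;
        logging.next = acceptor;
        acceptor.next = tail;

        List<String> visited = new ArrayList<>();
        for (Ctx ctx = head; ctx != tail; ) {
            ctx = findContextInbound(ctx);
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [LoggingHandler, ServerBootstrapAcceptor, tail]
    }
}
```

Because tail is always inbound, the do/while loop is guaranteed to stop there at the latest.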
Next we look at the doBind0() method at (2):
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
/**
* Walk from the tail context toward the head context to find the first outbound context; the call ends up in the head context's bind method, which finally performs the JDK channel bind
*/
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
The call chain is AbstractBootstrap.doBind0->AbstractChannel.bind->pipeline.bind->(tail.bind, LoggingHandler context.bind, head.bind). The ServerBootstrapAcceptor context is inbound-only, so findContextOutbound skips it. The tail.bind method is:
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (!validatePromise(promise, false)) {
// cancelled
return promise;
}
/**
* Walk from tail toward head to find the first outbound context.
* head is outbound, LoggingHandler is both inbound and outbound, ChannelInitializer is inbound, and tail is inbound. Each context.invokeBind call runs handler.bind->context.bind. The context of a user-defined handler is a DefaultChannelHandlerContext, whose bind method is inherited from AbstractChannelHandlerContext, so execution comes back to this method and continues with the next outbound context. In the end HeadContext.bind is called
*/
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
The HeadContext.bind method is:
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
It calls unsafe.bind. For NioServerSocketChannel the unsafe is a NioMessageUnsafe, and bind is inherited from AbstractUnsafe:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress); // (9)
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
(9) calls NioServerSocketChannel.doBind, which ultimately binds the JDK channel to the given port. From this point on the server is listening for client connections:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
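The JDK call behind doBind can be exercised on its own. The sketch below binds a ServerSocketChannel to an ephemeral loopback port with a backlog of 100 and reports the bound state before and after, which is the kind of transition the !wasActive && isActive() check in AbstractUnsafe.bind reacts to. JdkBindSketch is a hypothetical name, and treating socket().isBound() as the activity signal is an assumption made for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// JDK-only sketch: bind a server channel with an explicit backlog and observe
// the bound state before and after the call.
final class JdkBindSketch {
    // Returns {bound before bind, bound after bind}.
    static boolean[] demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            boolean before = server.socket().isBound();
            // Port 0 asks the OS for any free port; 100 is the accept backlog.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 100);
            boolean after = server.socket().isBound();
            return new boolean[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] state = demo();
        System.out.println(state[0] + " -> " + state[1]); // false -> true
    }
}
```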
In the NioEventLoop that owns the NioServerSocketChannel, when an accept event occurs, the NioServerSocketChannel's unsafe.read() method is called, that is, the read method of NioMessageUnsafe in AbstractNioMessageChannel:
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// stop reading and remove op
if (!config.isAutoRead()) {
break;
}
if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}
}
It calls NioServerSocketChannel.doReadMessages and afterwards runs pipeline.fireChannelRead(readBuf.get(i)) for every element of the readBuf list, firing a read event on the pipeline for each one. The doReadMessages method shows what this list actually holds:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
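This method accepts a JDK SocketChannel, wraps it in a NioSocketChannel and puts it into buf, so each NioSocketChannel is passed into the pipeline's read event. Apart from the optional LoggingHandler, the only handler left in the NioServerSocketChannel pipeline is the ServerBootstrapAcceptor, so let us look at its channelRead method: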
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
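This method takes the NioSocketChannel that was passed along, adds the user-supplied childHandler to its pipeline, applies the child options and attributes, and then registers the NioSocketChannel with the worker NioEventLoopGroup.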
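Stripped of Netty's abstractions, the hand-off amounts to accepting a connection on one side and registering the accepted channel with a different selector. The following JDK-only sketch shows that idea under simplifying assumptions: AcceptHandOffSketch is a hypothetical name, the accept is blocking for brevity, and a second Selector stands in for a worker event loop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// JDK-only sketch: accept a connection and register the accepted channel
// with a separate "worker" selector.
final class AcceptHandOffSketch {
    // Returns true if the accepted channel ended up registered with the worker selector.
    static boolean demo() {
        try (Selector workerSelector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel child = server.accept()) { // blocking accept for brevity
                child.configureBlocking(false); // required before register()
                SelectionKey key = child.register(workerSelector, 0);
                return client.isConnected()
                        && key.isValid()
                        && key.selector() == workerSelector;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```

Separating the accepting selector from the selectors that serve established connections keeps slow connection handling from delaying new accepts.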
Before this NioSocketChannel is registered, its pipeline is:
HeadContext->childHandler context (the context of the ChannelInitializer passed through ServerBootstrap.childHandler)->TailContext.
The call chain of childGroup.register(child) is NioEventLoopGroup.register->MultithreadEventLoopGroup.register->SingleThreadEventLoop.register->NioByteUnsafe.register->AbstractUnsafe.register->AbstractUnsafe.register0. The AbstractUnsafe.register0 method is:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
/**
* Register the channel with the selector
*/
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
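/**
* 1. For NioServerSocketChannel:
* after the NioServerSocketChannel is first registered with the selector, invokeHandlerAddedIfNeeded is called on its pipeline.
* This wraps the LoggingHandler set through ServerBootstrap.handler and the ServerBootstrapAcceptor in DefaultChannelHandlerContext instances and adds them to the pipeline,
* and removes the ChannelInitializer that was used for initialization.
* What finally runs is the initChannel method of the ChannelInitializer implementation.
* 2. For NioSocketChannel:
* after the NioSocketChannel is first registered with the selector, invokeHandlerAddedIfNeeded is called on its pipeline.
* This wraps the business handlers set through ServerBootstrap.childHandler in DefaultChannelHandlerContext instances and adds them to the pipeline,
* and removes the ChannelInitializer that was used for initialization.
* What finally runs is the initChannel method of the ChannelInitializer implementation.
*/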
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
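/**
* On the server side:
* for NioServerSocketChannel, the channel is not yet bound at first registration, so isActive is false;
* for NioSocketChannel, first registration means the connection with the client is already established, so isActive is true.
* On the client side:
* for NioSocketChannel, the connection to the server is not yet established at first registration, so isActive is false
*/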
if (isActive()) {
/**
* firstRegistration marks the first registration; the channel active event is only propagated on the first registration
*/
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
/**
* Netty uses auto read by default, so a read is triggered once the channel is active
*/
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
Unlike NioServerSocketChannel, a NioSocketChannel is already active when it is first registered with the selector, so pipeline.fireChannelActive() is executed. This eventually calls HeadContext.channelActive:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
This leads to channel.read. The channel is the NioSocketChannel, and the call chain is NioSocketChannel.read->pipeline.read->tail.read. The tail.read method is:
@Override
public ChannelHandlerContext read() {
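/**
* Walking tail (inbound)->user handler (inbound; by now the ChannelInitializer has replaced itself with EchoServerHandler)->head (outbound), the first outbound context found is HeadContext
*/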
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}
The HeadContext.read method is:
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
It calls AbstractUnsafe.beginRead:
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
This finally calls AbstractNioChannel.doBeginRead, which adds the read event to the interest set of this NioSocketChannel's selectionKey, so that data sent by the client can be detected:
/**
* Called when the channel becomes active and when a read completes.
* Registers the event of interest: for NioServerSocketChannel readInterestOp is the accept event, for NioSocketChannel readInterestOp is the read event
*/
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
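The last three lines of doBeginRead are a plain bitmask update: the read interest bit is OR-ed in only if it is not already set, and all other bits are left untouched. Here is a small sketch of that arithmetic using the JDK's SelectionKey constants; InterestOpsSketch and enableRead are hypothetical names.

```java
import java.nio.channels.SelectionKey;

// Sketch of the interest-set arithmetic: add readInterestOp to the current
// interest ops without disturbing bits that are already set.
final class InterestOpsSketch {
    static int enableRead(int interestOps, int readInterestOp) {
        if ((interestOps & readInterestOp) == 0) {
            return interestOps | readInterestOp;
        }
        return interestOps; // already interested, nothing to change
    }

    public static void main(String[] args) {
        // Server channel: registered with 0, then accept interest is enabled.
        System.out.println(enableRead(0, SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT);
        // Client channel with a pending write: the write bit is preserved.
        int ops = enableRead(SelectionKey.OP_WRITE, SelectionKey.OP_READ);
        System.out.println(ops == (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
        // Calling it again changes nothing.
        System.out.println(enableRead(ops, SelectionKey.OP_READ) == ops);
    }
}
```

This is why registering with an interest set of 0 in doRegister is harmless: the real interest is switched on later, once the channel is active.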