1_Netty源码分析之Netty服务端启动
本文均为原创,如需转载请注明出处。
[TOC]
Netty服务端创建流程分析
Netty为了向使用者屏蔽NIO
通信的底层细节,在和用户交互的边界做了封装,母的就是为了减少用户开发工作量,降低开发难度。Bootstrap
是Socket
客户端创建工具类,用户听过Bootstrap
可以方柏霓地创建Netty
地客户端并发起异步TCP
连接操作。
Netty服务端--Channel的创建
首先基于NIO
的学习,思考两个问题
-
服务端的
Socket
在哪里初始化? -
在哪里accept连接?
研究服务端是如何创建的,查看源码,首先应该从源头出发,直接调用的方法开始层层深入。
Netty服务端的入口bind()方法
服务端创建的入口bind()
方法
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
进入bind()
方法发现会执行一个dobind()
方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
该方法调用了一个initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
很明显可以看出。这里会创建一个底层的channel
,调用Netty
维护的一个工厂类,下面的init()
方法是一个抽象类,可以看看那些方法继承并实现了它呢?😊
所以可想而知!在调用bind()
方法后,Netty
会调用JDK
底层初始化一个Channel
,回过头来看看channelFactory.newChannel()
的实现。
在ChannelFactory
下有一个利用反射的实现子类ReflectiveChannelFactory()
,从名字就可以看得出来,是一个利用反射来初始化channel
的工厂类。该类下的newChannel()
方法:
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
确实是利用了返回来进行实例化的。那么它是如何知道实话化那个对象呢?
让我们回到上一层开始出现channelFactory.newChannel()
,在这里可以很清楚的看出来,当前类AbstractBootstrap
自身维护着该工厂对象,并且在构造函数中给该工厂类所需要的对象进行了赋值。
AbstractBootstrap() {
// Disallow extending from a different package.
}
AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
group = bootstrap.group;
// 那么这里的bootstrap对象又是从哪里得到的呢?
channelFactory = bootstrap.channelFactory;
handler = bootstrap.handler;
localAddress = bootstrap.localAddress;
synchronized (bootstrap.options) {
options.putAll(bootstrap.options);
}
synchronized (bootstrap.attrs) {
attrs.putAll(bootstrap.attrs);
}
}
这里的AbstractBootstrap<B, C> bootstrap
很直接的就可以想到我们在最外层定义的AbstractBootstrap
的子类Bootstrap/ServerBootstrap
中设置了
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
其中设置的channel
属性就是告诉底层的channelFactory
来实例化该对象。
这里我们验证一下,直接从我们的代码中.channel()
方法中去,可以很清楚的看出,上面说的调用的是ChannelFactory
下的子类ReflectiveChannelFactory
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
// 调用工厂类,利用反射创建对象
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
该类的实现非常的简单:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
// 直接返回 需要初始化类的 class 对象
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
// 利用反射 初始化返回实例对象
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
现在明白了channel
是由谁创建的,那么到底是怎么创建出来的呢?现在进入NioServerSocketChannel
.查看该对象的构造函数,做了那些事情。
通过NioServerSocketChannel
加密Channel的创建
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// 该方法 返回的对象为 java.nio.channels;下的。调用的方法也是JDK底层的实现
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
// 原来NioServerSocketChannel 是直接调用的JDK底层的newSocket来来创建Channel 通道
其中还有一个构造
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
// 这里是可以生成一个NiO channel配置信息
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
一次点进该方法的父类可以找到在NIO
编程中不可避免地一项异步配置:ch.configureBlocking(false);
,配置为异步非阻塞。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);// 配置为异步 非阻塞 (重点)
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
Channel UML图.png
在上面继承关系中AbstractChannel
维护着channel
通道地内部属性
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId(); // Channel的 唯一标识
unsafe = newUnsafe();// 对应 底层TCP读写的相关操作
pipeline = newChannelPipeline();// 后续研究😊
}
在服务端channel
初始化完成之后,下一步就需要将该channel
注册到selector
上面。
注册selector
在上述代码channel
初始化完成的地方为:调用了init()
方法。因此应该从该地方出发查看如何将channel
注册到selector
上。initAndRegister
初始并注册。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 初始化操作
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 注册channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
其中register
的实现在AbstractChannel
下。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 绑定线程,简单的赋值操作
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 实现注册
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
register0(promise);
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 调用Jdk底层注册
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 触发事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 传播时间
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 调用JDK 方法实现注册
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
注册成功后,考虑端口绑定。
端口绑定
还是根据dobind()
方法可以看到里面有个bind0()
方法。channel对象调用bind()
方法,在AbstractChannel()
下有具体实现
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// 调用JDK底层 绑定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 触发事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
调用底层的绑定方法
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 调用底层JDK的绑定方法
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
总结
服务端启动核心路径总结:
服务端启动核心路径.png首先调用服务端的newChannel()
创建服务端channel
,这个过程实际上就是调用JDK
底层的API
来创建一个JDK channel
,然后Netty
将其包装成自己的服务端的channel
,同时会创建一些基本的组件绑定在此channel
上(比喻:pipeline
)。然后调用init()
来初始化服务端channel
,这个过程最重要的就是为服务端的channel
添加一个连接处理器。随后调用register()
方法注册selector
,这个过程Netty
将JDK
底层生成的channel
注册到selector
上,最后调用bind()
方法通过jdk
底层的API
将端口号绑定。来实现,绑定之后,netty
会selector
绑定一个OP_ACCEPT
事件,然后selector
就可以接收绑定其他channel
了。