1_Netty源码分析之Netty服务端启动

2019-03-10  本文已影响0人  小安的大情调

本文均为原创,如需转载请注明出处。

[TOC]

Netty服务端创建流程分析

​ Netty为了向使用者屏蔽NIO通信的底层细节,在和用户交互的边界做了封装,母的就是为了减少用户开发工作量,降低开发难度。BootstrapSocket客户端创建工具类,用户听过Bootstrap可以方柏霓地创建Netty地客户端并发起异步TCP连接操作。

Netty服务端时序图

Netty服务端--Channel的创建

首先基于NIO的学习,思考两个问题

研究服务端是如何创建的,查看源码,首先应该从源头出发,直接调用的方法开始层层深入。

Netty服务端的入口bind()方法

​ 服务端创建的入口bind()方法

// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

进入bind()方法发现会执行一个dobind()方法

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

该方法调用了一个initAndRegister()

 final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

很明显可以看出。这里会创建一个底层的channel,调用Netty维护的一个工厂类,下面的init()方法是一个抽象类,可以看看那些方法继承并实现了它呢?😊

AbstractBootstrap下的两个子类.png

所以可想而知!在调用bind()方法后,Netty会调用JDK底层初始化一个Channel,回过头来看看channelFactory.newChannel()的实现。

ChannelFactory下有一个利用反射的实现子类ReflectiveChannelFactory(),从名字就可以看得出来,是一个利用反射来初始化channel的工厂类。该类下的newChannel()方法:

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

确实是利用了返回来进行实例化的。那么它是如何知道实话化那个对象呢?

​ 让我们回到上一层开始出现channelFactory.newChannel(),在这里可以很清楚的看出来,当前类AbstractBootstrap自身维护着该工厂对象,并且在构造函数中给该工厂类所需要的对象进行了赋值。

    AbstractBootstrap() {
        // Disallow extending from a different package.
    }

    AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
        group = bootstrap.group;
        // 那么这里的bootstrap对象又是从哪里得到的呢?
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        synchronized (bootstrap.attrs) {
            attrs.putAll(bootstrap.attrs);
        }
    }

这里的AbstractBootstrap<B, C> bootstrap很直接的就可以想到我们在最外层定义的AbstractBootstrap的子类Bootstrap/ServerBootstrap中设置了

          ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

其中设置的channel属性就是告诉底层的channelFactory来实例化该对象。

这里我们验证一下,直接从我们的代码中.channel()方法中去,可以很清楚的看出,上面说的调用的是ChannelFactory下的子类ReflectiveChannelFactory

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        // 调用工厂类,利用反射创建对象
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

该类的实现非常的简单:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    // 直接返回 需要初始化类的 class 对象
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    // 利用反射 初始化返回实例对象
    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

现在明白了channel是由谁创建的,那么到底是怎么创建出来的呢?现在进入NioServerSocketChannel.查看该对象的构造函数,做了那些事情。

通过NioServerSocketChannel加密Channel的创建

    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
// 该方法 返回的对象为 java.nio.channels;下的。调用的方法也是JDK底层的实现
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
// 原来NioServerSocketChannel 是直接调用的JDK底层的newSocket来来创建Channel 通道

其中还有一个构造

    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 这里是可以生成一个NiO channel配置信息
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

一次点进该方法的父类可以找到在NIO编程中不可避免地一项异步配置:ch.configureBlocking(false);,配置为异步非阻塞。

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);// 配置为异步 非阻塞 (重点)
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
Channel UML图.png

在上面继承关系中AbstractChannel维护着channel通道地内部属性


    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId(); // Channel的 唯一标识
        unsafe = newUnsafe();// 对应 底层TCP读写的相关操作
        pipeline = newChannelPipeline();// 后续研究😊
    }

在服务端channel初始化完成之后,下一步就需要将该channel注册到selector上面。

注册selector

​ 在上述代码channel初始化完成的地方为:调用了init()方法。因此应该从该地方出发查看如何将channel注册到selector上。initAndRegister初始并注册。


    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 初始化操作
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // 注册channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

其中register的实现在AbstractChannel下。

 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            // 绑定线程,简单的赋值操作
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            // 实现注册
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

register0(promise);

private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 调用Jdk底层注册
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                // 触发事件
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                // 传播时间
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 调用JDK 方法实现注册
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;

注册成功后,考虑端口绑定。

端口绑定

还是根据dobind()方法可以看到里面有个bind0()方法。channel对象调用bind()方法,在AbstractChannel()下有具体实现

@Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 调用JDK底层 绑定
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        // 触发事件
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

调用底层的绑定方法

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            // 调用底层JDK的绑定方法
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

总结

​ 服务端启动核心路径总结:

服务端启动核心路径.png

首先调用服务端的newChannel()创建服务端channel,这个过程实际上就是调用JDK底层的API来创建一个JDK channel,然后Netty将其包装成自己的服务端的channel,同时会创建一些基本的组件绑定在此channel上(比喻:pipeline)。然后调用init()来初始化服务端channel,这个过程最重要的就是为服务端的channel添加一个连接处理器。随后调用register()方法注册selector,这个过程NettyJDK底层生成的channel注册到selector上,最后调用bind()方法通过jdk底层的API将端口号绑定。来实现,绑定之后,nettyselector绑定一个OP_ACCEPT事件,然后selector就可以接收绑定其他channel了。

上一篇下一篇

猜你喜欢

热点阅读