Netty Server端启动之源码分析

2019-05-09  本文已影响0人  0爱上1

本文基于上一篇Netty Server端实现,以解读源码的方式带领大家理解Server端是如何启动的,主要包括以下几点

  1. EventLoopGroup类的作用是什么?为什么要new两个EventLoopGroup实例

  2. ServerBootstrap类的作用是什么?以及它的build模式中如group()方法,channel()方法,childHandler()方法的分别做了哪些事?

  3. ServerBootstrap的bind方法做了哪些事?

  4. ChannelFuture 接口的作用是什么?

接下来我们围绕这几个点一一去分析,先贴一下整个Server启动的代码,方便我们分析

public class Server {

// 监听端口
private int port;
private Server(int port) {
    this.port = port;
}

// 启动一个Server服务器
private void start() throws InterruptedException {
    // 1.
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // 2.
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                // 3.
                .channel(NioServerSocketChannel.class)
                // 4.
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new ServerInboundHandler());
                    }
                })
                // 5.
                .option(ChannelOption.SO_BACKLOG, 128)
                // 6.
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        System.out.println("Server is started");
        // 7.
        ChannelFuture f = serverBootstrap.bind(port).sync();

        // 8.
        f.channel().closeFuture().sync();
    }finally {
        // 9.
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

}

public static void main(String[] args) throws InterruptedException {

    // 利用vm参数传递端口号,不传则默认8081
    int port;
    if (args.length > 0) {
        port = Integer.parseInt(args[0]);
    }else{
        port = 8081;
    }

    // 启动server
    System.out.println("Server to starting... the port is: " + port);
    new Server(port).start();
}
}

启动分析

1. NioEventLoopGroup

NioEventLoopGroup.png

我们跟踪\color{#DC143C}{NioEventLoopGroup}的构造函数,该类提供了多个重载的构造函数,最后的一个构造函数会调用super(...)的构造函数

这里直接看其抽象父类\color{#DC143C}{MultithreadEventLoopGroup}

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里简单介绍一下该类有一个私有的静态常量 DEFAULT_EVENT_LOOP_THREADS,还有一个static静态块用于初始化该常量值,即\color{red}{默认是cpu核心数的两倍}(在不指定系统参数io.netty.eventLoopThreads的情况下)

当构造函数的参数nThreads值为0时就会取该常量值作为该EventLoop事件循环的线程数

继续往下由调用了其抽象父类\color{#DC143C}{MultithreadEventExecutorGroup}的构造方法

/**
 * Create a new instance.
 *
 * @param nThreads          the number of threads that will be used by this instance.
 * @param executor          the Executor to use, or {@code null} if the default should be used.
 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

接下来重点分析该类的this(...)函数

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 重点是这个children属性
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 调用newChild() 方法初始化每一个children数组的元素
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

接下来看newChild(...)方法,该方法为抽象方法,具体的执行为NioEventLoopGroup中类重写的方法

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

\color{#DC143C}{NioEventLoopGroup}

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

可见这里的children数组的元素在实例话的时候实际上是一个个的\color{#DC143C}{NioEventLoop} 对象,重点来了,我们重点分析这个NioEventLoop对象

另外一点就是会为chooser属性赋值一个EventExecutorChooser事件循环选择器,该选择器的有一个next方法,作用是当有channel注册时,具体选择哪个事件循环EventExecutor(NioEventLoop)去注册

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

可以看到这里根据每个事件循环group中事件循环的个数是否是2的次方,分别实例化不同的事件执行选择器,默认两个group的选择器都是PowerOfTwoEventExecutorChooser实例

到这里先总结一下文章开头的第一个问题:

Server端启动代码前new了两个NioEventLoopGroup对象,每个NioEventLoopGroup对象都有一个EventExecutor数组类型的children属性(实际上new的是NioEventLoop对象),而每个children数组的size即是在new NioEventLoopGroup时传入的参数

NioEventLoopGroup.png

2. NioEventLoop

重点介绍这个类, 该类是Netty底层的核心类,继承了抽象类SingleThreadEventLoop(单线程事件循环),注册Channel到Selector,在事件循环中实现IO多路复用

这里说下个人的理解,NioEventLoop一个人负责了Java NIO多路复用中的while(true)循环以及Selector的相关工作,包括register,cancel,select等工作,其实现基础是也是基于JDK的Selector实现的,也所以称之为事件循环

NioEventLoop.png

分析其源码之前我们先带着问题入手

  1. NioEventLoop如何实现循环?

  2. NioEventLoop如何实现Selector选择器相关功能?

\color{red}{内部重点属性}

private Selector selector
private Selector unwrappedSelector
private SelectedSelectionKeySet selectedKeys
private final SelectorProvider provider

看到这里是不是豁然开朗?结合JDK NIO的源码我们发现NioEventLoop内部持有的就是JDK NIO的Selector,也就是利用它们实现了事件register, select

执行事件select时的策略器,即提供了一种能力去控制事件循环的行为,比如一个正在阻塞的select操作能被延迟或跳过,如果有事件需要被立即处理的话

默认情况下属性被赋值为DefaultSelectStrategy实例
private final SelectStrategy selectStrategy

private volatile int ioRatio = 50
private int cancelledKeys
private boolean needsToSelectAgain

ioRatio属性控制IO任务执行的时间占比
cancelledKeys属性表示取消注册的Key集合

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

实例化NioEventLoop完成以下:

  1. 调用父类构造完善父类属性parent等,指向其group

  2. 赋值selectorProvider至provider属性

  3. openSelector()就是利用1.中的provider调用openSelector() 打开一个Selector,并将其赋值给unwrappedSelector属性以及赋值selectedKeys属性等

  4. 赋值selectStrategy属性

至此我们明白了NioEventLoop 通过持有JDK的Selector从而实现select相关功能,那循环又是如何实现的呢?

\color{red}{run()}

NioEvent'Loop重写了SinglethreadEventExecutor中的抽象run方法,该方法即时循环的关键实现

@Override
protected void run() {
    // 死循环实现
    for (;;) {
        try {
            try {
                // 每一次循环都会计算select调用策略,如果taskQueue有任务,即直接执行selectNow(),不阻塞
                // 若taskQueue没有任务,即执行select() 默认阻塞1秒
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            // 不论阻塞调用select还是非阻塞调用,都会执行以下
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                // 当前时间
                final long ioStartTime = System.nanoTime();
                try {
                    处理io事件
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // io事件处理结束,处理taskQueue中的任务,并指定非io任务超时时间,按ioRatio比例计算出来
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

至此我们知道了Nio如何实现事件循环的大体流程,详细的事件循环我会单独放在一篇文章中讲解NioEventLoop事件循环详解


2. ServerBootstrap

  1. UML类图
ServerBootstrap.png
  1. 属性

volatile EventLoopGroup group 即bossGroup

private volatile EventLoopGroup childGroup; 即workerGroup

private volatile ChannelHandler childHandler; 即处理请求channel的处理器

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>()
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();

持有Channel反射工厂
private volatile ChannelFactory<? extends C> channelFactory

  1. group方法

设置用于bossGroup 和workerGroup,bossGroup的事件循环用于处理serverChannel的accept,而workerGroup里的事件循环则用于处理所有channel的IO

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    // 调用父类方法赋值group属性
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    // 赋值childGroup属性
    this.childGroup = childGroup;
    return this;
}
  1. channel方法

channel 就是根据传入的channel 的class类型去创建一个ReflectiveChannelFactory反射Channel工厂,并赋值给channelFactory属性

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}    
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

private final Constructor<? extends T> constructor;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        // 根据clazz类型获取其构造器
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                " does not have a public non-arg constructor", e);
    }
}

@Override
public T newChannel() {
    try {
        // 利用反射调用构造器new对应的channel实例
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}
  1. childHandler方法

就是赋值childHandler属性

public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    }
    this.childHandler = childHandler;
    return this;
}
  1. option方法

就是向属性options中添加元素,该Map内的元素会用在Channel实例被创建时,调用native方法为底层socket设置相关属性若想移除内核默认socket的某个属性值,只要将参数value设置null即可

public <T> B option(ChannelOption<T> option, T value) {
    if (option == null) {
        throw new NullPointerException("option");
    }
    if (value == null) {
        synchronized (options) {
            options.remove(option);
        }
    } else {
        synchronized (options) {
            options.put(option, value);
        }
    }
    return self();
}
  1. childOption方法

同理该方法即是用在channel被创建时指定底层socket属性,若想移除内核默认socket的某个属性值,只要将参数value设置null即可

public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    if (childOption == null) {
        throw new NullPointerException("childOption");
    }
    if (value == null) {
        synchronized (childOptions) {
            childOptions.remove(childOption);
        }
    } else {
        synchronized (childOptions) {
            childOptions.put(childOption, value);
        }
    }
    return this;
}

至此ServerBootstrap基本属性都通过了build的方式赋值完毕,接下来看下关键方法bind方法做了

\color{red}{serverBootstrap.bind(port)}

bind方法定义在了其抽象父类AbstractBootstrap中

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

new了一个InetSocketAddress对象作为参数继续调用了重载的bind方法

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

validate方法做了一些基础校验,包括group属性即bossGroup是否为null以及channelFactory是否为null等

接下来继续调用了doBind方法,重点来了

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

首先我们看下该方法的返回值是一个ChannelFuture对象,至于ChannelFuture是一个通道异步IO操作结果,因为Netty中所有IO操作都是以异步的方式,后面我会专门一篇文章来分析ChannelFuture
Netty源码之ChannelFuture

doBind方法首先调用了initAndRegister方法,从方法名上我们知道这是一个初始化和注册的方法,初始化谁?注册谁?

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. 利用通道工厂创建一个新的channel,这里内部就是利用NioServerSocketChannel构造反射一个实例,并利用默认的SelectorProvider open了一个NioServerSocketChannel实例
        channel = channelFactory.newChannel();
        // 2. 紧接着调用init方法,该方法为抽象方法,实际的init实现在ServerBootstrap中,该方法主要处理该channel的ChannelOption属性,attr属性以及pipeline以及相应的handler等信息
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 3. 注册该channel到serverBootstrap的bossGroup事件循环组上,具体的注册需要由事件循环组`选择`一个事件循环(`NioEventLoop`)来注册
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
  1. 继续向下看ReflectiveChannelFactory的newChannel方法做了什么
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

这里由于之前赋值了属性constructor为NioServerSocketChannel的构造器,故反射调用实例化

NioServerSocketChannel

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}


private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

利用默认的SelectorProvider 去openServerSocketChannel,在继续调用this()方法

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

这里可以看到ServerSocketChannel 实例构建以后再去调用父类方法

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

即赋值id属性,以及new一个该Channel所属的pipeline

至此ServerSocketChannel实例化完成

  1. init(channel)方法做了什么?

init方法是AbstractBootstrap定义的抽象方法,具体的实现是由ServerBootstrap实现的,我们直接子类的实现

void init(Channel channel) throws Exception {
    // 1. 获取options集合中,将其设置到底层ServerSocket上
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 2. 获取attrs集合,设置到channel上
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    // 3. 获取channel所属的pipeline
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // 4. 重点是这里为ServerSocketChannel添加了一个入站处理器ServerBootstrapAcceptor,该处理器会当ServerSocketChannel有Accept事件时负责将
            socketChannel注册到currentChildGroup,并设置currentChildHandler等工作,即ServerBootstrapAcceptor是一个桥梁,联通了bossGroup和workerGroup
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
  1. config().group().register(channel)注册

当ServerSocketChannel实例化后,并init完成,但是此时该channle还没有注册到eventLoop上,接下来就会完成注册动作

config().group()方法就是获取当前bossGroup事件循环组实例,即NioEventLoopGroup,我们直接看它的register方法做了什么

NioEventLoopGroup的register方法并没有覆写其抽象父类MultithreadEventLoopGroup的方法

MultithreadEventLoopGroup

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

这里调用了内部的next方法,返回一个EventLoop实例,这里大家思考以下next方法的作用是什么?

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

这里调用了其父类MultithreadEventExecutorGroup的next方法,继续

@Override
public EventExecutor next() {
    return chooser.next();
}

到了这里大家是否明白next方法的作用了呢?文章开头部分讲述了在new NioEventLoopGroup对象的时候有一个属性是chooser的赋值,该属性是一个eventLoop选择器,因为我们的NioEventLoopGroup对象内有一个eventLoop数组,当我们在注册某个channel的时候到底是注册到哪个eventLoop上呢?这个工作由这个选择器来完成

当时实例化的是PowerOfTwoEventExecutorChooser选择器,选择的规则就是用一个自增的AtomicInteger类型的idx值去取模eventLoop数组的length - 1,就得到了channel需要注册到的eventLoop数组的下标从而取出对应的eventLoop去注册

AbstractChannel 类的注册方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
        // 赋值该channel所要注册到的事件循环是哪个
        AbstractChannel.this.eventLoop = eventLoop;
        // 判断当前执行线程是否是该eventLoop内部的单线程,当前线程是Main,此时的eventLoop 内的单线程为null,还没有启动过
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // 这里调用eventLoop的execute提交任务执行register0注册
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

继续向下看NioEventLoop的execute方法,该方法定义在其抽象父类
SingleThreadEventExecutor内,NioEventLoop并未重写该方法

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    
    // 判断当前执行线程是否是该EventLoop自身线程,这里是Main方法调用的,且该EventLoop自身线程还未启动(`thread属性仍为null`),返回false
    boolean inEventLoop = inEventLoop();
    // 将需要执行的任务丢进taskQueue任务队列中
    addTask(task);
    if (!inEventLoop) {
      // 这里会新建该EventLoop的唯一单线程,并调用其run方法启动事件循环
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

startThread方法 启动EventLoop的执行线程

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

可以看到这里利用了CAS实现保证了只启动一个线程,再看doStartThread方法内部

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 调用自身的run方法,该方法由NioEventLoop覆写了
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            if (logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

这里的executor是当时创建NioEvnetLoopGroup时new的ThreadPerTaskExecutor任务执行器,包含了一个默认的线程工厂(含有“nioEventLoopGroup-2-”前缀属性,即一个事件循环组对应一个默认线程工厂)

ThreadPerTaskExecutor类

public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}

}

这里就是调用线程工厂去new一个线程,并把任务传进去执行该任务,注意这里的command已经交给了新建的线程执行了

因此thread = Thread.currentThread() 被赋予了当前执行的新线程即以“nioEventLoopGroup-2-”前缀命名的EventLoop线程

这里的SingleThreadEventExecutor.this.run(),EventLoop线程正式启动了NioEventLoop的事件循环方法,而在其run方法中会利用for死循环不断执行该EventLoop的IO任务以及非IO任务(此处的注册任务就属于非IO任务)

initAndRegister方法执行完成了,断点一下,我们看看此时ServerSocketChannel的属性状态

initAndRegister.png ServerSocketChannel.png

可以看到registered状态为true表示已注册,我们通过cmd查看一下本机端口占用情况

image.png

此时的端口8081还未被监听,也就是说ServerSocketChannel还没有bind到8081端口并启动监听,继续往下看代码

\color{red}{doBind0(regFuture, channel, localAddress, promise)}

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

可以看到这里就是通过获取ServerSocketChannel注册到的那个EventLoop实例并提交一个任务,任务就是将ServerSocketChannel 绑定到指定的端口上

bind实际上是调用了NioServerSocketChanneldoBind方法

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

这里的javaChannel()的bind方法实际上是调用了ServerSocketChannelImpl实例自身的bind方法,注意这里用到了config里的backlog属性值,如果在ServerBootstrap的option方法不指定的话,默认windows下该值默认为200,其他情况下为128

The SOMAXCONN value of the current machine. If failed to get the value, {@code 200} is used as a default value for Windows or {@code 128} for others.

这里有一篇fasionchan博主发布的关于内核backlog参数的叙述
深入理解Linux TCP backlog

public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
    synchronized(this.lock) {
        if (!this.isOpen()) {
            throw new ClosedChannelException();
        } else if (this.isBound()) {
            throw new AlreadyBoundException();
        } else {
            InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
            SecurityManager var5 = System.getSecurityManager();
            if (var5 != null) {
                var5.checkListen(var4.getPort());
            }

            NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
            Net.bind(this.fd, var4.getAddress(), var4.getPort());
            Net.listen(this.fd, var2 < 1 ? 50 : var2);
            synchronized(this.stateLock) {
                this.localAddress = Net.localAddress(this.fd);
            }

            return this;
        }
    }
}

最终这里调用了bind方法和listen方法,即完成了端口的绑定和监听工作,等待Client端的connect请求

至此整个Netty Server端启动完成,下面用一张流程图表示整个Netty Server启动流程方便记忆

Netty Server.png

总结

这里回复一下文章开头的几个问题

  1. EventLoopGroup的作用说白了就是个Selector事件循环组,当有channel需要注册时,他会提供选择某个NioEventLoop去注册的功能
    至于有两个group的原因:bossGroup内提供的NioEventLoop数组是用来handle客户端的Accept连接请求的,而workGroup内的NioEventLoop则是处理客户端连接之后的事件循环(IO任务和非IO任务),一般bossGroup的NioEventLoop数组数量设为1, 而workGroup的数量默认为当前CPU数量的2被

  2. ServerBootstrap类的作用是一个简化Netty Server程序启动的启动类,其中包含了很多贯穿整个Netty程序需要用到的属性,比如channel方法指定了用于监听client socket连接的ServerSocket 的class类型,childHandler方法则用于当有client连接事件准备好后,并创建了对应的SocketChannel后,该SocketChannel对应的pipeline中需要添加进去哪些handler

  3. ServerBootstrap的bind方法是Netty Server启动的最后关键一步,前期的相关方法都可以认为是相关静态属性赋值(底层Selector初始化除外),通过bind方法,Netty实例化了NioServerSocketChannle实例,并为其init了该channel相应的option属性以及pipeline内的相关handler,这里add了一个重要的handler就是ServerBootstrap的内部类ServerBootstrapAcceptor,该类继承了ChannelInboundHandlerAdapter 入站处理器,client连接以后,ServerSocket 的pipeline最有一个处理器就是该处理器,会在该处理器中进行socketChannel的相关init工作(option属性配置,pipeline添加childHandler以及调用workGroup发起注册OP_READ动作等),最有bind方法会进行该NioServerSocketChannle的内核底层绑定端口,并监听端口,等待client的请求连接

  4. 最后说下ChannelFuture的作用,Netty中所有的异步channel的IO操作都是异步的方式,意味着所有的IO操作都将会立即返回,即无法保证所请求的I/O操作在调用结束时已完成,代替的是将会得到一个ChannelFuture的实例,通过这个实例我们将可以得到IO操作的结果和状态

而获取结果的方式是给这个ChannelFuture的实例增加一个Listener,当该ChannelFuture实例isDone为true的时候,会通知该监听器的operationComplete方法,而我们会在该方法内部再编写逻辑,根据IO操作的结果是成功还是失败而做出不同的处理

以上就是Netty Server端启动的所有分析,因水平有限,如有错误的地方还望不吝指出,共同进步...

上一篇下一篇

猜你喜欢

热点阅读