netty首页投稿(暂停使用,暂停投稿)程序员

Netty 源码解析 ——— Netty 优雅关闭流程

2017-11-27  本文已影响1260人  tomas家的小拨浪鼓

本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。

Netty的优雅关闭操作

Netty是通过『eventLoopGroup.shutdownGracefully()』操作来实现它的优雅关闭的。

我们先来看下shutdownGracefully方法的doc说明:

    /**
     * Signals this executor that the caller wants the executor to be shut down.  Once this method is called,
     * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
     * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
     * (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period,
     * it is guaranteed to be accepted and the quiet period will start over.
     *
     * @param quietPeriod the quiet period as described in the documentation
     * @param timeout     the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
     *                    regardless if a task was submitted during the quiet period
     * @param unit        the unit of {@code quietPeriod} and {@code timeout}
     *
     * @return the {@link #terminationFuture()}
     */
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

调用者希望执行器进行关闭的信号。一旦这个方法被调用了,『isShuttingDown()』方法将开始都会返回true,同时执行器准备关闭它自己。不像『shutdown()』方法,优雅关闭会确保在它关闭它自己之前没有任务在’the quiet period’(平静期,即,gracefulShutdownQuietPeriod属性)内提交。如果一个任务在平静期内提交了,它会保证任务被接受并且重新开始平静期。
如果你现在,对这段描述有些许困惑,没关系,请继续往下看,gracefulShutdownQuietPeriod(即,quietPeriod参数)、gracefulShutdownStartTime(即,timeout参数)主要会在『confirmShutdown()』方法中使用,下面会结合方法的实现场景来说明gracefulShutdownStartTime、gracefulShutdownQuietPeriod的含义。

源码解析

    // AbstractEventExecutorGroup#shutdownGracefully
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

    // MultithreadEventExecutorGroup#shutdownGracefully
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l: children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }

遍历EventExecutor[]数组,取出EventExecutor执行shutdownGracefully操作。因为优雅关闭的流程主要是在各个NioEventLoop线程各自完成的,它是一个异步操作,因此此时返回该异步操作的Future,它是一个无返回结果的DefaultPromise对象。



① 确保 quietPeriod、unit的为有效值,即『quietPeriod >= 0』、『unit != null』。同时,确保timeout、quietPeriod之间的正确性,即『quietPeriod <= timeout』。
② 如果该NioEventLoop已经执行过关闭操作了,可能是『shutdownGracefully()』这样的优雅关闭,也有可能是『shutdown() or shutdownNow()』,当然后两种方法已经不建议使用了(Deprecated)。那么直接返回该异步操作的Future对象。
③ 使用自旋锁(『自旋 + CAS』)的方式修改当前NioEventLoop所关联的线程的状态(volatile修饰的成员变量state)。因为此方法可能被多线程同时调用,所以使用了自旋锁的方式来保证NioEventLoop所关联的线程状态(state成员变量)的修改是原子性的。
之前我们说过,NioEventLoop所关联的线程总共有5个状态,分别是:

private static final int ST_NOT_STARTED = 1;    // 线程还未启动
private static final int ST_STARTED = 2;        // 线程已经启动
private static final int ST_SHUTTING_DOWN = 3;  // 线程正在关闭
private static final int ST_SHUTDOWN = 4;       // 线程已经关闭
private static final int ST_TERMINATED = 5;     // 线程已经终止

其中,在正常的线程状态流为:ST_NOT_STARTED ——> ST_STARTED ——> ST_SHUTTING_DOWN ——> ST_TERMINATED。
而ST_SHUTDOWN这个线程状态是已经弃用的『shutdown() or shutdownNow()』所会设置的线程状态,但是无论怎样在此步骤中,线程的状态至少为会置为ST_SHUTTING_DOWN,或者说正常情况下都是会设置为ST_SHUTTING_DOWN的。
补充简单说明下两个知识点:
a) 自旋锁(Spin lock):由它自己去占有CPU运行的时间,然后去尝试进行更新,直到更新成功完成。也因为它是占用CPU资源的方式,所以自旋锁实现的操作是非常简短的,不然其他线程可能会一直在自旋等待该自旋锁。也正式因为自旋锁是不会释放CPU的,也就是线程无需被挂起,这样就没有线程上下文切换的问题了。
因此,自旋锁一般用于在多核处理器中预计线程持有锁的时间很短(即锁操作所需的时间非常的短)情况,甚至时间短于两次线程上下文的切换的开销。
b) volatile的可见性:volatile除了保证单个变量的读/写具有原子性外,还有有一个很重要的特性就是对线程内存可见性的保证(即,对一个 volatile 变量的读,总是能看到(任意线程)对这个 volatile 变量最后的写入)。因为此处修改state字段(本文是Netty服务端主线程)的线程和使用该字段的线程(NioEventLoop所关联线程)不是同一个线程。因此通过volatile来修饰state字段来实现,通过主线程修改了EventLoop所关联的线程状态后,在NioEventLoop的事件循环中能立即正确感知其线程状态的变化,从而做出相应的操作。
④ 根据传入的参数,设置成员变量gracefulShutdownQuietPeriod、gracefulShutdownTimeout。这里分别为默认值,gracefulShutdownQuietPeriod为2秒,gracefulShutdownTimeout为15秒。
⑤ 如果NioEventLoop所关联的线程之前的状态为ST_NOT_STARTED,则说明该线程还未被启动过,那么启动该线程。
Q:为什么我们在执行关闭操作的时候,还需要特意去启动那些未启动的NioEventLoop线程了?
A:是这样的,在基于NIO的网络传输模式中,会在构建NioEventLoopGroup的时候就预先将一定数量的NioEventLoop给创建好(默认为操作系统可运行处理器数的2倍),而NioEventLoop在初始化的时候就会将其上的Selector给开启了。同时Selector的关闭是在『doStartThread()』方法中最后会去完成的事。关于『doStartThread()』方法将在后面详细展开。

好了,在完成将NioEventLoop所关联的线程状态修改为’ST_SHUTTING_DOWN’,也就说明关闭流程的开始。那么,接下来我们来看看NioEventLoop中是如果完成优雅的关闭的。

我们先来看看doStartThread()方法:

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

① 这里executor.execute方法底层会通过ThreadPerTaskExecutor.execute(Runnable)方法来创建并启动执行任务的唯一线程。然后启动的线程就会执行我们通过executor.execute方法提交上来的这个任务(具体的这块说明请见Netty 源码解析 ——— 服务端启动流程 (上))。
② 在Runnable任务中,会将当前的线程设置为NioEventLoop所关联的线程,即对成员变量thread赋值为Thread.currentThread()。然后执行『SingleThreadEventExecutor.this.run();』这里实际调用的是『NioEventLoop#run()』方法来进行事件循环操作。
③ 当事件循环操作退出后(当NioEventLoop需要关闭时,事件循环才会退出),进行关闭的后续操作。

当NioEventLoop已经处于使用状态(即,上面有Channel与其绑定),那么此时它会处于事件循环操作中;若NioEventLoop没有处于使用状态(即,该NioEventLoop已经被初始化构建好了,但还没有任何一个Channel与其绑定过),那么在执行shutdownGracefully()后,也会因为调用了doStartThread()方法,此时该NioEventLoop也会处于事件循环中。
那么,接下来我们就来看看NioEventLoop中事件循环对于优雅关闭都完成了哪些操作了?

『NioEventLoop#run()』:

    protected void run() {
        for (;;) {
            try {
                ......
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

此处,我们仅对与优雅关闭流程相关的部分进行展开。
事件循环首先会对Selector上注册的Channel所就绪的I/O事件做处理,然后处理taskQueue中的任务以及时间已经到达的定时/周期性任务。最后,在每次事件循环的最后都会判断一次当前的线程状态,如果发现当前的线程状态处于正在关闭的状态(即,state >= ST_SHUTTING_DOWN)则会开始处理关闭流程,即:

    // Always handle shutdown even if the loop processing threw an exception.
    try {
        if (isShuttingDown()) {
            closeAll();
            if (confirmShutdown()) {
                return;
            }
        }
    } catch (Throwable t) {
        handleLoopException(t);
    }

注意,事件循环中将正常的工作流程放在了一个try-catch中,将关闭流程放在了另一个try-catch中,这是为了它们之间能够不会互相影响。这样即便工作流程抛出异常了,每次事件循环的最后依旧能够去处理关闭事件。

关闭流程主要分为两步:
① 『closeAll()』:

    private void closeAll() {
        selectAgain();
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }

获取该注册到这个Selector所有Channel所对应的SelectionKey,然后获取SelectionKey附加对象attachment(),若attachment是一个AbstractNioChannel对象则先让放入到channels集合中,否则直接调用『k.cancel(),即selectionKey.cancel()』操作将这个SelectableChannel从Selector上注销。最后遍历channels集合,依次取出AbstractNioChannel,进行AbstractNioChannel的关闭操作(『ch.unsafe().close(ch.unsafe().voidPromise());』)


  1. 如设置了Socket#SO_LINGER配置项(即,config().getSoLinger() > 0),则说明当需要关闭socket时,如果这时send buffer里还有数据没有发送完,则先尝试把send buffer中的数据发送完了再关闭socket。所以此时会先执行doDeregister()操作,将当前的SocketChannel从Selector上注销,然后将close()操作作为一个任务放到另一个执行器去执行,也就是说不在当前的NioEventLoop的线程上去执行当前SocketChannel的关闭操作,因为此时SocketChannel不会马上关闭,它需要尝试在l_linger time时间内将发送缓存区中的数据发送出去并等待对方的确认。在l_linger time时间之后socket才会真正的被关闭。
  2. 如果没有设置Socket#SO_LINGER配置项,则直接在NioEventLoop线程上进行SocketChannel/ServerSocektChannel的close()操作。并将outboundBuffer中所有还未发送出去的消息标志为操作失败(fail flush),然后关闭outboundBuffer,释放相关资源。在关闭socket之后,将SocketChannel/ServerSocketChannel从Selector上注销(即,『selectionKey.cancel()』。selectionKey表示一个SocketChannel/ServerSocketChannel注册到Selector的关联关系)。
  3. 触发‘channelInactive’事件和‘channelUnregistered’事件,这两个事件都会在ChannelPipeline中得以传播。但这两个事件的触发会被封装为一个任务提交至当前的NioEventLoop的taskQueue在随后被执行,这么做的原因是为了确保‘channelInactive’事件和‘channelUnregistered’事件的触发会在NioEventLoop线程上执行。‘channelInactive’事件和‘channelUnregistered’事件都是入站事件,它们会依次顺序调用ChannelPipeline中的ChannelInboundHandler的channelInactive()方法以及channelUnregistered()方法。并且,ChannelPipeline中的head在处理‘channelUnregistered’事件时除了将该事件传播给ChannelPipeline中的下一个ChannelInboundHandler外,还会触发一个destroy()操作
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelUnregistered();

            // Remove all handlers sequentially if channel is closed and unregistered.
            if (!channel.isOpen()) {
                destroy();
            }
        }

该destroy()操作会删除ChannelPipeline中的所有的handler(除了head、tail之外),并触发每个Handler的handlerRemoved()方法。注意,这里handler的移除操作是先顺序移除head到tail间所有的ChannelInboundHandler,然后在顺序移除tail到head间所有的ChannelOutboundHandler。




② 『confirmShutdown()』:
protected boolean confirmShutdown() {
    if (!isShuttingDown()) {
        return false;
    }

    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }

    cancelScheduledTasks();

    if (gracefulShutdownStartTime == 0) {
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }

    if (runAllTasks() || runShutdownHooks()) {
        if (isShutdown()) {
            // Executor shut down - no new tasks anymore.
            return true;
        }

        // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
        // terminate if the quiet period is 0.
        // See https://github.com/netty/netty/issues/4241
        if (gracefulShutdownQuietPeriod == 0) {
            return true;
        }
        wakeup(true);
        return false;
    }

    final long nanoTime = ScheduledFutureTask.nanoTime();

    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }

    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // Check if any tasks were added to the queue every 100ms.
        // TODO: Change the behavior of takeTask() so that it returns on timeout.
        wakeup(true);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignore
        }

        return false;
    }

    // No tasks were added for last quiet period - hopefully safe to shut down.
    // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
    return true;
}

首先,先简单的描述下『runAllTasks()』和『runShutdownHooks()』所会完成的操作:
a) runAllTasks():首先会将已经到运行时间的定时/周期性任务放入taskQueue中,然后依次执行taskQueue中的任务。当且仅当taskQueue中的任务都执行完了,该方法会返回true,并且会将最后一个任务执行完后此时的系统时间赋值为成员变量lastExecutionTime;否则,如果该taskQueue中没有要执行的任务,那么该方法会返回false。
b) runShutdownHooks():执行用户自定义的所有shutdownHook,比如我们通过(『nioEventloop.addShutdownHook(runnable)』方法来提交我们希望该NioEventLoop被关闭时所要执行的一些操作)。当shutdownHook都执行完了该方法会返回true,并且会在执行完最后一个showdownHook后将此时的系统时间赋值为成员变量lastExecutionTime;否则,如果没有任何需要执行的shutdownHook,即shutdownHooks集合为空,那么该方法将返回false。

接下来,我们来判断在什么条件下confirmShutdown()方法将返回true,以至于可以退出NioEventLoop的事件循环,继续doStartThread()的后续操作以完成最后的优雅关闭流程。
我们分两种情况来讨论:
① gracefulShutdownQuietPeriod == 0
如果taskQueue中待执行的任务,或者有到期的定时/周期性任务,再或者有用户自定义的shutdownHook任务,那么会在执行完任务后退出confirmShutdown方法,并返回true;否则,如果没有任务待执行的任务,那么‘nanoTime - lastExecutionTime > gracefulShutdownQuietPeriod’也会使得confirmShutdown()方法退出,并返回true。

② gracefulShutdownQuietPeriod > 0

  1. 从『if (runAllTasks() || runShutdownHooks())』这个判断语句中,我们能够确保只有在taskQueue中所有的任务都被执行完了,并且shutdownHooks集合中所有的shutdownHook也都执行完了之后,这个判断语句才会返回false。也就是说,当该if语句返回false时,我们能够确保所有的任务和shutdownHook都已经执行完了。
  2. 『nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout』:接下来我们判断,执行完上面所有任务(包括taskQueue中的任务、可执行的定时/周期性任务、所有的shutdownHook任务)所需的时间是否已经操作了优雅关闭的超时时间(gracefulShutdownTimeout),如果已经超过了,那么则退出confirmShutdown方法,并返回true。否则,继续下面的步骤
  3. 『nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod』:如果‘当前时间距离最后一次执行任务的时间’小于等于’优雅退出的平静期(gracefulShutdownQuietPeriod)’。则使NioEventLoop线程睡眠100ms后,退出confirmShutdown方法,并返回false,这时说明关闭操作是未被批准的,那么NioEventLoop的事件循环并不会退出,并且会在下次事件循的最后再次调用confirmShutdown()方法进行关闭操作的确认,也就是会从新执行步骤1;否则,如果‘当前时间距离最后一次执行任务的时间’大于’优雅退出的平静期(gracefulShutdownQuietPeriod)’,则退出confirmShutdown方法,并返回true。此时说明,在一个优雅退出的平静期(gracefulShutdownQuietPeriod)内都没有任何的任务被提交至该NioEventLoop线程上,那么我们就有希望能够安全的进行关闭。为什么说是有希望了?这是因为我们实在没有办法保证在此时用户不会通过execute()来提交一个任务。


    我们用一个流程图来说明gracefulShutdownQuietPeriod、gracefulShutdownTimeout在confirmShutdown操作中起到的作用和关系(注意,下面并不是confirmShutdown()方法流程图):


    好了,在结束NioEventLoop的事件循环后,我们继续来看doStartThread()的后续操作。
    首先会将变量success设置为true,接下就是执行finally块中的代码了:
    ① 如果当前NioEventLoop线程的状态还不是处于关闭相关的状态的话,则通过自旋锁的方式将当前NioEventLoop线程的状态修改为’ST_SHUTTING_DOWN’。从我们当前优雅关闭的流程来说,当前NioEventLoop线程的此时就是ST_SHUTTING_DOWN了。
    ② 判断,如果NioEventLoop事件循环结束了,但是‘gracefulShutdownStartTime’成员变量却为0,则说明事件循环不是因为confirmShutdown()方法而导致的结束,那么就打印一个错误日志,告知当前的EventExecutor的实现是由问题的,因为事件循环的终止必须是通过调用confirmShutdown()方法来实现的,也就是说,事件循环能够正确退出,也就是因为关闭操作被确认了。
    ③ 此时会通过自旋锁的方式再次调用一次『confirmShutdown()』,以确保所有的NioEventLoop中taskQueue中所有的任务以及用户自定义的所有shutdownHook也都执行了。之后才会进行关闭操作。
    ④ cleanup():
    protected void cleanup() {
        try {
            selector.close();
        } catch (IOException e) {
            logger.warn("Failed to close a selector.", e);
        }
    }

会将当前NioEventLoop所关联的Selector关闭。
⑤ 修改NioEventLoop线程的状态为’ST_TERMINATED’。注意,在此操作完成之后,所有提交至该NioEventLoop显示的任务都会被拒绝,也就是该NioEventLoop不会再接收任何的任务了。

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}

public boolean isShutdown() {
    return state >= ST_SHUTDOWN;
}

⑥ threadLock.release():threadLock是一个初始化资源为0的信号量,此操作会使得信号量的资源+1。那么这种情况下,如果有用户操作了awaitTermination方法的话(该方法底层会通过『threadLock.tryAcquire(timeout, unit)』来阻塞的尝试获取信号量的资源),该方法就会结束阻塞并返回,当然它也可以因为设置的等待超时间已到而返回。
⑦ 此时会再次判断该NioEventLoop的taskQueue是否为空,如果为非空,只会打印警告日志,告知用户,当前NioEventLoop在退出时仍有未完成的任务。而这个任务可能是在步骤③完成后,步骤⑤完成之前,又有用户提交上来的。
⑧ 设置该优雅关闭异步操作为成功完成。

后记

好了,整个NioEventLoopGroup的整个优雅关闭流程就分析完了,一句简单『nioEventLoopGroup.shutdownGracefully()』操作背后竟然有着如此复杂的关闭流程,再次佩服Netty为我们将复杂的流程给封闭化,而提供最为简便的API供用户来更好更方便的去使用它。
若文章有任何错误,望大家不吝指教:)

上一篇下一篇

猜你喜欢

热点阅读