消息队列MQ(Kafka&RabbitMQ)Magic Netty 我爱编程

Netty分享之EventLoop

2018-02-09  本文已影响210人  逅弈

gris 转载请注明原创出处,谢谢!

Netty高性能的原因除了他设计堪称完美的IO模型外,另外一个原因就是他的线程模型。

有关netty线程模型的内容不是本篇文章将要分享的,我只简单的描述下netty线程模型的几个类别,具体的内容可以查询相关的资料,大体上netty的线程模型可以分成以下几种:

在网络连接从开始到结束的这段生命周期里,我们需要处理连接中的事件,而这就需要为这些事件创建并执行相应的任务。而在netty中一个Channel通道从建立起来时就会为他分配一个EventLoop,用来处理该通道上需要执行的事件,并且该EventLoop直到连接断开的整个过程中都不会发生变化。
一个EventLoop的内部是一个Thread在支撑,Netty线程模型的卓越性能之一取决于对当前执行的Thread的身份的确定,通过调用EventLoop的inEventLoop(Thread thread)方法来确定。就是在执行一个任务时,确定当前线程是否是分配给当前Channel以及Channel的EventLoop的那一个线程。如果当前线程正好就是支撑EventLoop的那个线程,那么提交给EventLoop的任务将会被直接执行,否则EventLoop会把该任务放入一个内部的队列中进行调度,以便稍后在下一次处理事件时执行。用一个简单的图表示如下:

Executor_logic.png

鉴于Netty线程模型的基石是建立在EventLoop上的,我们今天就来详细的了解下EventLoop。
首先从设计上来看,EventLoop采用了一种协同设计,它建立在两个基本的API之上:Concurrent和Channel,也就是并发和网络。并发是因为它采用了线程池来管理大量的任务,并且这些任务可以并发的执行。其继承了EventExecutor接口,而EventExecutor就是一个事件的执行器。另外为了与Channel的事件进行交互,EventLoop继承了EventLoopGroup接口。一个详细的EventLoop类继承层次结构如下:

EventLoop_Class_Structure.png

一个Netty服务端启动时,通常会有两个NioEventLoopGroup:一个boss,一个worker。第一个NioEventLoopGroup正常只需要一个EventLoop,主要负责客户端的连接请求,然后打开一个Channel,交给第二个NioEventLoopGroup中的一个EventLoop来处理这个Channel上的所有读写事件。一个Channel只会被一个EventLoop处理,而一个EventLoop可能会被分配给多个Channel。

我们知道每个EventLoop只要一个Thread来支撑并处理事件,可以在SingleThreadEventExecutor类中找到这个thread:

/**
 * 用来执行任务的线程
 * 该线程其实就是用来支撑SingleThreadEventLoop的
 */
private volatile Thread thread;

通过debug,我们可以发现服务器启动过程中bootstrap.bind(PORT).sync()最终会执行到AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise)的方法,而doBind0方法如下:

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}  

最终会执行到EventLoop的execute方法,而execute方法是在Executor接口中定义的,最终会由SingleThreadEventExecutor来执行该execute方法,一个SingleThreadEventExecutor的调用链大致如下:

#1 SingleThreadEventExecutor.execute(Runnable task)
    -> #2 SingleThreadEventExecutor.startThread()
    -> #2 SingleThreadEventExecutor.doStartThread()
    -> #2 SingleThreadEventExecutor.this.run()
        -> #3 NioEventLoop.run() // for(;;)-loop
            -> #4 NioEventLoop.select(boolean oldWakenUp)
            -> #4 SingleThreadEventExecutor.runAllTasks(long timeoutNanos)

从代码中可以知道下面一些结论:
1、SingleThreadEventExecutor中的Thread的初始化在doStartThread这个方法中
2、最后会调用到SingleThreadEventExecutor.this.run()方法,而该run方法是在NioEventLoop中实现的
3、NioEventLoop中的run方法通过一个for(;;)循环来处理Channel事件的类

让我们来看一下NioEventLoop中的run方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

在这个for(;;)循环里面,首先会通过selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())判断当前有没有可以执行的任务,没有则继续循环,如果有任务,那就开始处理任务。在处理事件的时候,不仅要处理IO事件,还要处理非I/O事件。其中ioRatio变量表示的是:处理I/O事件和非I/O事件的时间占比,默认为50%。
当且仅当isShuttingDown()为true的时候,才开始停止循环。

我们可以通过在关键代码里打印一些日志,来查看服务端启动过程中都执行了哪些事情,例如,可以在每个方法的第一句增加一个System.out.println(),具体要在哪些方法中加,各位可以自己去尝试,下面我把我增加后服务端启动时打印的日志信息列在下面:

gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.channel.AbstractChannel$AbstractUnsafe$1@795cd85e
gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-startThread
gris-debug-SingleThreadEventExecutor-doStartThread
gris-debug-NioEventLoop-run
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.channel.AbstractChannel$AbstractUnsafe$1@795cd85e
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.bootstrap.ServerBootstrap$1$1@5a627d0e
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.bootstrap.ServerBootstrap$1$1@5a627d0e
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.bootstrap.AbstractBootstrap$2@41dad0a4
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.bootstrap.AbstractBootstrap$2@41dad0a4
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-execute,task=io.netty.channel.AbstractChannel$AbstractUnsafe$2@5da6b8da
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.channel.AbstractChannel$AbstractUnsafe$2@5da6b8da
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1
gris-debug-SingleThreadEventExecutor-task poll from taskQueue is null,will break
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom
gris-debug-NioEventLoop-select
gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos
gris-debug-SingleThreadEventExecutor-runAllTasksFrom

可以看到每一步具体执行所在的类和方法,可以发现最终程序停在NioEventLoop.run()方法中,循环着,等待事件的到来,然后进行处理。

我是逅弈,如果文章对您有帮助,欢迎您点赞加关注,并欢迎您关注我的公众号:

欢迎关注微信公众号
上一篇下一篇

猜你喜欢

热点阅读