Netty快速入门(11)EventLoop源码解析
EventLoop的类层图
我们来简单讨论一下Netty线程模型的源码。学习一下EventLoopGroup的原理。首先看一下EventLoop的类层结构图:
上面是一个简化的版本,完整的是非常复杂的。从整体上来看,netty针对EventLoop定义了两种类型的接口,一个是EventExecutor和EventExecutorGroup,另一个是EventLoop和EventLoopGroup。
通过这些接口,可以总结出两大功能,第一个就是它是一个事件执行器(EventExecutor),它能够执行各种事件,也能够向它提交事件,它能够执行。第二它是一个EventLoop,能够向它注册handler,进行循环处理。相当于nio中的selector加上循环操作。
对于每个NioEventLoop,它继承了SingleThreadEventLoop,同时SingleThreadEventLoop继承了SingleThreadEventExecutor。
对于每个NioEventLoopGroup,它继承了MultithreadEventLoopGroup,同时MultithreadEventLoopGroup继承了MultithreadEventExecutorGroup。
EventExecutor结构
EventExecutor是一个事件执行器,提供了很多执行任务的方法:
比如submit方法,这些接口都继承了ScheduleExecutorService接口,因此它们能够执行定时任务,能够提交任务,还包括schedule方法,其中最重要的方法是next,上面是EventExecutorLoop里面的一些方法。
下面的EventExecutor里面的方法也是一样,next返回自己的类型,表示下一个执行,parent()方法表示它是属于哪一个EventExecutorGroup,因为每个EventExecutor都属于一个EventExecutorGroup,parent()方法能判断它属于哪个组。inEventLoop判断当前方法是不是在当前线程里面执行,还有其他方法等等。
EventExecutor和EventExecutorGroup可以总结如下:
1.EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor;
2.EventExecutorGroup是不干什么事情的,当收到一个请后,他就调用next()获得一个它里面的EventExecutor,再调用这个executor的方法;
3.EventExecutorChooser.next()定义选择下一个EventExecutor的策略;
我们来看一下选择下一个EventExecutor的策略的具体实现,
查看DefaultEventExecutorChooserFactory的源码,
算法很简单,它有两中选择方式,一种是PowerOfTwoEventExecutorChooser,另一种是GenericEventExecutorChooser。它们的算法很简单,其实就是轮询(round robin),如果是2的指数,就用位来做(PowerOfTwoEventExecutorChooser),如果不是2的指数就取模(GenericEventExecutorChooser),这两种效果是一模一样的。
再来看一下具体的内容,来看MultithreadEventExecutorGroup类方法:
其中有个children属性,就是一个EventExecutor[]数组,有个chooser属性,就是EventExecutorChooser选择器,所有的方法都是通过next派发给SingleThreadEventExecutor去处理:
就是在这里具体处理事情。它里面实现的一个重要的execute(Runnable )方法,就是做线程提交的,一定要关注这个方法。所有的方法都派发给SingleThreadEventExecutor后,它自己也不干,它交给它里面的Executor属性去做,具体的实现一般是ThreadPerTaskExecutor:
上面可以理解为都是中介,真正干活的就是ThreadPerTaskExecutor。
EventLoop结构
前面的EventExecutor就是执行,EventLoop主要是注册:
EventLoopGroup也是啥都不干,让下面的SingleThreadEventLoop来干。
EventLoop除了注册事件,还包括事件循环,因为netty有很多EventLoop和EventLoopGroup,比如nio对应的是nio的一套,bio对应的是bio的一套,其它非nio的也各自有一套,我们这里只讨论nio这一套。它的事件循环肯定是放到具体的run方法里面的:
NioEventLoop中的run方法实现的是SingleThreadEventExecutor中的run方法,每个EventLoopGroup都包括一系列的EventExecutor,看下图:
上图左侧表示一个EventLoopGroup,同样包含了一系列的EventExecutor,刚才在源码中看到,它是一个数组,通过EventExecutorChooser进行选择,每一个都指向一个具体的NioEventLoop。对于每个NioEventLoop,它内部有Selector,有很多通道注册,同时还有个事件循环去处理这些事情(上图右上侧部分)。对于每个通道,它都有一个ChannelPipeline(上图右下侧),里面有个handler链,它默认有两个,一个是头(HeaderContext),一个是尾(TailContext),这是netty自己加的,中间我们可以插入自己的处理器。
上面可以理解为我们平时用的workerGroup的结构,对于bossGroup,大致上和workderGroup差不多,只不过注册的时候,只有一个接收事件:
上面Selector对应的只有一个NioServerSocketChannel,里面也是一个事件循环,处理接收事件,它的ChannelPipeline也大致上一样,只不过最尾部的一个东西,肯定是ServerBootstrapAccept,专门负责accept事件。
NioEventLoop创建分析
上面了解了NioEventLoop的类结构,下面来了解一下它的创建过程。我们使用debug的形式查看创建过程,首先在 new 地方打一个断点:
然后debug启动项目,首先进入的是NioEventLoopGroup中的方法:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
我们在workerGroup处打断点发现,最终进入的也是这个方法,所以在两个地方打断点都可以。这个方法除了传入一个线程数量,还传入了一个Executor对象,前面讨论过,Executor就是最终干活的对象,最终实现是ThreadPerTaskExecutor,这里传的是null,说明我们是不需要传入的,由netty自己创建即可,我们继续往下走:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
这个方法中又多了一个参数,SelectorProvider定义了创建selector、ServerSocketChannel、SocketChannel等方法,采用Java的 Service Provider Interface (SPI) 方式实现,
从代码可以知道,创建过程为
如果provider已经创建,直接返回;
如果定义了java.nio.channels.spi.SelectorProvider属性,则采用该属性定义的类创建SelectorProvider并返回;如果失败,则继续;
采用SPI方法创建SelectorProvider并返回;如果实现,则通过DefaultSelectorProvider创建;
我们继续往下走,
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
这里多了一个参数,继续往下走:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
这里多了一个拒绝处理参数,继续往下走:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
从这里可以看到,线程数如果没有定义会使用默认的数值,默认的值就是系统cup*2:
继续往下走,
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
6、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
这里增加了一个我们上面讨论过的参数类型,DefaultEventExecutorChooserFactory,也就是EventExecutor的选择器,继续往下走:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
6、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
7、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
通过上面七步,我们组合齐了创建需要的所有参数,进入了这里,这里是真正创建开始的地方,从代码中看到,前面两个判断,一个是线程数量,另一个是对Executor的创建,使用的是ThreadPerTaskExecutor,
里面的execute方法很简单,每次执行的时候都会创建一个新的线程执行命令。
下面的children,就是我们前面提到的MultithreadEventExecutorGroup中的EventExecutor数组,我们指定了多少个线程数,就会创建多少个EventExecutor,来看一下每个EventExecutor的创建过程:
创建的时候调用的是new Child去创建,这个方法真正实现的地方是:
io.netty.channel.nio.NioEventLoopGroup#newChild
里面new 了一个NioEventLoop,具体的内容如下:
我们看到里面创建了一个selector,通过这样children数组就new 完了。
接下来就是创建chooser,作用是选择下一个EventExecutor,前面介绍过根据是否是2的指数的倍数进行选择,这里的断点打在了bossGroup上面,只创建了一个,因此是2的0次方,我们通过调试可以看到它选择的策略:
最下面的几行是对children的一些操作,这样整个new过程就结束了。我们再来回顾这张图:
创建的过程,EventExecutor数组创建好了,每个NioEventLoop的selector也创建了,但是没有看到事件循环线程启动,也没有看到事件注册等内容。所以在这个阶段,无论是boss还是worker都是一样的。从上面可以看出,创建流程是比较简单的。
NioEventLoop启动流程分析
netty是用ServerBootstrap进行启动,启动的代码是这一行:
1、 ChannelFuture f = server.bind(port).sync()
我们可以在这里打断点进行启动流程分析:
这里一种调用了两个方法,一个是bind方法,一个是sync方法,我们先从bind开始。
1、 ChannelFuture f = server.bind(port).sync()
2、 io.netty.bootstrap.AbstractBootstrap#bind(int)
继续往下:
1、 ChannelFuture f = server.bind(port).sync()
2、 io.netty.bootstrap.AbstractBootstrap#bind(int)
3、 io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
validate()主要是验证是否为空的:
继续往下走:
1、 ChannelFuture f = server.bind(port).sync()
2、 io.netty.bootstrap.AbstractBootstrap#bind(int)
3、 io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
4、 io.netty.bootstrap.AbstractBootstrap#doBind
这个方法内容比较多,来看第一行代码,执行了一个initAndRegister方法,回想我们自己写nio程序的时候,启动第一件事是把ServerSocketChannel初始化并注册到selector中,initAndRegister方法中也是做了同样的事情:
来看这个方法,首先通过 channel =channelFactory.newChannel(); 创建一个channel,我们配置ServerBootstrap的时候,channel传入的是 NioServerSocketChannel,这里通过一个工厂创建它的实例,当然这里用的是反射技术。具体实现代码如下:
io.netty.channel.ReflectiveChannelFactory#newChannel
反射执行的时候,最终会走到下面的方法:
io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
对于每个channel,都有这些属性,parent,id(自动生成的),unsafe,另外一个非常重要的就是pipeline属性,就是前面我们说的ChannelPipeline。刚开始创建的时候,啥都没有,就有默认的HeaderContext和TailContext:
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
接下来回到initAndRegister方法,创建完了,我们来看init方法,
io.netty.bootstrap.ServerBootstrap#init
前面是设置options,设置属性的一些内容,接下来, ChannelPipeline p = channel.pipeline(); 就是把pipeline拿出来,然后把childGroup和childHandler都拿出来,接下来是往pipeline里面加东西,
就是把我们在ServerBootstrap中配置的handler加进去。这里面先配置了ChannelPipeline,接下来就是ServerBootstrapAcceptor,这里面会把childGroup和childHandler都传进去,之所以会传这两个,是因为这里的ServerBootstrapAcceptor是要把接收到的NioSocketChannel的连接注册上去,所以这里需要有childGroup,而对于每个NioSocketChannel中的pipeline里面有什么东西,这里需要childHandler来看的。做完这些事情,init方法就完成了。
再次回到initAndRegister方法,前面的newChannel和init方法都完成了,下面是注册方法:
ChannelFuture regFuture = config().group().register(channel);
这里先调用config和group方法,返回的就是我们配置的bossGroup,这里如果bossGroup配置多个,也会从中选一个,所以配置多个也是没有用处的(具体理由参考前面一篇文章)。对于boss来说,这里只会初始化注册一次,选择一个。获取group后,调用的是register方法,最终到的方法是:
io.netty.channel.AbstractChannel.AbstractUnsafe#register