Netty系列(三):说说NioEventLoop
前言
本来想先写下NioServerSocketChannel以及NioSocketChannel的注册流程的,但是最后发现始终离不开NioEventLoop这个类,所以在这之前必须得先讲解下NioEventLoop这个类到底是用来做啥的。其实在第一篇文章里面有提及到它的,但是没有详细的去讲解,接下来会对它分析一波。
设计模型
在进入正文之前,先简单的了解下NioEventLoop的工作模型(服务端):
假设一个NioEventLoopGroup(这里服务端会用两个Group)里面有4个NioEventLoop,那么netty中的实际工作模型就如上图所示,服务端会用默认的选择规则从Group1中选择出一个NioEventLoop注册ServerChannel,并绑定一个OP_ACCEPT用于监听客户端发起的连接请求,一旦有新的连接进来,服务端则会从Group2中按一定的规则选出一个NioEventLoop来注册SocketChannel,并绑定OP_READ兴趣事件,这里注意,一个NioEventLoop可以绑定多个SocketChannel。具体的注册流程我会在下一篇文章中写出来。
下面进入正题。
构造流程
NioEventLoop具体的构造流程大家可以去我的Netty系列(一):NioEventLoopGroup源码解析中去看一下,里面说的还算蛮详细的。下面是其调用的构造函数,咱们可以观察到其身上会绑定一个选择器Selector
,供后期channel注册的时候使用的,这一块是JAVA NIO相关的知识点。
内部还维护着一个executor
去开启执行线程的,以及taskQueue
任务队列和一个tailTasks
尾部队列(这个队列里面的任务是在每次执行taskQueue
任务队列中的任务结束后都会去调用的,不多介绍)。上面介绍的三个四个内部结构Selector,executor,taskQueue,tailTasks
会在后面多次提起。
下图是NioEventLoop的简单的层级结构(下图取之于Netty in Action):
NioEventLoop.execute
这里我们先看一下NioEventLoop的execute
方法。实际上这个方法是在其父类SingleThreadEventExecutor
中。这个方法的功能就是将任务丢到taskQueue
中。
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
// 开启工作线程,实际上也就是执行NioEventLoop中的run方法,下面会介绍
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
- 添加task到执行队列中,也就是咱们上文提起的
taskQueue
中。 - 判断这个NioEventLoop中的是否已经开启过线程。
- 若未开启,则必须先启动线程任务,也就是我们下文会介绍的
run
方法。 - 首次初始化会在
taskQueue
中丢一个空任务去唤醒线程。
NioEventLoop的工作模式实际上就是开启一个单线程跑一个死循环,然后一直轮询taskQueue
队列是否有任务添加进来,然后就去处理任务,还有就是如果注册在selector上的channel有兴趣事件进来,也会去处理selectorKeys
,这一块下面会做介绍。
NioEventLoop.run
现在看看NioEventLoop中的run
方法
protected void run() {
for (;;) {
try {
try {
// 按默认配置的话要么返回select.selectNow(),
//要么返回SelectStrategy.SELECT
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// IO操作,根据selectedKeys去处理
processSelectedKeys();
} finally {
// 保证执行完所有的任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
// IO操作,根据selectedKeys去处理
processSelectedKeys();
} finally {
// 按一定的比例去处理任务,有可能遗留一部分任务下次进行处理
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
// 释放资源,将注册的channel全部关闭掉。
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
- 这里有个默认的计算策略:
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT
1.有任务便会直接返回
select.selectNow()
,则会直接去跑任务或者是去处理selectorKeys
;
2.若没任务,则会走select(wakenUp.getAndSet(false))
方法,里面会有一个timeout超时处理,selector.select(timeoutMillis)
,超时后也会去跑任务或者是去处理selectorKeys
;
这一块具体细节也很多,只是说一下处理流程。
- 注意上面有个
ioRatio == 100
这个判断条件,如果为100,则会将任务全部处理完成;否则会与io操作按一定的比例去执行任务。
这里的IO操作就是
processSelectedKeys()
方法,代码虽然很长,但是干的活就是根据不同的兴趣事件干不同的活,里面有对OP_READ OP_ACCEPT OP_WRITE OP_CONNECT
等等不同兴趣事件的不同处理方法,这一块应该是JAVA NIO里面的相关知识点。感兴趣的朋友可以debug针对某个触发事件研究一下。
runAllTasks
执行任务的代码如下(下面是runAllTasks
的代码):
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 这里会从定时任务队列中将达到执行事件的task丢到taskQueue中去
fetchedAll = fetchFromScheduledTaskQueue();
// 执行taskQueue中所有的task
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// 这个是执行上面所说的tailTasks中的task
afterRunningAllTasks();
return ranAtLeastOne;
}
这一块的逻辑是:
先执行fetchFromScheduledTaskQueue
方法,将到期的定时任务丢到taskQueue
队列中,这个fetchFromScheduledTaskQueue
方法里面有个小细节,当taskQueue
队列满了之后,它就会重新塞到scheduledTaskQueue
队列中,然后再外圈循环,taskQueue
队列消费完毕,则继续执行fetchFromScheduledTaskQueue
方法,直到把所有到期的任务都丢到taskQueue
队列中执行完毕为止。如下图所示:
这一部分到这里就结束了,下一篇会对NioServerSocketChannel的注册以及服务端创建NioSocketChannel进行分析。