Netty Source Analysis (3) - NioEventLoop - 2 Execution
2020-10-09
进击的蚂蚁zzzliu
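Overview
Execution consists of four main steps:
- Entry logic
- Polling for IO events
- Processing IO events
- Processing tasks
Each of these steps is analyzed in detail below.
1. Entry logic
NioEventLoop has two entry points:
A: when the server binds its port during startup (analyzed in detail in Netty Source Analysis (2) - Server Startup - 2)
B: when a newly accepted connection is bound to a NioEventLoop through the chooser (analyzed in the next article)
This section covers A first.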
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
//channel.eventLoop() -> NioEventLoop, which extends SingleThreadEventExecutor
channel.eventLoop().execute(new Runnable() {
//as execute() below shows, this runnable is added to taskQueue as a task; it runs when NioEventLoop.run() reaches ranTasks = runAllTasks(0);
@Override
public void run() {
if (regFuture.isSuccess()) {
//bind the port; analyzed in the previous article
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
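- channel.eventLoop(): the NioEventLoop of the current NioServerSocketChannel; execute() is implemented in its parent class SingleThreadEventExecutor
- execute(new Runnable(): as the execute() code below shows, this runnable is added to taskQueue as a task and runs when NioEventLoop.run() reaches ranTasks = runAllTasks(0);
- channel.bind: the port-binding logic, analyzed in the previous article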
private void execute(Runnable task, boolean immediate) {
//the current thread is main, and the executor's thread field is still null
boolean inEventLoop = inEventLoop();
//the submitted task is not run here; it is only put into taskQueue
addTask(task);
if (!inEventLoop) {
startThread();
}
}
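- inEventLoop: checks whether NioEventLoop's thread field (declared in the parent class SingleThreadEventExecutor) is the current thread; thread is still null here, so it returns false
- addTask: the runnable is not executed immediately; it is only put into taskQueue
- startThread: starts the event loop thread (through doStartThread below)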
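The behaviour described above (execute() only queues the task, and the single loop thread is started lazily on the first submission from outside the loop) can be modelled in a few lines of plain Java. This is a simplified sketch, not Netty's code: MiniEventLoop and threadsStarted are hypothetical names, a started flag flipped by CAS stands in for the executor's state field, and a short sleep stands in for blocking in select().

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, heavily simplified model of SingleThreadEventExecutor.execute().
class MiniEventLoop {
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    final AtomicInteger threadsStarted = new AtomicInteger(); // for observation only
    private volatile Thread thread; // null until the loop thread is running

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        taskQueue.add(task);        // addTask: queued, not run here
        if (!inEventLoop) {
            startThread();
        }
    }

    private void startThread() {
        // Only the first caller wins the CAS, so at most one loop thread is created.
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true);
            threadsStarted.incrementAndGet();
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread(); // same idea as doStartThread()
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task == null) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1); // stand-in for select()
                } catch (InterruptedException e) {
                    return;
                }
                continue;
            }
            task.run();
        }
    }
}
```

Because startThread() is guarded by compareAndSet, concurrent callers still create at most one thread, and tasks submitted from inside the loop (inEventLoop() == true) are only queued.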
private void doStartThread() {
//executor: the ThreadPerTaskExecutor created in the previous article; it creates a FastThreadLocalThread and starts it
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
boolean success = false;
//update the last execution time
updateLastExecutionTime();
try {
//this is where NioEventLoop.run() actually starts executing
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//non-essential logic omitted
}
}
});
}
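- executor: the ThreadPerTaskExecutor created together with the NioEventLoop; its execute() simply creates a new thread and starts it (see the previous article on creation for details)
- SingleThreadEventExecutor.this.run(): this is where NioEventLoop.run() actually starts executing
- finally: mainly deals with the state of the executor's thread as it shuts down; it involves FastThreadLocal, CountDownLatch and related topics, which are worth exploring on your own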
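The ThreadPerTaskExecutor used in doStartThread is essentially one line: ask the thread factory for a new thread and start it. A minimal sketch of that idea follows; PerTaskExecutor and namedFactory are hypothetical names, and plain daemon threads are used where Netty's default factory (DefaultThreadFactory) creates FastThreadLocalThread instances.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the ThreadPerTaskExecutor idea: no pooling, every execute() gets a fresh thread.
class PerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    PerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

    // Hypothetical factory that numbers its threads: prefix-1, prefix-2, ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + "-" + seq.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }
}
```

Since each NioEventLoop calls execute() on this executor only once (from doStartThread), "one thread per task" effectively means one thread per event loop.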
The complete run method is shown below; select / processSelectedKeys / runAllTasks are analyzed in detail afterwards.
protected void run() {
//iteration counter used to work around the JDK empty-poll bug
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
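//selectStrategy: the DefaultSelectStrategy created in the previous article. During server startup taskQueue already holds a task (added in 1. Entry logic),
//so calculateStrategy returns the result of selectNow() (0 here) and no blocking select happens; after ranTasks = runAllTasks(0); runs, the next iteration returns -1 (SELECT) and select is performed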
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
//deadline of the next scheduled task; returns -1L when there is no scheduled task
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
//2. taskQueue is empty, so block in select
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {
//on IOException, rebuild the selector (open a new selector, re-register the old selector's channels on it, then close the old selector)
rebuildSelector0();
selectCnt = 0;
//to avoid burning CPU on repeated errors, handleLoopException sleeps 1 s after each IOException before polling again
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
//percentage of loop time spent on IO events, 50 by default
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
//3. IO events were polled; process them
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
//4. run external (non-IO) tasks
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//strategy <= 0: only run external tasks, there are no IO events to process
ranTasks = runAllTasks(0);
}
//if tasks ran (ranTasks == true) or IO events were selected (strategy > 0), which is the normal case, reset selectCnt to 0;
//when the JDK empty-poll bug occurs (strategy == 0 again and again), unexpectedSelectorWakeup handles it
if (ranTasks || strategy > 0) {
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {
selectCnt = 0;
}
} catch (CancelledKeyException e) {
} catch (Throwable t) {
handleLoopException(t);
}
}
}
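- selectCnt: iteration counter used to work around the JDK empty-poll bug
- selectStrategy: the DefaultSelectStrategy created in the previous article. During server startup taskQueue already holds a task (added in 1. Entry logic), so calculateStrategy returns the result of selectNow() (0 here) and no blocking select is performed; after ranTasks = runAllTasks(0); runs, the next iteration returns -1 (SELECT) and select is performed
- curDeadlineNanos: deadline of the next scheduled task; -1L when there is no scheduled task
- ioRatio: percentage of time spent on IO events, 50 by default; when IO events were processed, tasks get a time budget of ioTime * (100 - ioRatio) / ioRatio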
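Two small pieces of arithmetic drive run(): the select strategy and the ioRatio task budget. The sketch below reproduces their logic under assumed names (LoopMath, calculateStrategy and taskBudgetNanos are hypothetical); the strategy rule mirrors what the text describes for the default strategy: with pending tasks, return the result of a non-blocking selectNow(), otherwise return SELECT (-1).

```java
import java.util.function.IntSupplier;

// Hypothetical helper reproducing the decisions made at the top and bottom of run().
final class LoopMath {
    static final int SELECT = -1; // same value as SelectStrategy.SELECT

    // Pending tasks: poll without blocking so the tasks are not delayed.
    // No tasks: return SELECT, meaning "block in select()".
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    // Time budget for non-IO tasks after ioTimeNanos was spent on IO,
    // so that IO gets roughly ioRatio percent of the loop's time.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With the default ioRatio of 50 the task budget equals the time just spent on IO; raising ioRatio to 80 cuts it to a quarter of that.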
2. Polling for IO events
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
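- deadlineNanos: NONE (Long.MAX_VALUE) by default, meaning there is no scheduled task; otherwise the remaining delay is converted to milliseconds, rounded up by adding 995000 ns before dividing
- selector.select(): blocks until an IO event arrives (or the selector is woken up)
- selector.selectNow(): returns immediately; returns 0 if no IO event is ready
- selector.select(timeoutMillis): blocks for at most timeoutMillis before returning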
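The timeout handling in select(long) boils down to converting a remaining delay in nanoseconds to whole milliseconds. The sketch below is simplified: it takes the remaining delay directly rather than a deadline (the original goes through deadlineToDelayNanos), and SelectTimeout is a hypothetical name. Adding 995_000 ns before dividing by 1_000_000 rounds the delay up to the next millisecond, except that a remainder under 5 µs is dropped; a result of 0 leads to selectNow().

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Hypothetical helper mirroring the three-way choice in select(long).
final class SelectTimeout {
    static final long NONE = Long.MAX_VALUE; // "no scheduled task" marker, as in the text

    // Remaining delay in ns -> whole ms, rounded up (remainders under 5 µs are dropped).
    static long toTimeoutMillis(long delayNanos) {
        return (delayNanos + 995_000L) / 1_000_000L;
    }

    static int select(Selector selector, long delayNanos) throws IOException {
        if (delayNanos == NONE) {
            return selector.select();               // block until an IO event or wakeup()
        }
        long timeoutMillis = toTimeoutMillis(delayNanos);
        return timeoutMillis <= 0
                ? selector.selectNow()              // a task is due: do not block at all
                : selector.select(timeoutMillis);   // block at most until the task is due
    }
}
```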
Next, let's look at how Netty deals with the JDK empty-poll bug.
//if tasks ran (ranTasks == true) or IO events were selected (strategy > 0), which is the normal case, reset selectCnt to 0;
//when the JDK empty-poll bug occurs (strategy == 0 again and again), unexpectedSelectorWakeup handles it
if (ranTasks || strategy > 0) {
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {
selectCnt = 0;
}
private boolean unexpectedSelectorWakeup(int selectCnt) {
//when the empty-poll count reaches the threshold (512 by default), rebuild the selector
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
return true;
}
return false;
}
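- ranTasks || strategy > 0: true in the normal case, because either tasks ran (ranTasks == true) or IO events were selected (strategy > 0); when select keeps returning 0 events and no task runs, the JDK empty-poll bug may have been hit
- unexpectedSelectorWakeup: once the empty-poll count reaches 512, rebuilds the current selector (opens a new selector, re-registers the old selector's channels on it, then closes the old selector)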
So Netty does not actually fix this bug; it sidesteps it with a workaround.
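The selectCnt bookkeeping and the rebuild step can be sketched with only java.nio APIs. This is a model built on assumptions, not Netty's rebuildSelector0: SelectorRebuild and onWakeup are hypothetical names, 512 is the default threshold quoted above, and Netty's extra handling (thread interruption, failure recovery, swapping its own selector wrapper) is left out. The rebuild follows the three steps described in the text: open a new selector, re-register each valid key's channel with the same interest ops and attachment, then close the old selector.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical sketch of empty-poll detection plus selector rebuild.
final class SelectorRebuild {
    static final int REBUILD_THRESHOLD = 512; // default threshold from the text

    private int selectCnt;

    /** Called once per loop iteration; returns true when the selector should be rebuilt. */
    boolean onWakeup(boolean didWork) {
        selectCnt++;
        if (didWork) {                 // ranTasks || strategy > 0
            selectCnt = 0;
            return false;
        }
        if (selectCnt >= REBUILD_THRESHOLD) {
            selectCnt = 0;
            return true;
        }
        return false;
    }

    /** Moves every valid registration to a fresh Selector, then closes the old one. */
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid()) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // cancelled keys leave the key set on the next select, not now
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }
}
```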
3. Processing IO events
private void processSelectedKeys() {
if (selectedKeys != null) {
//optimized path
processSelectedKeysOptimized();
} else {
//plain path
processSelectedKeysPlain(selector.selectedKeys());
}
}
- selectedKeys: created by openSelector() when the NioEventLoop constructor ran (previous article); it is normally not null here
Let's first look at how selectedKeys is created.
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
//the JDK's own selector; its internal selectedKeys is a HashSet
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
//whether the key-set optimization is disabled (false by default)
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
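//the JDK's SelectorImpl class; if it could not be loaded, or the selector is not a SelectorImpl, fall back to the unwrapped selector
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;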
//Netty's own set, backed by an array: add() just stores the key at the next index, avoiding HashSet's hashing and node allocation
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
//on JDK 9+, write selectedKeySet directly through the fields' memory offsets (Unsafe)
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
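//otherwise (below JDK 9, or when Unsafe or the offsets are unavailable) set selectedKeySet via reflection; the original first makes both fields accessible with ReflectionUtil.trySetAccessible, omitted here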
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
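//if the replacement failed, fall back to the unwrapped selector (selectedKeys stays null, so the plain path is used)
if (maybeException instanceof Exception) {
selectedKeys = null;
return new SelectorTuple(unwrappedSelector);
}
//save it in the selectedKeys field
selectedKeys = selectedKeySet;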
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
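- unwrappedSelector: the JDK's own selector, whose internal selectedKeys is a HashSet
- selectedKeySet: Netty's array-backed replacement set; its operations are simpler and faster
- SelectorImpl: via reflection or direct writes at the fields' memory offsets, SelectorImpl's selectedKeys / publicSelectedKeys are replaced with selectedKeySet, so the JDK adds ready keys straight into Netty's array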
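Two ideas from openSelector() can be tried in isolation: an array-backed Set whose add() just appends, and overwriting a private Set field via reflection. In the sketch below, ArrayKeySet, FakeSelector and KeySetSwap are hypothetical; FakeSelector stands in for sun.nio.ch.SelectorImpl, because reflecting into the real JDK class needs extra access permissions on recent JDKs. Like the original, the array set performs no duplicate check, so it only behaves as a set if each key is added at most once before reset().

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

// Array-backed "set" in the spirit of SelectedSelectionKeySet (hypothetical sketch).
final class ArrayKeySet<T> extends AbstractSet<T> {
    Object[] keys = new Object[4]; // small initial capacity for the demo
    int size;

    @Override
    public boolean add(T o) {
        if (o == null) return false;
        if (size == keys.length) keys = Arrays.copyOf(keys, keys.length << 1); // grow by doubling
        keys[size++] = o;          // O(1) append, no hashing, no duplicate check
        return true;
    }

    @Override
    public int size() { return size; }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int idx;
            @Override public boolean hasNext() { return idx < size; }
            @Override @SuppressWarnings("unchecked")
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                return (T) keys[idx++];
            }
        };
    }

    // Null out entries from start up to size (for GC), then empty the set.
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }
}

// Hypothetical stand-in for SelectorImpl: ready keys go into a private Set field.
class FakeSelector {
    private Set<Object> selectedKeys = new HashSet<>();
    void markReady(Object key) { selectedKeys.add(key); }
}

final class KeySetSwap {
    // Same reflection move openSelector() makes: overwrite the private field.
    static void swap(FakeSelector selector, Set<Object> replacement) throws ReflectiveOperationException {
        Field field = FakeSelector.class.getDeclaredField("selectedKeys");
        field.setAccessible(true);
        field.set(selector, replacement);
    }
}
```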
Next, let's look at processSelectedKeysOptimized.
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
//1. take the selected key (IO event) and its channel
final SelectionKey k = selectedKeys.keys[i];
//null out the array slot so the key can be garbage-collected
selectedKeys.keys[i] = null;
//the attachment is the NioServerSocketChannel passed in when the channel was registered with the selector in Netty Source Analysis (2) - Server Startup - 2
final Object a = k.attachment();
//2. process the channel
if (a instanceof AbstractNioChannel) {
//for the boss NioEventLoop the events are mostly accept events; its pipeline then hands each new connection to a worker NioEventLoop
//for a worker NioEventLoop the events are mostly read/write events; its pipeline then passes the bytes read through each channelHandler
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
//3. keys were cancelled (channels deregistered), so select again
if (needsToSelectAgain) {
//null out the remaining array entries (so they can be GC'd) and reset the size
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
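- selectedKeys: iterate over the array inside selectedKeys and handle each selected event
- k.attachment(): for the boss loop, the NioServerSocketChannel attached when the channel was registered with the selector in Netty Source Analysis (2) - Server Startup - 2
- processSelectedKey: handles the event itself (analyzed later together with the new-connection flow)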
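The control flow of processSelectedKeysOptimized() (null out each slot as it is consumed; on needsToSelectAgain clear the rest, select again and restart from index 0) is sketched below. ReadyKeyLoop is a hypothetical model: plain objects stand in for SelectionKeys, a Consumer stands in for processSelectedKey, and a Supplier returning the newly ready keys stands in for selectAgain().

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of the loop in processSelectedKeysOptimized().
final class ReadyKeyLoop {
    final Object[] keys = new Object[16]; // fixed capacity is enough for this sketch
    int size;
    boolean needsToSelectAgain;            // a handler sets this, e.g. after cancelling keys

    void add(Object key) { keys[size++] = key; }

    // Same contract as reset(int) in the excerpt: null out from start, then empty.
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    void processAll(Consumer<Object> handler, Supplier<List<Object>> selectNow) {
        for (int i = 0; i < size; ++i) {
            Object k = keys[i];
            keys[i] = null;                // release the reference as soon as it is taken
            handler.accept(k);
            if (needsToSelectAgain) {
                reset(i + 1);              // drop keys not processed yet
                needsToSelectAgain = false;
                for (Object fresh : selectNow.get()) add(fresh); // stand-in for selectAgain()
                i = -1;                    // ++i brings the loop back to index 0
            }
        }
    }
}
```

For example, if the handler sets the flag while processing key b of [a, b, c], the loop drops c, selects again (getting, say, c and d) and continues from index 0.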
4. Processing tasks
protected boolean runAllTasks(long timeoutNanos) {
//move the scheduled tasks that are due from the scheduled task queue (a priority queue ordered by deadline) into taskQueue
fetchFromScheduledTaskQueue();
//take the first task
Runnable task = pollTask();
if (task == null) {
//the task queue is empty: return false
afterRunningAllTasks();
return false;
}
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
//run the tasks in taskQueue in a loop
for (;;) {
//safeExecute calls task.run() and catches any exception it throws
safeExecute(task);
runTasks ++;
// check the deadline only once every 64 tasks, because nanoTime() is relatively expensive
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
//record the last execution time
this.lastExecutionTime = lastExecutionTime;
return true;
}
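- runAllTasks is fairly simple: it runs the tasks in taskQueue plus the scheduled tasks that are due; the deadline is only checked every 64 tasks, so with timeoutNanos == 0 (deadline 0) one call runs at most 64 tasks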
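The budgeting in runAllTasks(long) can be sketched with a pluggable clock so that it is deterministic. TaskRunner and Scheduled are hypothetical names, ArrayDeque/PriorityQueue stand in for Netty's task and scheduled-task queues, and the clock is assumed to be a non-negative relative nanosecond counter like ScheduledFutureTask.nanoTime(). The sketch keeps the two properties discussed above: due scheduled tasks are moved into taskQueue first, and the deadline is only checked every 64 tasks. Unlike safeExecute, it does not catch task exceptions.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical model of runAllTasks(long timeoutNanos).
final class TaskRunner {
    static final class Scheduled {
        final long deadlineNanos;
        final Runnable task;
        Scheduled(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }
    }

    final Queue<Runnable> taskQueue = new ArrayDeque<>();
    final PriorityQueue<Scheduled> scheduledQueue =
            new PriorityQueue<>((a, b) -> Long.compare(a.deadlineNanos, b.deadlineNanos));
    private final LongSupplier clock; // assumed non-negative, monotonic nanoseconds

    TaskRunner(LongSupplier clock) { this.clock = clock; }

    // Move every scheduled task whose deadline has passed into taskQueue.
    void fetchFromScheduledTaskQueue() {
        long now = clock.getAsLong();
        while (!scheduledQueue.isEmpty() && scheduledQueue.peek().deadlineNanos <= now) {
            taskQueue.add(scheduledQueue.poll().task);
        }
    }

    boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = taskQueue.poll();
        if (task == null) return false;
        final long deadline = timeoutNanos > 0 ? clock.getAsLong() + timeoutNanos : 0;
        long runTasks = 0;
        for (;;) {
            task.run();
            runTasks++;
            // Read the clock only every 64 tasks; with deadline 0 this stops after 64 tasks.
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) break;
            task = taskQueue.poll();
            if (task == null) break;
        }
        return true;
    }
}
```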
That completes the analysis of NioEventLoop's main flow.