三、netty源码分析之EventLoop
一、EventLoop功能概述
上篇我们分析了EventLoopGroup的核心能力,EventLoopGroup具有执行任务、注册Channel、执行器调度等能力。今天我们来看一下EventLoop。我们先来看看EventLoop的类图关系:
我们可以看到,EventLoop接口继承了EventLoopGroup接口。为什么EventLoop要继承EventLoopGroup呢?从上一篇的分析,我们知道,EventLoopGroup最主要的功能时对EventLoop进行管理调度,EventLoopGroup的其他大部分功能,都是交给自己管理的EventLoop来处理的。而EventLoop继承EventLoopGroup,就是为了继承EventLoopGroup任务执行、优雅停机、Channel注册等功能窗口。
除了继承EventLoopGroup之外,EventLoop还继承了EventExecutor接口。我们可以看一下EventExecutor的具体内容:
/**
* The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
* with some handy methods to see if a {@link Thread} is executed in a event loop.
* Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
* way to access methods.
*
*/
public interface EventExecutor extends EventExecutorGroup {
/**
* Returns a reference to itself.
*/
@Override
EventExecutor next();
/**
* Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
*/
EventExecutorGroup parent();
/**
* Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
*/
boolean inEventLoop();
/**
* Return {@code true} if the given {@link Thread} is executed in the event loop,
* {@code false} otherwise.
*/
boolean inEventLoop(Thread thread);
/**
* Return a new {@link Promise}.
*/
<V> Promise<V> newPromise();
/**
* Create a new {@link ProgressivePromise}.
*/
<V> ProgressivePromise<V> newProgressivePromise();
/**
* Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newSucceededFuture(V result);
/**
* Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newFailedFuture(Throwable cause);
}
从接口的头部注释我们可以看到,EventExecutor是一个特殊的EventExecutorGroup,它提供了一些易用的方法来判断一个线程是否正在事件循环中执行。至于EventExecutorGroup我们上一篇分析过这个接口的能力,这里就不再赘述了。我们看一看EventExecutor的几个重要的的方法:
首先是EventExecutorGroup parent();
方法,EventExecutor只有事件执行的能力,没有调度的能力,所以这个方法只会返回对象自身。
然后是两个重载的inEventLoop
方法,用来判断线程是否正在事件循环中执行。
随后是两个创建Promise的方法,关于Promise的作用,大家不清楚的可以查一下相关资料,内部具体实现我们在后面的文章中再做分析。
最后,是一对创建Future的方法,我们从注释中可以看到这两个方法的作用,就是创建一个已经被标记成成功/失败的Future对象。所有已经注册的FutureListener都会被直接通知。所有的阻塞方法都会非阻塞的返回。
我们的EventLoop继承了OrderedEventExecutor,而OrderedEventExecutor直接继承了EventExecutor,本身并无定义其他方法。但是我们可以从OrderedEventExecutor的头部注释中看到,OrderedEventExecutor其实是一个标记接口,这个接口保证所有执行的任务必须按照顺序执行,并且要串行执行!所以我们可以相信,实现了OrderedEventExecutor的类,执行任务的时候回保证任务执行的顺序性,并且同一时刻只能执行一个任务。
到这里,我们可以知道,EventLoop的核心能力:EventLoop是一个可以优雅停机的任务执行器,它能保证提交的任务都被顺序串行执行。接下来我们根据EventLoop的一个具体实现类NioEventLoop来更直观的理解一下EventLoop的能力。
从NioEventLoop来看EventLoop在netty中扮演的角色
首先我们先看一看NioEventLoop的构造方法:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
我们一路跟踪,会发现,这个构造方法调用了父类SingleThreadEventExecutor的构造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
我们可以看到这里面有一行this.executor = ThreadExecutorMap.apply(executor, this);
。这个构造方法传入的executor
参数就是我们上节提到过的NioEventLoopGrop在创建NioEventLoop时传入的ThreadPerTaskExecutor对象。这里在给成员变量赋值的时候调用了ThreadExecutorMap.apply(executor, this)
,我们可以看一下这里面的具体内容:
//ThreadExecutorMap类的相关内容
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
我们可以看到,Executor apply(final Executor executor, final EventExecutor eventExecutor)
重新创建了一个Executor对象,这个对象执行任务还是调用参数传入的Executor 来执行,只不过是在传入的任务中做了一个静态代理,在任务执行的前后分别将执行此任务的EventExecutor绑定、解绑到自身持有的一个FastThreadLocal中。这里的FastThreadLocal是netty自己实现的一个处理线程单例的工具,这个FastThreadLocal究竟比我们jdk中的ThreadLocal快在哪里呢?我们把这个类的set方法拿出来看一下(在此之前你必须要知道jdkThreadLoop的实现原理):
//FastThreadLocal的set方法
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
remove();
}
}
InternalThreadLocalMap的get()方法:
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
我们可以看到这个FastThreadLocal在获取Map的时候会判断当前的线程是否是FastThreadLocalThread的对象,是的话就调用fastGet(FastThreadLocalThread thread)
方法获取InternalThreadLocalMap(不存在就创建);如果不是FastThreadLocalThread的对象,就调用slowGet()
获取,获取逻辑是从一个静态的ThreadLocal对象中获取当前线程绑定的InternalThreadLocalMap对象,没有的话就创建一个。在获取到InternalThreadLocalMap的对象后,怎么向里面赋值呢?我们可以看一下FastThreadLocal中的set
方法赋值的真正逻辑:
// FastThreadLocal的set方法
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
//index是FastThreadLocal维护的一个索引对象
if (threadLocalMap.setIndexedVariable(index, value)) {
addToVariablesToRemove(threadLocalMap, this);
}
}
// InternalThreadLocalMap的方法
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
我们可以看到,其实InternalThreadLocalMap内部是一个数组,每个FastThreadLocal都记录了自身维护的线程单例的对象再数组中的位置,即index
这个成员变量。这个index
的值是在FastThreadLocal初始化的时候从InternalThreadLocalMap内部的一个静态递增变量处获取的。 InternalThreadLocalMap这种方式和jdk内部的ThreadLocalMap使用散列表的方式存储对象相比,优点是:获取和设置线程单例对象的时候,少了hash值计算这一步,并且没有hash冲撞的情况发生。这一点相比ThreadLocalMap*的确性能会有所提升。这也是netty对性能优化的一方面体现,后面我们还会看到好多在细节上的优化。
我们花了很大的篇幅分析了NioEventLoop的构造方法,目的就是为了让大家看到netty对性能的优化都是落到很多细节上的。下面我们继续分析NioEventLoop构造方法的剩余内容,接下来我们会看到netty的另一个优化,在此之前大家要熟悉Java的NIO,不然接下来内容肯定是看不懂的!
我们可以看到NioEventLoop有下面几个成员变量:
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
我们在NioEventLoop构造方法中可以看到对这几个成员变量的初始化过程。
首先,我们可以看到,构造方法中通过openSelector()
方法生成了一个SelectorTuple的对象,然后将SelectorTuple中的selector
和unwrappedSelector
赋值给NioEventLoop的队形属性。我们可以看一下openSelector()的内容:
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
//创建一个Selector对象
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
//禁止优化选型,如果选择禁止优化,就直接创建一个SelectorTuple对象返回
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
//下面是优化的开始
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//通过反射获取Selector的相关Field
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
//通过反射设置Selector对应的属性的值
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
我们可以看到,整个openSelector()
方法做的事情就是:判断参数是否允许相关优化,如果允许优化,就将创建的Selector的对象的两个属性:selectedKeys
、publicSelectedKeys
重写为:SelectedSelectionKeySet对象。关于selectedKeys
、publicSelectedKeys
,大家可以看一看Selector的API,这里不再赘述。这里为什么要对这两个属性重新赋值呢?为什么重新赋值了就是优化了呢?我们先来看一下这两个属性在Selector中是什么:
//SelectorImpl的部分代码
protected Set<SelectionKey> selectedKeys = new HashSet();
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
我们可以看到,原来的selectedKeys
和publicSelectedKeys
归根结底都是HashSet。而替换成的SelectedSelectionKeySet又是什么呢?我们来看一下:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public int size() {
return size;
}
@Override
public Iterator<SelectionKey> iterator() {
//省略
}
void reset() {
reset(0);
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
private void increaseCapacity() {
//省略
}
}
我们可以看到,SelectedSelectionKeySet继承了AbstractSet,但是它内部实现压根不能算是一个Set,因为它的add方法没有保证元素在集合中唯一的相关实现!为什么要这么做呢?我们不妨先看一下jdk中对selectedKeys
这个集合添加元素的相关逻辑,由于没有源码,只能看到变量名是var这种定义,不过不影响我们对逻辑的理解:
if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
if (var9.clearedCount != var1) {
if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
} else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
var9.clearedCount = var1;
} else {
if (var9.clearedCount != var1) {
var10.channel.translateAndSetReadyOps(var4, var10);
if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
} else {
var10.channel.translateAndUpdateReadyOps(var4, var10);
if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
}
var9.clearedCount = var1;
}
我们可以看到,在把元素添加到selectedKeys
之前,会判断selectedKeys
是否已经包含了这个元素,包含的话就操作已经在就不再进行添加操作,不包含的时候才进行添加操作。而SelectedSelectionKeySet的判断是否存在指定元素的方法始终返回false,也就意味着,selectedKeys
会被添加重复的SelectionKey
对象。添加重复的SelectionKey
对象会有什么影响呢?在netty中对准备就绪的SelectionKey
做处理之前,都会判断SelectionKey
对象就绪的状态,处理完该事件之后,会把SelectionKey
对象的就绪状态移除。所以如果重复添加SelectionKey
对象,在这里是不会有任何影响的!那这种用数组直接替代HashMap的操作有什么好处呢?首先,我们看,NioEventLoop继承了SingleThreadEventLoop,我们可以猜出,NioEventLoop是单线程操作selectedKeys
的。单线程操作数组有什么好处呢?单线程操作可以充分利用CPU的高速缓存,避免伪共享的发生!并且netty的处理selectedKeys
时,只会在处理完所有的就绪的SelectionKey
清空数组,之后再次调用select方法。所以不存在添加时找空槽的情况,只要顺序的往数组里面加元素就可以了!这种操作比HashMap添加、删除操作性能要高太多(做了一个小的测试,从容量为10000的数组和HashMap中删除元素,HashMap耗时大概是数组的十倍左右)。
我们花了大量的篇幅分析了EventLoop的构造方法。这里主要是想让大家看到netty对性能的优化真的无处不在!而且是千方百计的去优化!这也是netty被广泛应用的原因。包括好多高性能高吞吐的中间件也使用了netty做通信,比如RocketMQ、Spark。而我们在分析类似netty这种高性能框架的源码时,一定要注意到这些优化细节,这样我们才能清楚这些框架哪里好,才能知道怎么样才能正确的使用这些框架来充分发挥它们的优势!
我们继续看NioEventLoop的主要逻辑,接下来我们看一下run()
方法:
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
run()
方法是父类SingleThreadEventExecutor的模板方法的实现。我们可以看到,run()
方法就是一个不断的循环,在循环内做了什么操作呢?首先,先调用selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
来获取select策略。我们先来看一下SelectStrategy这个接口:
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
* 接下来要执行阻塞的select操作
*/
int SELECT = -1;
/**
* IO循环应该被重试,非阻塞select接下来会被直接执行
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* 接下来不要阻塞获取新的事件IO循环
* Indicates the IO loop to poll for new events without blocking.
*/
int BUSY_WAIT = -3;
/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
SelectStrategy提供了三种默认的select策略,即SELECT、CONTINUE、BUSY_WAIT。netty中实现了一个默认的DefaultSelectStrategy,它的计算select策略的方式是:
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
如果当前EventLoop任务队列中没有任务,就执行SELECT策略,即阻塞的select。如果有的话,就返回当前NioEventLoop中持有的Selector对象的selectNow()
方法的返回值,就绪的IO事件的数量。也就是不选择任何select模式。这个过程中其实已经执行了一次非阻塞的selectNow操作
:
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
在获取到需要执行的IO select策略后,就选择执行具体的内容,我们可以看到,CONTINUE对应的执行方法就是不执行接下来的逻辑,重新执行select策略的选择。而NIO不支持忙等操作,所以BUSY_WAIT的逻辑和SELECT的逻辑是一致性的,都调用了select(wakenUp.getAndSet(false));
方法。这里,我们先要清楚wakenUp这个成员变量的含义,我们先看一下这块内容:
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
wekenUp的含义是:控制阻塞的Selector.select在执行的过程中,是否允许被打断。在使用Selector.select的过程中,select方法会被设置超时时间,设置wekenUp为ture时,Selector.select超时后不会继续重新再次被调用。
清楚了wekenUp这个参数的含义后,我们看一下NioEventLoop的具体select
操作是什么逻辑:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
//计算出select阻塞时间的最后截止时间,这个时间计算的方式是当前时间加上提交到当前EventLoop中的最近需要执行的定时任务的延迟时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 计算出select的阻塞时间,加500000L是为了始终进位。如果整个select操作执行的时间超过了selectDeadLineNanos,整个方法就结束
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// 如果任务被添加进来,并且任务中想要调用Selector#wakeup方法让Selector提前从阻塞的select方法中返回的话,如果不执行下面操作,就实现不了这个效果,只能等Selector的select方法阻塞timeoutMillis时间后返回。
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
我们来分析一下这段代码的逻辑(在此之前,我们先要清楚,理论上只有当当前EventLoop的任务队列中没有任务的时候才会调用select
这个方法SelectStrategy中的逻辑,只有hasTask()是false的时候才返回SELECT,在在调用EventLoop的select方法之前,wakenUp会被设置成false)。首先,计算出select操作最长的阻塞时间timeoutMillis
。然后判断hasTasks()的返回值,即EventLoop中是否有添加的任务,如果有的话就说明我们在之前的SelectStrategy选择select策略之后,又有新的任务添加进来了,这个时候为了防止新添加的任务要等到select操作阻塞完成才有机会执行,就做了一个判断:当前的wekenUp如果是false,就设置成ture,然后执行一个非阻塞的selector.selectNow后跳出NioEventLoop.select;否则就继续执行接下来的逻辑。也就是执行Selector.select阻塞操作。selector.selectNow方法结束后会判断,是否有就绪的IO事件,当一下情况满足任意一条就跳出循环结束EventLoop.select方法:有就绪的IO事件、wakenUp在NioEventLoop.select调用之前是true、当前EventLoop有提交的立即执行的任务、当前EventLoop中有提交的定时执行的任务。如果不满足任意情况,就判断是否当前线程有中断状态,有的话也跳出循环。最后判断循环的总时间是否大于设置的Selector.select的超时时间,判断Selector.select是不是因为超时而结束。如果是因为超时而结束就将selectCnt设置为1,继续循环;不是的话就判断循环的次数是否大于SELECTOR_AUTO_REBUILD_THRESHOLD,是的话就跳出循环,这块是为了解决jdk6的关于NIO的bug,我们可以先不用管这个逻辑。到此整个NioEventLoop.select的过程就结束了。这个过程看起来非常乱,我们要弄清楚整个流程,首先要先明白wakenUp这个属性的生命周期。我们可以看到,wakenUp这个属性提供给外部的修改窗口只有一个:
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
这个方法是protected修饰的,也就是说,这个方法是不提供其他包调用的,所以这个方法是一个netty内部的调用方法,我们可以搜索到这个方法在哪里使用:
我们可以看到,这个方法主要是在停机的时候调用的。为的就是在停机的时候将Selector.select从阻塞中唤醒。
细心地朋友也许会发现,
NioEventLoop.select
方法在调用之前,会把wakenUp
设置为false,这是为什么呢?为的就是在外部调用NioEventLoop.wakeup
方法的时候wakenUp.compareAndSet(false, true)
这个会设置成功,然后可以调用selector.wakeup()将Selector唤醒。
到这里,我们再回过头去看NioEventLoop.select方法,这个方法的作用其实就是:调用Selector.select方法来阻塞地获取就绪的IO事件,并且在任何时候都可以响应weakUp操作,如果NioEventLoop中添加定时任务,NioEventLoop.select会执行的时间最多就是到最近定时任务执行的时间,没有定时任务就最多执行1s。这样去理解是不是简单多了!!!
细心的朋友可能会问:为什么要限制NioEventLoop.select的执行时间最长到下一个定时任务执行的时间呢?我们先带着疑问继续往下看NioEventLoop.run方法。
在结束了select操作之后,继续判断一下wakenUp的标志,如果设置为ture,就调用selector.wakeup();
使下一次的selector.select
非阻塞。
随后会获取当前的ioRatio
,我们之前提过这个参数,这个参数是设置我们的IO操作在整个事件执行中的时间占比的,我们看一下下面的具体逻辑。首先,会判断ioRatio
是不是设置100,如果是设置百分之百,就先执行processSelectedKeys()
,再执行runAllTasks()
,不设置事件占比限制。如果ioRatio
不是100,就先执行processSelectedKeys()
,并且记录下processSelectedKeys()
的执行时间,然后计算出剩余时间,使用这个剩余时间来限制runAllTasks
方法。这两个方法就是干什么的呢?这里我先给出答案:processSelectedKeys()
的作用是处理所有的就绪的selectedKeys
,也就是就绪的IO事件;而runAllTasks
这两个重载方法就是执行所有的提交任务。到这里,我们可以明白为什么要限制NioEventLoop.select
的执行时间最长到下一个定时任务开始执行的时间了。因为IO处理和任务执行是在一个线程里执行的,如果不限制NioEventLoop.select
的执行时间,到达下一个定时任务需要执行的时间的时候,有可能整个线程还阻塞在select方法上!
接下来我们继续分析processSelectedKeys()
和runAllTasks
分别是怎么处理IO事件和提交的任务的。我们先来看一下processSelectedKeys()
:
private void processSelectedKeys() {
//判断是否使用了优化过的Selector,是的话就循环数组,不是的话就循环iterator。
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
首先方法会判断我们是否使用数组来替代Map优化Selector,这个是我们上边分析NioEventLoop的构造方法讲的。我们这里只看优化的方法,其实两个逻辑都是一样的,只是循环的方法不一样。整个执行过程就是遍历就绪的SelectionKey。然后交给processSelectedKey
的两个重载方法去处理。这里会根据SelectionKey的attachment对象的类型来判断调用哪个重载方法。我们先不用管这个attachment对象是什么时候被添加的,这个会在我们只会的分析中讲到,我们先来看一下这两个方法的逻辑:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0;
try {
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}
processSelectedKey(SelectionKey k, AbstractNioChannel ch)
:我们方法开始的验证逻辑先不看,主要看下面的事件就绪的逻辑。首先,会获取就绪的事件,判断就绪的事件中是否包含连接事件,如果包含,就将当前
SelectionKey的连接就绪事件从SelectionKey的感兴趣的事件中剔除掉,然后将就绪事件交给就绪的AbstractNioChannel的unsafe去处理,调用的是unsafe.finishConnect()方法。具体处理逻辑我们本篇不做分析。然后就是去判断就绪的事件中是否包含了写就绪、读就绪、ACCEPT事件,包含的话都是委托给unsafe的对应方法。
processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
:这个方法很简单,就是执行NioTask的channelReady
方法,如果执行失败了,就执行channelUnregistered
方法。我们这里可以猜测NioTask是一个IO就绪事件的回掉方法。
IO就绪事件的处理逻辑很简单,我们接下里看一下提交任务的处理逻辑,我们只看可以设置超时时间的重载方法protected boolean runAllTasks(long timeoutNanos)
:
protected boolean runAllTasks(long timeoutNanos) {
//把定时任务队列里到达执行时间的任务添加到非定时任务队列
fetchFromScheduledTaskQueue();
//从非定时任务队列获取任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
首先,将到达执行时间的定时任务添加到非定时任务的执行列表中,然后从费定时任务列表中获取任务,没有的话就执行afterRunningAllTasks();
,这是一个开放方法,我们这里先不看具体内容。如果有任务,就加入循环中,循环的内容就是:调用safeExecute
来执行任务,其实就是在try-cache中执行任务,防止有异常终止。然后已经执行的方法计数加以,判断调用runAllTasks
执行的任务个数和0x3F的与是不是0,也就是是不是64的倍数。如果是就检查任务执行的时间有没有超过设置的超时时间,超过了就结束循环,然后调用afterRunningAllTasks();
。没有超时的话就继续获取任务。这个逻辑也比较简单。
分析到这里我们就把NioEventLoop.run
方法分析完了。run方法的作用用一句话概括就是处理就绪的IO事件和提交的任务。那么问题来了,这个run方法在什么时候被调用呢?我们一路跟着调用链寻找会发现,在NioEventLoop父类SingleThreadEventExecutor的execute(Runnable task)
方法被调用的时候就调用了run()
方法。当然run()
方法是一个重载方法,我们上面分析的是NioEventLoop
的实现。
这里我们的NioEventLoop的关键代码分析就基本上结束了。
三、复盘
本篇我们分析了NioEventLoop,NioEventLoop除了可以执行提交的任务之外,还可以监听注册的Channel的IO事件,并且可以根据ioRatio来控制两者执行的时间占比。这都是通过它的run()
方法来执行的。
那么,NioEventLoop在netty中的定位也显而易见了:真正的任务执行者。在EventLoop的基础上,netty实现了一个抽象类SingleThreadEventLoop,SingleThreadEventLoop还继承了SingleThreadEventExecutor,这就使SingleThreadEventLoop具有一个开放性的模板方法:run()
方法,我们可以通过run()
来实现自己的任务处理逻辑。而NioEventLoop就是通过实现run()
方法来定制自己可以同时处理提交的任务和就绪的IO事件的能力。
下篇,我们会分析,netty是怎么将各个组件串联起来的。