三、netty源码分析之EventLoop

2019-09-28  本文已影响0人  丑星星

一、EventLoop功能概述

上篇我们分析了EventLoopGroup的核心能力,EventLoopGroup具有执行任务、注册Channel、执行器调度等能力。今天我们来看一下EventLoop。我们先来看看EventLoop的类图关系:

EventLoop
我们可以看到,EventLoop接口继承了EventLoopGroup接口。为什么EventLoop要继承EventLoopGroup呢?从上一篇的分析,我们知道,EventLoopGroup最主要的功能时对EventLoop进行管理调度,EventLoopGroup的其他大部分功能,都是交给自己管理的EventLoop来处理的。而EventLoop继承EventLoopGroup,就是为了继承EventLoopGroup任务执行、优雅停机、Channel注册等功能窗口。
除了继承EventLoopGroup之外,EventLoop还继承了EventExecutor接口。我们可以看一下EventExecutor的具体内容:
/**
 * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
 * way to access methods.
 *
 */
public interface EventExecutor extends EventExecutorGroup {

    /**
     * Returns a reference to itself.
     */
    @Override
    EventExecutor next();

    /**
     * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
     */
    EventExecutorGroup parent();

    /**
     * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
     */
    boolean inEventLoop();

    /**
     * Return {@code true} if the given {@link Thread} is executed in the event loop,
     * {@code false} otherwise.
     */
    boolean inEventLoop(Thread thread);

    /**
     * Return a new {@link Promise}.
     */
    <V> Promise<V> newPromise();

    /**
     * Create a new {@link ProgressivePromise}.
     */
    <V> ProgressivePromise<V> newProgressivePromise();

    /**
     * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
     * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     */
    <V> Future<V> newSucceededFuture(V result);

    /**
     * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
     * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     */
    <V> Future<V> newFailedFuture(Throwable cause);
}

从接口的头部注释我们可以看到,EventExecutor是一个特殊的EventExecutorGroup,它提供了一些易用的方法来判断一个线程是否正在事件循环中执行。至于EventExecutorGroup我们上一篇分析过这个接口的能力,这里就不再赘述了。我们看一看EventExecutor的几个重要的的方法:
首先是EventExecutorGroup parent();方法,EventExecutor只有事件执行的能力,没有调度的能力,所以这个方法只会返回对象自身。
然后是两个重载的inEventLoop方法,用来判断线程是否正在事件循环中执行。
随后是两个创建Promise的方法,关于Promise的作用,大家不清楚的可以查一下相关资料,内部具体实现我们在后面的文章中再做分析。
最后,是一对创建Future的方法,我们从注释中可以看到这两个方法的作用,就是创建一个已经被标记成成功/失败的Future对象。所有已经注册的FutureListener都会被直接通知。所有的阻塞方法都会非阻塞的返回。

我们的EventLoop继承了OrderedEventExecutor,而OrderedEventExecutor直接继承了EventExecutor,本身并无定义其他方法。但是我们可以从OrderedEventExecutor的头部注释中看到,OrderedEventExecutor其实是一个标记接口,这个接口保证所有执行的任务必须按照顺序执行,并且要串行执行!所以我们可以相信,实现了OrderedEventExecutor的类,执行任务的时候回保证任务执行的顺序性,并且同一时刻只能执行一个任务。

到这里,我们可以知道,EventLoop的核心能力:EventLoop是一个可以优雅停机的任务执行器,它能保证提交的任务都被顺序串行执行。接下来我们根据EventLoop的一个具体实现类NioEventLoop来更直观的理解一下EventLoop的能力。

从NioEventLoop来看EventLoop在netty中扮演的角色

首先我们先看一看NioEventLoop的构造方法:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

我们一路跟踪,会发现,这个构造方法调用了父类SingleThreadEventExecutor的构造方法:

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

我们可以看到这里面有一行this.executor = ThreadExecutorMap.apply(executor, this);。这个构造方法传入的executor参数就是我们上节提到过的NioEventLoopGrop在创建NioEventLoop时传入的ThreadPerTaskExecutor对象。这里在给成员变量赋值的时候调用了ThreadExecutorMap.apply(executor, this),我们可以看一下这里面的具体内容:

    //ThreadExecutorMap类的相关内容

    private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();

    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(executor, "executor");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Executor() {
            @Override
            public void execute(final Runnable command) {
                executor.execute(apply(command, eventExecutor));
            }
        };
    }

    public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Runnable() {
            @Override
            public void run() {
                setCurrentEventExecutor(eventExecutor);
                try {
                    command.run();
                } finally {
                    setCurrentEventExecutor(null);
                }
            }
        };
    }

    private static void setCurrentEventExecutor(EventExecutor executor) {
        mappings.set(executor);
    }

我们可以看到,Executor apply(final Executor executor, final EventExecutor eventExecutor)重新创建了一个Executor对象,这个对象执行任务还是调用参数传入的Executor 来执行,只不过是在传入的任务中做了一个静态代理,在任务执行的前后分别将执行此任务的EventExecutor绑定、解绑到自身持有的一个FastThreadLocal中。这里的FastThreadLocal是netty自己实现的一个处理线程单例的工具,这个FastThreadLocal究竟比我们jdk中的ThreadLocal快在哪里呢?我们把这个类的set方法拿出来看一下(在此之前你必须要知道jdkThreadLoop的实现原理):

  //FastThreadLocal的set方法
    public final void set(V value) {
        if (value != InternalThreadLocalMap.UNSET) {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove();
        }
    }

InternalThreadLocalMap的get()方法:

    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }

    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }

我们可以看到这个FastThreadLocal在获取Map的时候会判断当前的线程是否是FastThreadLocalThread的对象,是的话就调用fastGet(FastThreadLocalThread thread)方法获取InternalThreadLocalMap(不存在就创建);如果不是FastThreadLocalThread的对象,就调用slowGet()获取,获取逻辑是从一个静态的ThreadLocal对象中获取当前线程绑定的InternalThreadLocalMap对象,没有的话就创建一个。在获取到InternalThreadLocalMap的对象后,怎么向里面赋值呢?我们可以看一下FastThreadLocal中的set方法赋值的真正逻辑:

  // FastThreadLocal的set方法
    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        //index是FastThreadLocal维护的一个索引对象
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    }

  // InternalThreadLocalMap的方法
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }

我们可以看到,其实InternalThreadLocalMap内部是一个数组,每个FastThreadLocal都记录了自身维护的线程单例的对象再数组中的位置,即index这个成员变量。这个index的值是在FastThreadLocal初始化的时候从InternalThreadLocalMap内部的一个静态递增变量处获取的。 InternalThreadLocalMap这种方式和jdk内部的ThreadLocalMap使用散列表的方式存储对象相比,优点是:获取和设置线程单例对象的时候,少了hash值计算这一步,并且没有hash冲撞的情况发生。这一点相比ThreadLocalMap*的确性能会有所提升。这也是netty对性能优化的一方面体现,后面我们还会看到好多在细节上的优化。

我们花了很大的篇幅分析了NioEventLoop的构造方法,目的就是为了让大家看到netty对性能的优化都是落到很多细节上的。下面我们继续分析NioEventLoop构造方法的剩余内容,接下来我们会看到netty的另一个优化,在此之前大家要熟悉Java的NIO,不然接下来内容肯定是看不懂的!

我们可以看到NioEventLoop有下面几个成员变量:

    private Selector selector;
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;

    private final SelectorProvider provider;

我们在NioEventLoop构造方法中可以看到对这几个成员变量的初始化过程。
首先,我们可以看到,构造方法中通过openSelector()方法生成了一个SelectorTuple的对象,然后将SelectorTuple中的selectorunwrappedSelector赋值给NioEventLoop的队形属性。我们可以看一下openSelector()的内容:

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            //创建一个Selector对象
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            //禁止优化选型,如果选择禁止优化,就直接创建一个SelectorTuple对象返回
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }
        //下面是优化的开始
        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    //通过反射获取Selector的相关Field
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            //通过反射设置Selector对应的属性的值
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

我们可以看到,整个openSelector()方法做的事情就是:判断参数是否允许相关优化,如果允许优化,就将创建的Selector的对象的两个属性:selectedKeyspublicSelectedKeys重写为:SelectedSelectionKeySet对象。关于selectedKeyspublicSelectedKeys,大家可以看一看Selector的API,这里不再赘述。这里为什么要对这两个属性重新赋值呢?为什么重新赋值了就是优化了呢?我们先来看一下这两个属性在Selector中是什么:

    //SelectorImpl的部分代码
    protected Set<SelectionKey> selectedKeys = new HashSet();
    this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);

我们可以看到,原来的selectedKeyspublicSelectedKeys归根结底都是HashSet。而替换成的SelectedSelectionKeySet又是什么呢?我们来看一下:

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }
        return true;
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
       //省略
    }

    void reset() {
        reset(0);
    }

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    private void increaseCapacity() {
      //省略
    }
}

我们可以看到,SelectedSelectionKeySet继承了AbstractSet,但是它内部实现压根不能算是一个Set,因为它的add方法没有保证元素在集合中唯一的相关实现!为什么要这么做呢?我们不妨先看一下jdk中对selectedKeys这个集合添加元素的相关逻辑,由于没有源码,只能看到变量名是var这种定义,不过不影响我们对逻辑的理解:

                            if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
                                if (var9.clearedCount != var1) {
                                    if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
                                        var9.updateCount = var1;
                                        ++var6;
                                    }
                                } else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
                                    var9.updateCount = var1;
                                    ++var6;
                                }

                                var9.clearedCount = var1;
                            } else {
                                if (var9.clearedCount != var1) {
                                    var10.channel.translateAndSetReadyOps(var4, var10);
                                    if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
                                        WindowsSelectorImpl.this.selectedKeys.add(var10);
                                        var9.updateCount = var1;
                                        ++var6;
                                    }
                                } else {
                                    var10.channel.translateAndUpdateReadyOps(var4, var10);
                                    if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
                                        WindowsSelectorImpl.this.selectedKeys.add(var10);
                                        var9.updateCount = var1;
                                        ++var6;
                                    }
                                }

                                var9.clearedCount = var1;
                            }

我们可以看到,在把元素添加到selectedKeys之前,会判断selectedKeys是否已经包含了这个元素,包含的话就操作已经在就不再进行添加操作,不包含的时候才进行添加操作。而SelectedSelectionKeySet的判断是否存在指定元素的方法始终返回false,也就意味着,selectedKeys会被添加重复的SelectionKey对象。添加重复的SelectionKey对象会有什么影响呢?在netty中对准备就绪的SelectionKey做处理之前,都会判断SelectionKey对象就绪的状态,处理完该事件之后,会把SelectionKey对象的就绪状态移除。所以如果重复添加SelectionKey对象,在这里是不会有任何影响的!那这种用数组直接替代HashMap的操作有什么好处呢?首先,我们看,NioEventLoop继承了SingleThreadEventLoop,我们可以猜出,NioEventLoop是单线程操作selectedKeys的。单线程操作数组有什么好处呢?单线程操作可以充分利用CPU的高速缓存,避免伪共享的发生!并且netty的处理selectedKeys时,只会在处理完所有的就绪的SelectionKey清空数组,之后再次调用select方法。所以不存在添加时找空槽的情况,只要顺序的往数组里面加元素就可以了!这种操作比HashMap添加、删除操作性能要高太多(做了一个小的测试,从容量为10000的数组和HashMap中删除元素,HashMap耗时大概是数组的十倍左右)。

我们花了大量的篇幅分析了EventLoop的构造方法。这里主要是想让大家看到netty对性能的优化真的无处不在!而且是千方百计的去优化!这也是netty被广泛应用的原因。包括好多高性能高吞吐的中间件也使用了netty做通信,比如RocketMQ、Spark。而我们在分析类似netty这种高性能框架的源码时,一定要注意到这些优化细节,这样我们才能清楚这些框架哪里好,才能知道怎么样才能正确的使用这些框架来充分发挥它们的优势!

我们继续看NioEventLoop的主要逻辑,接下来我们看一下run()方法:

    @Override
    protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

run()方法是父类SingleThreadEventExecutor的模板方法的实现。我们可以看到,run()方法就是一个不断的循环,在循环内做了什么操作呢?首先,先调用selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())来获取select策略。我们先来看一下SelectStrategy这个接口:

public interface SelectStrategy {

    /**
     * Indicates a blocking select should follow.
     * 接下来要执行阻塞的select操作
     */
    int SELECT = -1;
    /**
     * IO循环应该被重试,非阻塞select接下来会被直接执行
     * Indicates the IO loop should be retried, no blocking select to follow directly.
     */
    int CONTINUE = -2;
    /**
     * 接下来不要阻塞获取新的事件IO循环
     * Indicates the IO loop to poll for new events without blocking.
     */
    int BUSY_WAIT = -3;

    /**
     * The {@link SelectStrategy} can be used to steer the outcome of a potential select
     * call.
     *
     * @param selectSupplier The supplier with the result of a select result.
     * @param hasTasks true if tasks are waiting to be processed.
     * @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
     *         the next step should be to not select but rather jump back to the IO loop and try
     *         again. Any value >= 0 is treated as an indicator that work needs to be done.
     */
    int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

SelectStrategy提供了三种默认的select策略,即SELECT、CONTINUE、BUSY_WAIT。netty中实现了一个默认的DefaultSelectStrategy,它的计算select策略的方式是:

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }

如果当前EventLoop任务队列中没有任务,就执行SELECT策略,即阻塞的select。如果有的话,就返回当前NioEventLoop中持有的Selector对象的selectNow()方法的返回值,就绪的IO事件的数量。也就是不选择任何select模式。这个过程中其实已经执行了一次非阻塞的selectNow操作

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    int selectNow() throws IOException {
        try {
            return selector.selectNow();
        } finally {
            // restore wakeup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }

在获取到需要执行的IO select策略后,就选择执行具体的内容,我们可以看到,CONTINUE对应的执行方法就是不执行接下来的逻辑,重新执行select策略的选择。而NIO不支持忙等操作,所以BUSY_WAIT的逻辑和SELECT的逻辑是一致性的,都调用了select(wakenUp.getAndSet(false));方法。这里,我们先要清楚wakenUp这个成员变量的含义,我们先看一下这块内容:

    /**
     * Boolean that controls determines if a blocked Selector.select should
     * break out of its selection process. In our case we use a timeout for
     * the select method and the select method will block for that time unless
     * waken up.
     */
    private final AtomicBoolean wakenUp = new AtomicBoolean();

wekenUp的含义是:控制阻塞的Selector.select在执行的过程中,是否允许被打断。在使用Selector.select的过程中,select方法会被设置超时时间,设置wekenUp为ture时,Selector.select超时后不会继续重新再次被调用。
清楚了wekenUp这个参数的含义后,我们看一下NioEventLoop的具体select操作是什么逻辑:

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            //计算出select阻塞时间的最后截止时间,这个时间计算的方式是当前时间加上提交到当前EventLoop中的最近需要执行的定时任务的延迟时间
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                // 计算出select的阻塞时间,加500000L是为了始终进位。如果整个select操作执行的时间超过了selectDeadLineNanos,整个方法就结束
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                // 如果任务被添加进来,并且任务中想要调用Selector#wakeup方法让Selector提前从阻塞的select方法中返回的话,如果不执行下面操作,就实现不了这个效果,只能等Selector的select方法阻塞timeoutMillis时间后返回。
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

我们来分析一下这段代码的逻辑(在此之前,我们先要清楚,理论上只有当当前EventLoop的任务队列中没有任务的时候才会调用select这个方法SelectStrategy中的逻辑,只有hasTask()是false的时候才返回SELECT,在在调用EventLoop的select方法之前,wakenUp会被设置成false)。首先,计算出select操作最长的阻塞时间timeoutMillis。然后判断hasTasks()的返回值,即EventLoop中是否有添加的任务,如果有的话就说明我们在之前的SelectStrategy选择select策略之后,又有新的任务添加进来了,这个时候为了防止新添加的任务要等到select操作阻塞完成才有机会执行,就做了一个判断:当前的wekenUp如果是false,就设置成ture,然后执行一个非阻塞的selector.selectNow后跳出NioEventLoop.select;否则就继续执行接下来的逻辑。也就是执行Selector.select阻塞操作。selector.selectNow方法结束后会判断,是否有就绪的IO事件,当一下情况满足任意一条就跳出循环结束EventLoop.select方法:有就绪的IO事件、wakenUp在NioEventLoop.select调用之前是true、当前EventLoop有提交的立即执行的任务、当前EventLoop中有提交的定时执行的任务。如果不满足任意情况,就判断是否当前线程有中断状态,有的话也跳出循环。最后判断循环的总时间是否大于设置的Selector.select的超时时间,判断Selector.select是不是因为超时而结束。如果是因为超时而结束就将selectCnt设置为1,继续循环;不是的话就判断循环的次数是否大于SELECTOR_AUTO_REBUILD_THRESHOLD,是的话就跳出循环,这块是为了解决jdk6的关于NIO的bug,我们可以先不用管这个逻辑。到此整个NioEventLoop.select的过程就结束了。这个过程看起来非常乱,我们要弄清楚整个流程,首先要先明白wakenUp这个属性的生命周期。我们可以看到,wakenUp这个属性提供给外部的修改窗口只有一个:

    @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

这个方法是protected修饰的,也就是说,这个方法是不提供其他包调用的,所以这个方法是一个netty内部的调用方法,我们可以搜索到这个方法在哪里使用:

wakeup方法被调用处
我们可以看到,这个方法主要是在停机的时候调用的。为的就是在停机的时候将Selector.select从阻塞中唤醒。
细心地朋友也许会发现,NioEventLoop.select方法在调用之前,会把wakenUp设置为false,这是为什么呢?为的就是在外部调用NioEventLoop.wakeup方法的时候wakenUp.compareAndSet(false, true)这个会设置成功,然后可以调用selector.wakeup()将Selector唤醒。

到这里,我们再回过头去看NioEventLoop.select方法,这个方法的作用其实就是:调用Selector.select方法来阻塞地获取就绪的IO事件,并且在任何时候都可以响应weakUp操作,如果NioEventLoop中添加定时任务,NioEventLoop.select会执行的时间最多就是到最近定时任务执行的时间,没有定时任务就最多执行1s。这样去理解是不是简单多了!!!

细心的朋友可能会问:为什么要限制NioEventLoop.select的执行时间最长到下一个定时任务执行的时间呢?我们先带着疑问继续往下看NioEventLoop.run方法。

在结束了select操作之后,继续判断一下wakenUp的标志,如果设置为ture,就调用selector.wakeup();使下一次的selector.select非阻塞。
随后会获取当前的ioRatio,我们之前提过这个参数,这个参数是设置我们的IO操作在整个事件执行中的时间占比的,我们看一下下面的具体逻辑。首先,会判断ioRatio是不是设置100,如果是设置百分之百,就先执行processSelectedKeys(),再执行runAllTasks(),不设置事件占比限制。如果ioRatio不是100,就先执行processSelectedKeys(),并且记录下processSelectedKeys()的执行时间,然后计算出剩余时间,使用这个剩余时间来限制runAllTasks方法。这两个方法就是干什么的呢?这里我先给出答案:processSelectedKeys()的作用是处理所有的就绪的selectedKeys,也就是就绪的IO事件;而runAllTasks这两个重载方法就是执行所有的提交任务。到这里,我们可以明白为什么要限制NioEventLoop.select的执行时间最长到下一个定时任务开始执行的时间了。因为IO处理和任务执行是在一个线程里执行的,如果不限制NioEventLoop.select的执行时间,到达下一个定时任务需要执行的时间的时候,有可能整个线程还阻塞在select方法上!
接下来我们继续分析processSelectedKeys()runAllTasks分别是怎么处理IO事件和提交的任务的。我们先来看一下processSelectedKeys()

    private void processSelectedKeys() {
        //判断是否使用了优化过的Selector,是的话就循环数组,不是的话就循环iterator。
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

首先方法会判断我们是否使用数组来替代Map优化Selector,这个是我们上边分析NioEventLoop的构造方法讲的。我们这里只看优化的方法,其实两个逻辑都是一样的,只是循环的方法不一样。整个执行过程就是遍历就绪的SelectionKey。然后交给processSelectedKey的两个重载方法去处理。这里会根据SelectionKey的attachment对象的类型来判断调用哪个重载方法。我们先不用管这个attachment对象是什么时候被添加的,这个会在我们只会的分析中讲到,我们先来看一下这两个方法的逻辑:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

    private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
        int state = 0;
        try {
            task.channelReady(k.channel(), k);
            state = 1;
        } catch (Exception e) {
            k.cancel();
            invokeChannelUnregistered(task, k, e);
            state = 2;
        } finally {
            switch (state) {
            case 0:
                k.cancel();
                invokeChannelUnregistered(task, k, null);
                break;
            case 1:
                if (!k.isValid()) { // Cancelled by channelReady()
                    invokeChannelUnregistered(task, k, null);
                }
                break;
            }
        }
    }

processSelectedKey(SelectionKey k, AbstractNioChannel ch):我们方法开始的验证逻辑先不看,主要看下面的事件就绪的逻辑。首先,会获取就绪的事件,判断就绪的事件中是否包含连接事件,如果包含,就将当前
SelectionKey的连接就绪事件从SelectionKey的感兴趣的事件中剔除掉,然后将就绪事件交给就绪的AbstractNioChannel的unsafe去处理,调用的是unsafe.finishConnect()方法。具体处理逻辑我们本篇不做分析。然后就是去判断就绪的事件中是否包含了写就绪、读就绪、ACCEPT事件,包含的话都是委托给unsafe的对应方法。
processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task):这个方法很简单,就是执行NioTaskchannelReady方法,如果执行失败了,就执行channelUnregistered方法。我们这里可以猜测NioTask是一个IO就绪事件的回掉方法。
IO就绪事件的处理逻辑很简单,我们接下里看一下提交任务的处理逻辑,我们只看可以设置超时时间的重载方法protected boolean runAllTasks(long timeoutNanos)

    protected boolean runAllTasks(long timeoutNanos) {
        //把定时任务队列里到达执行时间的任务添加到非定时任务队列
        fetchFromScheduledTaskQueue();
        //从非定时任务队列获取任务
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

首先,将到达执行时间的定时任务添加到非定时任务的执行列表中,然后从费定时任务列表中获取任务,没有的话就执行afterRunningAllTasks();,这是一个开放方法,我们这里先不看具体内容。如果有任务,就加入循环中,循环的内容就是:调用safeExecute来执行任务,其实就是在try-cache中执行任务,防止有异常终止。然后已经执行的方法计数加以,判断调用runAllTasks执行的任务个数和0x3F的与是不是0,也就是是不是64的倍数。如果是就检查任务执行的时间有没有超过设置的超时时间,超过了就结束循环,然后调用afterRunningAllTasks();。没有超时的话就继续获取任务。这个逻辑也比较简单。

分析到这里我们就把NioEventLoop.run方法分析完了。run方法的作用用一句话概括就是处理就绪的IO事件和提交的任务。那么问题来了,这个run方法在什么时候被调用呢?我们一路跟着调用链寻找会发现,在NioEventLoop父类SingleThreadEventExecutorexecute(Runnable task)方法被调用的时候就调用了run()方法。当然run()方法是一个重载方法,我们上面分析的是NioEventLoop的实现。
这里我们的NioEventLoop的关键代码分析就基本上结束了。

三、复盘

本篇我们分析了NioEventLoopNioEventLoop除了可以执行提交的任务之外,还可以监听注册的Channel的IO事件,并且可以根据ioRatio来控制两者执行的时间占比。这都是通过它的run()方法来执行的。
那么,NioEventLoop在netty中的定位也显而易见了:真正的任务执行者。在EventLoop的基础上,netty实现了一个抽象类SingleThreadEventLoopSingleThreadEventLoop还继承了SingleThreadEventExecutor,这就使SingleThreadEventLoop具有一个开放性的模板方法:run()方法,我们可以通过run()来实现自己的任务处理逻辑。而NioEventLoop就是通过实现run()方法来定制自己可以同时处理提交的任务和就绪的IO事件的能力。
下篇,我们会分析,netty是怎么将各个组件串联起来的。

上一篇下一篇

猜你喜欢

热点阅读