Netty源码分析(二) NioEventLoop

2018-11-07  本文已影响0人  skyguard

下面我们来分析一下Netty里的又一个关键组件NioEventLoop。
其实NioEventLoop就是轮询注册在Selector上的相关事件,在AbstractBootstrap里有一个属性是

 /**
 * EventLoopGroup 对象
*/
 volatile EventLoopGroup group;

在ServerBootStrap里有一个属性是

  /**
 * 子 Channel 的 EventLoopGroup 对象
 */
private volatile EventLoopGroup childGroup;

NioEventLoopGroup就是线程组,包含了NioEventLoop。
默认是cpu个数的2倍

 /**
 * 默认 EventLoop 线程数
 */
 private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

下面就是NioEventLoop了,先来看看NioEventLoop有哪些东西,NioEventLoop包含了一个Selector和一个TaskQueue,具体结构为


NioEventLoop结构图

来看下NioEventLoop都的类结构图


NioEventLoop

看起来好像很复杂的样子,其实最主要的就是SingleThreadEventExecutor和NioEventLoop这2个类,先来看下SingleThreadEventExecutor的runAllTasks方法

 protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false; // 是否执行过任务

    do {
        // 从定时任务获得到时间的任务
        fetchedAll = fetchFromScheduledTaskQueue();
        // 执行任务队列中的所有任务
        if (runAllTasksFrom(taskQueue)) {
            // 若有任务执行,则标记为 true
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    // 如果执行过任务,则设置最后执行时间
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }

    // 执行所有任务完成的后续方法
    afterRunningAllTasks();
    return ranAtLeastOne;
}

你可能会问,这只是处理非io任务的,那selector是怎么处理事件的呢。别急,这就要看NioEventLoop这个类了。在NioEventLoop的processSelectedKeys方法处理了io事件,包括connect,accept,read,write,具体为processSelectedKey方法

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // 如果 SelectionKey 是不合法的,则关闭 Channel
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        // 获得就绪的 IO 事件的 ops
        int readyOps = k.readyOps();

        // OP_CONNECT 事件就绪
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // 移除对 OP_CONNECT 感兴趣
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            // 完成连接
            unsafe.finishConnect();
        }

        // OP_WRITE 事件就绪
        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            // 向 Channel 写入数据
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

NioEventLoop还有一个定时任务,为AbstractScheduledEventExecutor,这个类有一个定时任务队列,负责执行定时任务

 /**
 * 定时任务队列
 */
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

最后会执行schedule方法

 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(callable, "callable");
    ObjectUtil.checkNotNull(unit, "unit");
    if (delay < 0) {
        delay = 0;
    }
   

    return schedule(new ScheduledFutureTask<V>(
            this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

然后就是NioEventLoop执行run方法,开始轮询各种io事件了。NioEventLoop的分析就到这里了。

上一篇下一篇

猜你喜欢

热点阅读