netty源码分析(三) - NioEventLoop - 2执

2020-10-09  本文已影响0人  进击的蚂蚁zzzliu

概述

执行过程主要有以下4步:

  1. 执行入口逻辑
  2. 轮询IO事件
  3. 处理IO事件
  4. 处理tasks
    下面对这四个步骤进行详细解析

1. 执行入口逻辑

NioEventLoop入口有两个:
A: 服务端启动绑定端口时(netty源码分析(二) - 服务端启动 - 2中有详细分析)
B: 新连接接入通过chooser绑定一个NioEventLoop时(下一章节详细分析)
本节先分析A

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    //channel.eventLoop() -> NioEventLoop extend SingleThreadEventExecutor
    channel.eventLoop().execute(new Runnable() {
        //从execute里面代码可以看出,该runnable被当作task被add到taskQueue中; 在NioEventLoop run方法ranTasks = runAllTasks(0);时执行
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                //执行绑定端口逻辑,上一章节已经分析过
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

private void execute(Runnable task, boolean immediate) {
    //当前时main线程,executor thread = null
    boolean inEventLoop = inEventLoop();
    //提交过来的task并没有执行,只是放在了taskQueue中
    addTask(task);
    if (!inEventLoop) {
        startThread();
    }
}

private void doStartThread() {
    //executor: ThreadPerTaskExecutor  上一节分析创建流程时的创建一个fastThreadLocalThread并启动
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            boolean success = false;
            //更新线程最后执行时间
            updateLastExecutionTime();
            try {
                //真正开始执行NioEventLoop中run方法
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                //忽略非重点逻辑
            }
        }
    });
}

run方法整体代码如下,下面对其中select / processSelectedKeys / runAllTasks进行详细分析

protected void run() {
    //为了解决jdk空轮询bug设置的轮询次数
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                //selectStrategy: 上一节创建流程中创建的calculateStrategy,服务端启动执行时task中有任务(前面1. 执行入口逻辑中add进去的)
                //此时此处为0,不执行select操作;执行完下面的ranTasks = runAllTasks(0);,第二次循环会返回-1,执行select
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    //下一次定时任务触发截至时间,默认不是定时任务,返回 -1L
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            //2. taskQueue中任务执行完,开始执行select进行阻塞
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                default:
                }
            } catch (IOException e) {
                //IOException时,重构selector(重写打开一个selector,把原selector上的channel重新注册,然后关闭原selector)
                rebuildSelector0();
                selectCnt = 0;
                //防止连续报错导致cpu过载每次IOException后sleep 1s再继续轮询
                handleLoopException(e);
                continue;
            }
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            //IO事件执行时间占比,默认50
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    //3. 轮询到IO事件,进行处理
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //4. 执行外部任务(非IO任务)
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                //strategy <= 0,只执行外部任务,不处理IO事件
                ranTasks = runAllTasks(0); 
            }
            
            //执行到了task(ranTasks = true)或 轮询到的IO事件的个数strategy > 0,正常情况肯定会走下面if的逻辑
            //每次把selectCnt置0;当出现jkd空轮询bug(即strategy=0)会走unexpectedSelectorWakeup解决该bug
            if (ranTasks || strategy > 0) {
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

2. 轮询IO事件

private int select(long deadlineNanos) throws IOException {
      if (deadlineNanos == NONE) {
          return selector.select();
      }
      long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
      return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

再来分析下netty是如何解决jdk空轮询bug的

//执行到了task(ranTasks = true)或 轮询到的IO事件的个数strategy > 0,正常情况肯定会走下面if的逻辑
//每次把selectCnt置0;当出现jkd空轮询bug(即strategy=0)会走unexpectedSelectorWakeup解决该bug
if (ranTasks || strategy > 0) {
    selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {
    selectCnt = 0;
}
private boolean unexpectedSelectorWakeup(int selectCnt) {
    //空轮询次数达到阈值默认512次时,重构selector
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        rebuildSelector();
        return true;
    }
    return false;
}

3. 处理IO事件

private void processSelectedKeys() {
    if (selectedKeys != null) {
        //经过优化的方式
        processSelectedKeysOptimized();
    } else {
        //平庸的方式
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

先来分析下selectedKeys创建过程

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        //jdk原生selector,内部selectedKeys是一个set集合
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    //是否取消优化,默认false
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });
    //原生SelectorImpl类
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    //netty提供的set,实际上是数组实现,add时间复杂度O(1),效率更高
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                //jdk9以上的版本直接操作对象内存偏移量设置selectedKeySet
                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                }
                //jdk9以下版本通过反射设置selectedKeySet
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } 
        }
    });
    //保存到成员变量selectedKeys
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

下面接着分析processSelectedKeysOptimized

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        //1. 取出IO事件以及对应的channel
        final SelectionKey k = selectedKeys.keys[i];
        //k的引用置null,便于gc回收
        selectedKeys.keys[i] = null;
        //attachment是《netty源码分析(二) - 服务端启动 - 2》中注册selector时放进去的NioServerSocketChannel
        final Object a = k.attachment();
        //2. 处理该channel
        if (a instanceof AbstractNioChannel) {
            //对于boss NioEventLoop,轮询到的基本是连接事件,后续的事情就是通过他的pipeline将连接扔给一个worker NioEventLoop处理
            //对于worker NioEventLoop来说,轮循道的基本商是IO读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        //3. 注销时需要再次轮询
        if (needsToSelectAgain) {
            //清空数组条目(数组元素都置成null)便于gc回收
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

4. 处理tasks

protected boolean runAllTasks(long timeoutNanos) {
    //把定时任务队列(截至时间次序的优先队列PriorityQueue)中可以执行的任务放到taskQueue中
    fetchFromScheduledTaskQueue();
    //取出第一个task
    Runnable task = pollTask();
    if (task == null) {
        //任务队列为空,返回false
        afterRunningAllTasks();
        return false;
    }

    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    //循环执行taskQueue中所有任务
    for (;;) {
        //task.run()执行任务
        safeExecute(task);
        runTasks ++;
        // 每执行64个任务计算以下超时时间nanoTime()
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    afterRunningAllTasks();
    //更新最后执行时间
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

至此,NioEventLoop主要流程分析完毕

上一篇下一篇

猜你喜欢

热点阅读