程序猿之路

Netty源码分析(五)EventLoop

2017-05-24  本文已影响95人  三斤牛肉

如果说Netty的核心是它的reactor模式,那么EventLoop就是reactor的核心。通过EventLoop的轮询,netty能够高效的在任务中切换。前面几节都讲的Nio相关类,这里就以NioEventLoop为核心分析下,它是如何执行的。

NioEventLoop的继承关系

image.png

本质上,EventLoop还是一个Executor,既然是Executor,那么就从execute()函数看起

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    
    //判断eventLoop线程是否启动
    boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
    } else {
            startThread();//启动eventLoop线程
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
private void startThread() {
      //判断线程状态
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

private void doStartThread() {
    ...
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();//当前线程作为整个EventLoop的主线程
            ...
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                ...
            } finally {
                ...
                try {
                    // Run all remaining tasks and shutdown hooks.
                    // 执行剩余的所有任务
                    for (;;) {
                          if (confirmShutdown()) {
                                break;
                          }
                    }
                } finally {
                        try {
                            cleanup();
                        } finally {
                            ...
                        }
                    }
                }
            }
        });
    }

@Override
protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        ...
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
               //轮询完SelectKey后执行task
               //ioRatio用于设置执行task时间,其语义是io执行时间与任务执行时间之比,如果ioRatio是50(默认),则表示任务执行时间是io执行时间的一半
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            ...
        }
    }

整个run函数可以抽象为select-> processSelectedKeys->runTasks->select...

再看select()做了什么:

private void select(boolean oldWakenUp) throws IOException {
  Selector selector = this.selector;
  try {
    int selectCnt = 0;//计数器
    long currentTimeNanos = System.nanoTime();
    
    //最近一个任务的截止时间
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    for (;;) {
      long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
      if (timeoutMillis <= 0) {//如果截止时间小于0.5ms
        if (selectCnt == 0) {//并且一次select都没有执行过,那么执行一次select并中断
          selector.selectNow();
          selectCnt = 1;
        }
        break;
      }
      //如果有新增任务,则执行select并中断
      if (hasTasks() && wakenUp.compareAndSet(false, true)) {
        selector.selectNow();
        selectCnt = 1;
        break;
      }
      

      //阻塞式select,超时时间为最近一个任务截止时间
      int selectedKeys = selector.select(timeoutMillis);
      selectCnt ++;
      //如果有事件上报或有任务/定时任务则退出
      if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
        break;
      }
       ...

      long time = System.nanoTime();
      if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
      // timeoutMillis elapsed without anything selected.
        selectCnt = 1;
      } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
          ...
          //用于解决nio空轮询bug
          //如果select次数超过阈值则重新生成selector
          rebuildSelector();
           selector = this.selector;
           // Select again to populate selectedKeys.
          selector.selectNow();
          selectCnt = 1;
          break;
        }

        currentTimeNanos = time;
      }
      ...
    } catch (CancelledKeyException e) {
      ...
    }
  }

protected long delayNanos(long currentTimeNanos) {
  ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
  if (scheduledTask == null) {
    return SCHEDULE_PURGE_INTERVAL;
  }

  return scheduledTask.delayNanos(currentTimeNanos);
}

processSelectedKey用于具体事件操作

int readyOps = k.readyOps();
//connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);
    unsafe.finishConnect();
}

//写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    ch.unsafe().forceFlush();
}

//读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
      return;
    }
}

runAllTasks用于执行所有可执行任务,包括定时任务

//这里说明runAllTasks(long timeoutNanos)多了一个截止时间,如果运行超过该时间自动中断
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        //找到所有定时任务中可以执行的
        fetchedAll = fetchFromScheduledTaskQueue();
        //执行所有任务
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll);
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}

//把所有可执行的定时任务放到非定时任务的队列中
private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        if (!taskQueue.offer(scheduledTask)) {//如果放入taskQueue失败,则重新放回定时任务队列,返回false中断,等下次循环继续执行
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

总结下
1,EventLoop本身是个线程池,拥有线程池属性。EventLoop可以包含定时任务和非定制任务2种
2,EventLoop本身也是一个循环线程,该线程流程是
1)捕捉是否有io事件上报
2)处理这些事件
3)处理线程池中业务逻辑任务
4)返回第一步

上一篇下一篇

猜你喜欢

热点阅读