程序员

netty源码分析(8)-NioEventLoop执行过程

2019-02-20  本文已影响0人  Jorgezhong

上一节我们研究了NioEventLoop创建过程,概括起来就是,通道channel获取NioEventLoop实例,调用execute()方法,用ThreadPerTaskExecutor执行器,开启一个线程,去执行NioEventLooprun()方法,同时将任务存储到NioEventLoop的成员变量taskQueue队列中,以便后续异步执行,该成员变量在其父类SingleThreadEventExecutor中。

这一节,我们直接研究NioEventLoop具体执行的内容是什么。

入口:启动NioEventLoopSingleThreadEventExecutor#doStartThread()执行了以下代码。实际上时获取了NioEventLoop实例并执行了run()方法。本节主要看该方法

SingleThreadEventExecutor.this.run();

执行过程主要做了三件事

  1. select(wakenUp.getAndSet(false));: 轮询IO事件
  2. processSelectedKeys();: 处理IO事件
  3. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);:执行任务队列里的任务,并根据时间系数ioRatio来设置其运行时间和 processSelectedKeys()运行时间的占比。
    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        //轮询 注册到该 selector 上的IO事件
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;

                //设置任务队列的时间系数ioRatio
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        //处理IO事件
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                         //处理IO事件
                        processSelectedKeys();
                    } finally {
                        // 以ioTime的执行时间 和 时间系数确定任务队列运行时间
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

发现runAllTasks()将一系列定时任务(用户可调用schedule()方法来添加定时任务)加入到了taskQueue中,并且执行里面的任务,如果出现了异常,就打印日志,输出到控制台。

    protected boolean runAllTasks() {
        //保证当前线程式NioEventLoop持有的线程
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            //将定时任务队列里的任务全部放到NioEventQueue.taskQueue中
            fetchedAll = fetchFromScheduledTaskQueue();
            //从taskQueue中取出任务依次执行,直到完成,异常则打日志
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
        Runnable task = pollTaskFrom(taskQueue);
        if (task == null) {
            return false;
        }
        for (;;) {
            safeExecute(task);
            task = pollTaskFrom(taskQueue);
            if (task == null) {
                return true;
            }
        }
    }

    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

runAllTasks(long timeoutNanos)增加了一个超时时间的节点。该节点最终被表示为deadline,超过这个截止时间,循环执行队列任务的操作会被中断,可能队列里任务不能完全执行完。

    protected boolean runAllTasks(long timeoutNanos) {
        //定时任务队列的任务全部挪到NioEventLoop.taskQueue中
        fetchFromScheduledTaskQueue();
        //从taskQueue中取出任务
        Runnable task = pollTask();
        if (task == null) {
            //执行tailQueue中的任务
            afterRunningAllTasks();
            return false;
        }
        //计算截止时间
        //ScheduledFutureTask.nanoTime():定时任务开始执行到现在的时间
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            //执行任务
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                //每64次刷新一次:距离任务开始的时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        //执行tailQueue中的任务
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

因此,ioRatio的作用是以处理IO事件的时长为参考,决定执行taskQueue中的任务的超时时间。如果设置为100的话,则会一直执行,直到队列为空。ioRatio的默认值是50因此是1:1,超时时间和IO事件处理事件是一样的,那么怎么设置这个值呢?当前在处理IO事件,是在bossGroup里面。在这里设置。

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        bossGroup.setIoRatio(100);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

另外以上的一些列ScheduledFutureTask从哪里来呢?
NioEventLoop的父类AbstractScheduledEventExecutor持有一个定时任务队列

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

NioEventLoop通过暴露schedule()方法来允许用户添加定时任务到该任务队列

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(callable, "callable");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
            delay = 0;
        }
        validateScheduled0(delay, unit);

        return schedule(new ScheduledFutureTask<V>(
                this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }

    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                    SCHEDULED_FUTURE_TASK_COMPARATOR,
                    // Use same initial capacity as java.util.PriorityQueue
                    11);
        }
        return scheduledTaskQueue;
    }

上一篇 下一篇

猜你喜欢

热点阅读