Netty系列(一):NioEventLoopGroup源码解析

2019-01-17  本文已影响0人  marsjhe

前言

对于NioEventLoopGroup这个对象,在我的理解里面它就和ThreadGroup类似,NioEventLoopGroup中有一堆NioEventLoop小弟,ThreadGroup中有一堆Thread小弟,真正意义上干活的都是NioEventLoopThread这两个小弟。下面的文章大家可以类比下进行阅读,应该会很容易弄懂的。

NioEventLoopGroup

这里咱们可以从NioEventLoopGroup最简单的无参构造函数开始。

    public NioEventLoopGroup() {
        this(0);
    }

一步步往下走,可以发现最终调用到构造函数:

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

参数说明:

  1. nThreads:在整个方法链的调用过程中,其值到这里为止一直为0,在没有主动配置的情况下后面会进行设置。若配置io.netty.eventLoopThreads系统环境变量,则优先考虑,否则设置成为CPU核心数*2
  2. executor: 到目前为止是null
  3. selectorProvider: 这里为JDK的默认实现SelectorProvider.provider()
  4. selectStrategyFactory:这里的值是DefaultSelectStrategyFactory的一个实例SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory()
  5. RejectedExecutionHandlers:这里是个拒绝策略,这里默认的实现是队列溢出时抛出RejectedExecutionException异常。

MultithreadEventLoopGroup

继续往下面走,调用父类MultithreadEventLoopGroup中的构造函数:

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

这里可以看到判断nThreads == 0后就会给其附上一个默认值。继续走,调用父类MultithreadEventExecutorGroup中的构造方法。

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
DefaultEventExecutorChooserFactory

这里有个关注的点,DefaultEventExecutorChooserFactory。这是一个chooserFactory,用来生产EventExecutorChooser选择器的。而EventExecutorChooser的功能是用来选择哪个EventExecutor去执行咱们的任务。咱们从下面的代码中可以观察到DefaultEventExecutorChooserFactory一共给咱们提供了两种策略。

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

这里的策略也很简单:

  1. 如果给的线程数是2^n个,那么选择PowerOfTwoEventExecutorChooser这个选择器,因为这样可以采用位运算去获取执行任务的EventExecutor
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
  1. GenericEventExecutorChooser选择器,这里采用的是取模的方式去获取执行任务的EventExecutor
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }

相比而言,位运算的效率要比取模的效率高,所以咱们在自定义线程数的时候,最好设置成为2^n个线程数。


干正事

到达最终调用的函数

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        //创建NioEventLoop失败后进行资源的一些释放
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
       //这里可以去看下上面对于 DefaultEventExecutorChooserFactory的一些介绍
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            // 给每一个成功创建的EventExecutor 绑定一个监听终止事件
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        // 弄一个只读的EventExecutor数组,方便后面快速迭代,不会抛出并发修改异常
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

从上面的代码可以观察到,等了很久的executor 在这里终于给其赋值了,其值为ThreadPerTaskExecutor的一个实例对象,这一块的初始化赋值都是很简单的,干活调用的是如下方法:

    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

对这一块不是很了解的可以去查阅下线程池有关的资料,咱们重点关注一下newChild这个方法,可以说是上面整个流程中的重点:

newChild

newChild这个方法在NioEventLoopGroup中被重写了:

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

细心的小伙伴可以观察到,这里有用到SelectorProvider,SelectStrategyFactory以及RejectedExecutionHandler这个三个参数,实际上就是本文最开始初始化的三个实例对象(可以翻阅到顶部查看一下)。
继续往下走流程:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

在上面的代码片段中除了调用父类的构造器之外就进行了参数的判空和简单的赋值。这里openSelector方法调用后返回SelectorTuple实例主要是为了能同时得到包装前后的selectorunwrappedSelector

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

这里会有一个taskQueue队列的初始化(Queue<Runnable> taskQueue),看名字就知道,这个队列里面放着的是咱们要去执行的任务。这里的初始化方法newTaskQueueNioEventLoop中重写了的。具体如下:

    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }

这里生成的是一个MPSC队列(Multi Producer Single Consumer),这是一个多生产者单消费的无锁队列,支持并发。从字面意思上就可以观察到这个队列效率应该是蛮高的。这里的maxPendingTasks值为Integer.MAX_VALUE。然后最终生成的是MpscUnboundedArrayQueue这样一个无边界的队列。

这样newChild这个方法到这里就走完了。


terminationListener

简单介绍下这个环节,在上面的创建NioEventLoopGroup有个环节是给每个NioEventLoop儿子绑定一个terminationListener监听事件

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

这个事件的回调方法是:

            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }

在每一个NioEventLoop关闭后,就会回调这个方法,然后给NioEventLoopGroup实例中的terminatedChildren字段自增1,并与初始化成功的NioEventLoop的总个数进行比较,如果
terminatedChildren的值与NioEventLoop的总个数相等,则调用bossGroup.terminationFuture().get()方法就不会阻塞,并正常返回null
同样,future.channel().closeFuture().sync()这段代码也将不会阻塞住了,调用sync.get()也会返回null
下面给一段测试代码,完整示例大家可以到我的github中去获取:

terminationListener_test

上面的代码只是一个简单的测试,后面还有别的发现的话会继续在github中与大家一起分享~


End

上一篇 下一篇

猜你喜欢

热点阅读