Netty notes - NioEventLoopGroup

2018-07-05  兴浩

1.EventExecutorGroup

As the name suggests, it is a container of EventExecutor instances, and its next method selects one EventExecutor from that container.

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    EventExecutor next();
}

2.EventExecutor

EventExecutor itself also extends EventExecutorGroup, which is puzzling at first: if a single executor is also a group, how does it implement the next method?

3.next method implementation

3.1 MultithreadEventExecutorGroup

MultithreadEventExecutorGroup implements the next method by delegating to an EventExecutorChooser, which picks the EventExecutor to return.

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    @Override
    public EventExecutor next() {
        return chooser.next();
    }
}

3.2 AbstractEventExecutor

AbstractEventExecutor simply returns this, so it behaves as a group that contains exactly one executor: itself.

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    @Override
    public EventExecutor next() {
        return this;
    }
}
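One way to read this design is as a composite: code written against the group type works whether it is handed a real group or a single executor. The following is a minimal, JDK-only sketch of that idea. The names GroupSketch, ExecutorSketch, SingleExecutorSketch, MultiExecutorGroupSketch and GroupOfOneDemo are hypothetical stand-ins, not Netty types.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for EventExecutorGroup.
interface GroupSketch {
    ExecutorSketch next();
}

// Like EventExecutor, a single executor is itself a group.
interface ExecutorSketch extends GroupSketch {
    String name();
}

// Mirrors AbstractEventExecutor: a group of one, so next() returns this.
final class SingleExecutorSketch implements ExecutorSketch {
    private final String name;

    SingleExecutorSketch(String name) {
        this.name = name;
    }

    @Override
    public ExecutorSketch next() {
        return this;
    }

    @Override
    public String name() {
        return name;
    }
}

// Mirrors the multi-threaded group: next() hands out the children in turn.
final class MultiExecutorGroupSketch implements GroupSketch {
    private final ExecutorSketch[] children;
    private final AtomicInteger idx = new AtomicInteger();

    MultiExecutorGroupSketch(ExecutorSketch... children) {
        this.children = children;
    }

    @Override
    public ExecutorSketch next() {
        return children[Math.abs(idx.getAndIncrement() % children.length)];
    }
}

final class GroupOfOneDemo {
    // The caller only knows about the group type.
    static String pick(GroupSketch group) {
        return group.next().name();
    }
}
```

Passing a SingleExecutorSketch to pick always yields that same executor, while passing a MultiExecutorGroupSketch rotates through its children.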

3.3 AbstractEventExecutorGroup

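AbstractEventExecutorGroup can essentially be seen as a proxy for EventExecutor: each method picks an executor with next() and forwards the call to it. The other methods follow the same pattern as submit below.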

public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
    @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }
}
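The delegation can be tried out without Netty. The sketch below is an assumption-laden stand-in: DelegatingGroupSketch is a hypothetical class that uses plain JDK single-thread executors as children, and only the shape of submit is taken from the excerpt above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch built on JDK executors, not on Netty's classes.
final class DelegatingGroupSketch {
    private final ExecutorService[] children;
    private final AtomicInteger idx = new AtomicInteger();

    DelegatingGroupSketch(int nThreads) {
        children = new ExecutorService[nThreads];
        for (int i = 0; i < nThreads; i++) {
            // One thread per child, similar in spirit to one thread per event loop.
            children[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Round-robin selection of the child that will run the next task.
    ExecutorService next() {
        return children[Math.abs(idx.getAndIncrement() % children.length)];
    }

    // Same shape as AbstractEventExecutorGroup.submit: pick a child, then forward.
    Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    void shutdown() {
        for (ExecutorService child : children) {
            child.shutdown();
        }
    }
}
```

With two children, consecutive submit calls alternate between the two threads, so the group itself never runs a task.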

4.EventExecutorChooser

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

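If the number of threads is a power of two, the chooser selects the next executor with a bitwise AND instead of a remainder operation, which is the cheaper of the two paths. Choosing a power-of-two thread count therefore puts the group on the faster chooser.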
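The index arithmetic of the two choosers can be checked in isolation. In the sketch below the class and method names (ChooserIndexSketch, maskIndex, moduloIndex) are hypothetical; only the expressions are copied from the code above. Note that in `idx.getAndIncrement() & executors.length - 1` the subtraction binds tighter than `&`, so the mask is `length - 1`.

```java
// Hypothetical helper names; only the arithmetic mirrors the choosers above.
final class ChooserIndexSketch {

    // True when exactly one bit is set (val is expected to be > 0 here,
    // which the group constructor enforces for nThreads).
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // PowerOfTwoEventExecutorChooser: mask the counter with length - 1.
    static int maskIndex(int counter, int length) {
        return counter & (length - 1);
    }

    // GenericEventExecutorChooser: remainder, made non-negative with Math.abs.
    static int moduloIndex(int counter, int length) {
        return Math.abs(counter % length);
    }
}
```

For non-negative counters and a power-of-two length the two methods return the same index. Both also stay within the array bounds after the AtomicInteger counter overflows to a negative value: the mask clears the sign bit, and the remainder of a negative counter has a magnitude smaller than the length, so Math.abs is safe.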

5.NioEventLoop initialization

In the constructor of MultithreadEventExecutorGroup, each child executor is created through the newChild method.

NioEventLoopGroup overrides newChild and returns a NioEventLoop object.

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // ... (the rest of the constructor is omitted in this excerpt)
    }

public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
}
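The constructor and newChild together form a template method: the base class owns the creation loop and the rollback on failure, and the subclass only decides what kind of child to build. The following JDK-only sketch illustrates that split under simplifying assumptions. ChildSketch, GroupTemplateSketch, PrefixedGroupSketch and FailingGroupSketch are hypothetical names, and close() stands in for shutdownGracefully() plus the wait for termination.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a child executor; not a Netty type.
final class ChildSketch {
    final String name;
    boolean closed;

    ChildSketch(String name) {
        this.name = name;
    }

    void close() {
        closed = true;
    }
}

abstract class GroupTemplateSketch {
    final ChildSketch[] children;

    protected GroupTemplateSketch(int nThreads, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
        }
        children = new ChildSketch[nThreads];
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                // The subclass decides which concrete child to create.
                children[i] = newChild(i, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child", e);
            } finally {
                if (!success) {
                    // Roll back: release the children created before the failure.
                    for (int j = 0; j < i; j++) {
                        children[j].close();
                    }
                }
            }
        }
    }

    // Runs during the base constructor, so it may rely only on args,
    // never on instance fields of the subclass.
    protected abstract ChildSketch newChild(int index, Object... args) throws Exception;
}

final class PrefixedGroupSketch extends GroupTemplateSketch {
    PrefixedGroupSketch(int nThreads, String prefix) {
        super(nThreads, prefix);
    }

    @Override
    protected ChildSketch newChild(int index, Object... args) {
        return new ChildSketch((String) args[0] + "-" + index);
    }
}

// Fails at a chosen index so the rollback path can be observed.
final class FailingGroupSketch extends GroupTemplateSketch {
    static final List<ChildSketch> CREATED = new ArrayList<>();

    FailingGroupSketch(int nThreads, int failAt) {
        super(nThreads, failAt);
    }

    @Override
    protected ChildSketch newChild(int index, Object... args) throws Exception {
        if (index == (Integer) args[0]) {
            throw new Exception("simulated failure at child " + index);
        }
        ChildSketch child = new ChildSketch("child-" + index);
        CREATED.add(child);
        return child;
    }
}
```

Passing the configuration through args, as Netty does with the SelectorProvider and the other arguments, is what makes it safe to call the overridable method before the subclass constructor body has run.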

The final initialization result: [figure omitted]

From the analysis above, the core job of EventExecutorGroup is to select an EventExecutor; the actual execution methods are all implemented in EventExecutor.

Reference:
自顶向下深入分析Netty(四)--EventLoop-1
