nettyITnetty

自顶向下深入分析Netty(四)--EventLoop-1

2016-10-13  本文已影响3509人  Hypercube
netty线程模型

我们再次回顾这幅图,通过先前的讲解,现在是不是亲切很多了。图中绿色的acceptor应该是你最熟悉的部分,之前我们在ServerBootstrap中进行了详细分析。我们知道了mainReactor是一个线程池,处理Accept事件负责接受客户端的连接;subReactor也是一个线程池,处理Read(读取客户端通道上的数据)、Write(将数据写入到客户端通道上)等事件。在这一节中,我们将深入分析这两个线程池的实现,不断完善其中的细节。我们首先从类图开始。

4.1 类图

EventLoop类图EventLoop类图

看到这幅类图,如果你的第一印象是气势恢宏,那么恭喜你,你已经成功了一半。但不难预料的是,大多数人和我的感受是一样的:这么多类,一定很累。好在这只是第一印象,我们仔细观察,便会发现其中明显的脉络,两条线索(这里使用自下而上):NioEventLoop以及NioEventLoopGroup即线程和线程池。忽略其中大量的接口,剩余这样的两条线:

NioEventLoop --> SingleThreadEventLoop --> SingleThreadEventExecutor -->
AbstractScheduledEventExecutor --> AbstractScheduledEventExecutor --> 
AbstractEventExecutor --> AbstractExecutorService

NioEventLoopGroup --> MultithreadEventLoopGroup --> 
MultithreadEventExecutorGroup --> AbstractEventExecutorGroup

下面我们正式开始分析,依旧使用自顶向下的方法,从类图顶部向下、从线程池到线程分析。

4.2 EventExecutorGroup

EventExecutorGroup在类图中处于承上启下的位置,其上是Java原生的接口和类,其下是Netty新建的接口和类,由于它处于如此重要的位置,我们详细分析其中的方法。

4.2.1 Executor

首先看其继承自Executor的方法:

    // Executes the given command at some time in the future
    void execute(Runnable command);

只有一个简单的execute()方法,但这个方法奠定了java并发的基础,提供了异步执行任务的。

4.2.2 ExecutorService

ExecutorService的关键方法如下(其中的invoke***方法并非关键,不再列出):

    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

这些方法我们能从命名中便能知道方法的作用。我们主要看submit()方法,该方法是execute()方法的扩展,相较于execute不关心执行结果,submit返回一个异步执行结果Future。这无疑是很大的进步,但这里的Future不提供回调操作,显得很鸡肋,所以Netty将Java原生的java.util.concurrent.Future扩展为io.netty.util.concurrent.Future,我们将在之后进行介绍。

4.2.3 ScheduledExecutorService

从名字可以看出,该接口提供了一系列调度方法:

    ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
                                                long period,TimeUnit unit);
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                long delay,TimeUnit unit);

schedule()方法调度任务使任务在延迟一段时间后执行。scheduleAtFixedRate延迟一段时间后以固定频率执行任务,scheduleWithFixedDelay延迟一段时间后以固定延时执行任务。是不是有点头晕?那就对了,这里有一个例子专门治头晕。专家建议程序员应该每小时工作50分钟,休息10分钟,类似这样:

    13:00 - 13:10 休息
    13:10 - 14:00 写代码
    14:00 - 14:10 休息
    14:10 - 15:00 写代码

实现这样的调度我们可以使用(假设现在时间为13:00):

    executor.scheduleAtFixedRate(new RestRunnable(), 0 , 60, TimeUnit.MINUTES);
    executor.scheduleWithFixedDelay(new RestRunnable(), 0 , 50, TimeUnit.MINUTES);

4.2.4 EventExecutorGroup

    boolean isShuttingDown();
    Future<?> shutdownGracefully();
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    Future<?> terminationFuture();
    EventExecutor next();

EventExecutorGroup扩展的方法有5个,前四个可以从命名中推断出功能。shutdownGracefully()我们已经在Bootstrap一节中使用过,优雅关闭线程池;terminationFuture()返回线程池终止时的异步结果。重点关注next()方法,该方法的功能是从线程池中选择一个线程。EventExecutorGroup还覆盖了一些方法,我们不再列出,如果你感兴趣可以去源码里面查看,需要注意的是,覆盖的方法大部分是将Java原生的java.util.concurrent.Future返回值覆盖为io.netty.util.concurrent.Future。

4.3 线程池

4.3.1 AbstractEventExecutorGroup

AbstractEventExecutorGroup实现了EventExecutorGroup接口的大部分方法,实现都长的和下面的差不多:

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

从这段代码可以看出这个线程池和程序员有一个相同点:懒。当线程池执行一个任务或命令时,步骤是这样的:(1).找一个线程。(2).交给线程执行。

4.3.2 MultithreadEventExecutorGroup

MultithreadEventExecutorGroup实现了线程的创建和线程的选择,其中的字段为:

    // 线程池,数组形式可知为固定线程池
    private final EventExecutor[] children;
    // 线程索引,用于线程选择
    private final AtomicInteger childIndex = new AtomicInteger();
    // 终止的线程个数
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    // 线程池终止时的异步结果
    private final Promise<?> terminationFuture = 
                          new DefaultPromise(GlobalEventExecutor.INSTANCE);
    // 线程选择器
    private final EventExecutorChooser chooser;

MultithreadEventExecutorGroup的构造方法很长,我们将选出其中的关键部分分析,故不列出整体代码。如果你是处女座,这里有一个链接MultithreadEventExecutorGroup
我们先看构造方法签名:

    protected MultithreadEventExecutorGroup(int nThreads, 
                                        ThreadFactory threadFactory, Object... args)

其中的nThreads表示线程池的固定线程数。
MultithreadEventExecutorGroup初始化的步骤是:
(1).设置线程工厂
(2).设置线程选择器
(3).实例化线程
(4).设置线程终止异步结果
首先我们看设备线程工厂的代码:

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
    
    protected ThreadFactory newDefaultThreadFactory(),() {
        return new DefaultThreadFactory(getClass());
    }

如果构造参数threadFactory为空则使用默认线程池,创建默认线程池使用newDefaultThreadFactory(),这是一个protected方法,可以在子类中覆盖实现。
接着我们看设置线程选择器的代码:

    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

如果线程数是2的幂次方使用2的幂次方选择器,否则使用通用选择器。下次如果有面试官问你怎么判断一个整数是2的幂次方,请甩给他这一行代码:

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

Netty实现了两个线程选择器,虽然代码不一致,功能都是一样的:每次选择索引为上一次所选线程索引+1的线程。如果你没看明白代码的含义,没关系,再看一遍。

    private interface EventExecutorChooser {
        EventExecutor next();
    }
    
    private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            return children[childIndex.getAndIncrement() & children.length - 1];
        }
    }
    
    private final class GenericEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            return children[Math.abs(childIndex.getAndIncrement() % children.length)];
        }
    }

最佳实践:线程池数量使用2的幂次方,这样线程池选择线程时使用位操作,能使性能最高。
下面我们接着分析实例化线程的步骤:

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 使用模板方法newChild实例化一个线程
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 如果不成功,所有已经实例化的线程优雅关闭
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                // 确保已经实例化的线程终止
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

实现的过程一句话描述就是:使用newChild()依次实例化线程,如果出错,关闭所有已经实例化的线程。也许你对finally中的代码有疑问,这是因为不清楚shutdownGracefully()的含义。你需要提前明白这样的事实:shutdownGracefully()只是通知线程池该关闭,但什么时候关闭由线程池决定,所以需要使用e.isTerminated()来判断线程池是否真正关闭。
实例化线程池正常完成后,Netty使用下面的代码设置异步终止结果:

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            // 线程池中的线程每终止一个增加记录数,直到全部终止设置线程池异步终止结果为成功
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

分析完MultithreadEventExecutorGroup的构造方法,我们继续分析普通方法。它的普通方法基本与下面的isTerminated()类似:

    @Override
    public boolean isTerminated() {
        for (EventExecutor l: children) {
            if (!l.isTerminated()) {
                return false;
            }
        }
        return true;
    }

总结起来就是:线程池的状态由其中的各个线程决定。明白了这点,我们使用类比的方法可以推知其他方法的实现,故不再具体分析。

4.3.3 MultithreadEventLoopGroup

MultithreadEventLoopGroup实现了EventLoopGroup接口的方法,EventLoopGroup接口作为Netty并发的关键接口,我们看其中扩展的方法:

    // 将通道channel注册到EventLoopGroup中的一个线程上
    ChannelFuture register(Channel channel);
    // 返回的ChannelFuture为传入的ChannelPromise
    ChannelFuture register(Channel channel, ChannelPromise promise);
    // 覆盖父类接口的方法,返回EventLoop
    @Override EventLoop next();

这些方法在MultithreadEventLoopGroup的具体实现很简单。register()方法选择一个线程,该线程负责具体的register()实现。next()方法使用父类实现,即使用上一节所述的选择器选择一个线程。代码如下:

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

分析完这些代码,我们关注一下线程数的默认设置。

    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", 
                Runtime.getRuntime().availableProcessors() * 2));

默认情况,线程数最小为1,如果配置了系统参数io.netty.eventLoopThreads,设置为该系统参数值,否则设置为核心数的2倍。

4.3.4 NioEventLoopGroup

NioEventLoopGroup的主要代码实现是模板方法newChild(),用来创建线程池中的单个线程,代码如下:

    @Override
    protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) 
                   throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), 
            (RejectedExecutionHandler) args[2]);
    }

关于代码中的参数含义,我们放在NioEventLoop中分析。此外NioEventLoopGroup还提供了setIoRatio()和rebuildSelectors()两个方法,一个用来设置I/O任务和非I/O任务的执行时间比,一个用来重建线程中的selector来规避JDK的epoll 100% CPU Bug。其实现也是依次设置各线程的状态,故不再列出。

上一篇下一篇

猜你喜欢

热点阅读