工作生活

Netty源码解析

2019-06-29  本文已影响0人  aiwen2017

1 Netty线程资源的创建

下面是Netty官方文档中的一个服务端的例子:

public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

上面的代码主要涉及到NioEventLoopGroup和ServerBootstrap两个类,深入NioEventLoopGroup可以发现另外一个重要的类是NioEventLoop,他们之间的关系图如下:

LoopGroup.png

1.1 NioEventLoopGroup

NioEventLoopGroup是一个处理用户连接或者I/O操作的多线程组,一个NioEventLoopGroup管理和维护多个NioEventLoop并使用一定的策略从其维护的多个NioEventLoop中选取一个对外提供服务。上面的例子中创建了两个NioEventLoopGroup:bossGroup和workerGroup,前者负责处理用户的连接,当连接到来bossGroup会将连接交给workerGroup处理,workerGroup负责连接的I/O操作以及任务执行 。NioEventLoopGroup以及其父类的构造函数创建了如下组件:

  1. SelectorProvider
    SelectorProvider用于创建Selector。
  2. DefaultSelectStrategyFactory
    DefaultSelectStrategyFactory用于获取SelectStrategy,SelectStrategy表示一种如何执行Selector的select操作的策略。
  3. 拒绝策略RejectedExecutionHandler
    默认是一个直接抛出异常的拒绝策略。
  4. 创建ThreadPerTaskExecutor
    用于EventLoop的线程创建。
  5. EventExecutorChooser
    NioEventLoopGroup需要从其维护的多个EventExecutor中选取一个对外提供服务,EventExecutorChooser即表示一种从一组EventExecutor中选择一个EventExecutor的策略。Netty会根据EventExecutor的数量是否是2的n次方决定使用PowerOfTwoEventExecutorChooser还是GenericEventExecutorChooser,这两个策略都是采用轮询的方式从EventExecutor数组中选取一个EventExecutor,只不过前者使用一种更加高效的位与(&)的方式计算轮询。
  6. EventExecutor
    NioEventLoopGroup会创建一定的数量的NioEventLoop对象,EventExecutor是一个接口,其实现类为NioEventLoop,默认创建2倍CPU核心数量的NioEventLoop对象,NioEventLoop的数量对Netty的性能影响很大,因它不仅仅负责通道的I/O操作,还负责执行业务任务。

NioEventLoopGroup的父类MultithreadEventExecutorGroup的构造函数创建了EventExecutor,如下:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

newChild()方法实现在类NioEventLoopGroup中,其直接返回NioEventLoop对象,如下:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

可以看到NioEventLoopGroup的构造函数创建的Executor、SelectorProvider、SelectStrategyFactory以及RejectedExecutionHandler最终都传递给了NioEventLoop。

1.2 NioEventLoop

NioEventLoop可以认为是NioEventLoopGroup中的线程,NioEventLoop有如下三个核心成员变量:

  1. Queue<Runnable> taskQueue
    用于缓存任务的队列,当调用NioEventLoop的execute()方法向NioEventLoop提交任务时,任务首先会会缓存至taskQueu中。NioEventLoop的构造函数为taskQueue赋值了一个高性能多生产者单消费队列MpscUnboundedArrayQueue。可以在用户ChannelHandler中通过ChannelHandlerContext向此队列中添加任务,就像下面这样:
    @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
      ctx.pipeline().channel().eventLoop().execute(() -> {
      // do something
      });
  }
  1. Selector selector
    用于监听通道的Selector,一个NioEventLoop持有一个Selector对象,一个Selector负责监听多个通道上的事件,那么一个NioEventLoop也就负责多个通道的连接和I/O。
  2. 线程
    一个NioEventLoop会有一个运行在的Executor中的线程,此线程在第一次向NioEventLoop提交任务时被激活,Netty中Reactor模型即是依赖此线程实现的,此线程会不断的检测Selector监听的通道是否有事件到达以及执行taskQueue中的任务。线程命名规则为:nioEventLoopGroup-ThreadFactory序号-ThreadFactory创建的线程序号,例如:nioEventLoopGroup-2-1,表示第2个ThreadFactory创建的第一个线程。

NioEventLoop以及其父类的构造函数也是围绕创建以上三个成员变量而展开的,NioEventLoop的构造函数调用openSelector()方法创建了Selector对象,此方法的源码:

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    if (!(maybeSelectorImplClass instanceof Class) ||
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        }
        return new SelectorTuple(unwrappedSelector);
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                }

                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        return new SelectorTuple(unwrappedSelector);
    }
    selectedKeys = selectedKeySet;
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

此方法不仅创建Selector对象,还对其进行了优化,它利用反射把unwrappedSelector对象中的成员变量selectedKeys和publicSelectedKeys用selectedKeySet替换掉,也就是把selectedKeys和publicSelectedKeys的类型从HashSet换成了SelectedSelectionKeySet。SelectedSelectionKeySet是一个实现非常简单的数据结构,它使用一个数组保存SelectionKey,容量不够就2倍数组长度扩容,Netty在使用时直接遍历其数组,较之于HashSet其抛弃了很多不必要的操作,令其效率非常高,其源码如下:

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    @Override
    public boolean remove(Object o) { return false; }

    @Override
    public boolean contains(Object o) { return false; }

    @Override
    public int size() { return size; }

    @Override
    public Iterator<SelectionKey> iterator() {
        return new Iterator<SelectionKey>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            public SelectionKey next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return keys[idx++];
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    void reset() { reset(0); }

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }
}

2 Netty启动

下面是Netty启动的时序图:

Bootstrap.png

Netty启动大致可以分为NioServerSocketChannel的创建、注册、绑定和激活四个步骤。

2.1 NioServerSocketChannel的创建

从上面时序图可以看到ChannelFactory的newChannel()方法负责创建NioServerSocketChannel对象,ChannelFactory的实现类为ReflectiveChannelFactory,在上面的例子中,通过调用ServerBootstrap的channel()方法设置了NioServerSocketChannel.class,ReflectiveChannelFactory获取NioServerSocketChannel.class的无参构造使用反射创建了NioServerSocketChannel对象。

NioServerSocketChannel的无参构造函数则初始化了如下成员变量:

可以看到NioServerSocketChannel是对Java NIO中ServerSocketChannel的封装,它代表了ServerSocketChannel在Netty中的存在。

2.2 NioServerSocketChannel的注册

NioServerSocketChannel创建完成之后接着是对其的注册操作,从上面的时序图中可以看到,NioEventLoopGroup的register()方法被调用,因为上面的例子中创建了两个NioEventLoopGroup对象,这里的NioEventLoopGroup指的是bossGroup,NioEventLoopGroup使用轮询的策略从其维护的NioEventLoop数组中选取一个NioEventLoop并调用其register方法,NioEventLoop则获取了NioServerSocketChannel持有的AbstractNioMessageChannel#NioMessageUnsafe对象并调用其register()方法,此方法则向NioEventLoop提交了一个任务,因为这里是第一次向NioEventLoop提交任务,所以这里创建并启动了NioEventLoop的线程,任务调用了register0()方法,register0()方调用到了NioServerSocketChannel的父类AbstractNioChannel的doRegister(),此方法源码如下:

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

这里的javaChannel()方法返回的是ServerSocketChannel对象,可以看到这里对ServerSocketChannel对象注册了0事件,表示不接受任何事件,并且保存了返回的SelectionKey。那么有一个疑问是这里为什么注册0呢?一般不都注册SelectionKey.OP_ACCEPT么?仔细观察就会发现这个方法是类AbstractNioChannel中的,AbstractNioChannel是一个抽象类,NioServerSocketChannel和NioSocketChannel都间接继承类此类,那么注册这一操作对ServerSocketChannel和SocketChannel而言注册的事件是不同的,ServerSocketChannel需要组册SelectionKey.OP_ACCEPT事件,而SocketChannel则需要注册SelectionKey.OP_READ事件,所以这里没办法注册具体的事件。这里调用register()方法的目的更多的是为了获取SelectionKey,因为一旦获取了SelectionKey,在后续的操作(doBeginRead()方法)中通过调用这里返回的SelectionKey对象的interestOps()方法注册对应的事件,NioServerSocketChannel注册SelectionKey.OP_ACCEPT事件,NioSocketChannel则注册SelectionKey.OP_READ事件。但是这里好像可以直接注册readInterestOp事件唉,因为NioServerSocketChannel和NioSocketChannel此属性的值分别为SelectionKey.OP_ACCEPT和SelectionKey.OP_READ,此时还没有绑定端口,如果这时绑定事件意义也不大。

可以看到NioServerSocketChannel的注册操作主要是从NioEventLoopGroup(bossGroup)中选取一个NioEventLoop并使用其Selector对ServerSocketChannel注册0事件并获取SelectionKey的过程。

2.3 NioServerSocketChannel的绑定和激活

从上面时序图可以看到,经过NioServerSocketChannel、DefaultChannelPipeline、TailContext以及HeadContext调用最终调用到了AbstractNioMessageChannel#NioMessageUnsafe的bind()方法,此方法实现在其父类AbstractChannel#AbstractUnsafe中,源码如下:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {}

    boolean wasActive = isActive();
    try {
        // 调用子类的doBind()方法绑定端口
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 激活通道
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

doBind()方法实现在NioServerSocketChannel中,如下:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // 绑定端口
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

可以看到在绑定端口完成后通过异步调用了DefaultChannelPipeline的fireChannelActive()方法,此方法最终通过执行AbstractNioMessageChannel#NioMessageUnsafe的beginRead()方法调用了NioServerSocketChannel的doBeginRead()方法,此方法设置SelectionKey感兴趣事件为SelectionKey.OP_ACCEPT,方法源码如下:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // 设置SelectionKey感兴趣事件为readInterestOp,
        // 这里readInterestOp即为创建NioServerSocketChannel时赋值的SelectionKey.OP_ACCEPT
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

3 Netty I/O操作

NioEventLoop的线程启动后会运行NioEventLoop的run()方法,此方法是Netty非常核心的部分-Reactor模型的实现。run()方法内是一个死循环,每循环一次就会做如下三件事情:

3.1 selcet策略

下面通过源码看下此方法是如何处理以上三件事情的,如下是run()方法源码:

protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

switch case语句块的逻辑是当队列taskQueue中有任务时调用selectNow()方法,当队列taskQueue中没有任务时调用select(boolean oldWakenUp)方法,而且不论执行了哪个方法当wakenUp为true时都会调用Selector的wakeup()方法唤醒Selector。那么接下来就看下selectNow()方法和select(boolean oldWakenUp)方法都做了些什么,下面是selectNow()方法源码:

int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

方法很简单就是调用了Selector的非阻塞方法selectNow(),也就是说当队列taskQueue中有任务时调用Selector的非阻塞方法selectNow()

如下是select(boolean oldWakenUp)方法源码:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                // 休足了timeoutMillis跳出循环
                break;
            }

            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // 说明休足了timeoutMillis毫秒的时长
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // 如果休眠失败次数selectCnt大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就重建Selector
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

    } catch (CancelledKeyException e) {}
}

此方法本来可以很简单的,其本意也是为了性能而选择调用Selector的select(long timeout)方法阻塞一小会儿,因为此时队列taskQueue中没有任务,线程主要任务是轮询Selector,如果调用Selector的非阻塞方法,则会导致CPU空转时间变长,浪费了CPU,但是可以看到这里稍微有点儿复杂,那是因为这里添加了一些解决JDK的一个有关epoll空轮询的bug的逻辑。

这个bug是这样的,Selector的select()和select(long timeout)方法本来是阻塞的,当有事件到达时才返回,但是在某些平台的环境中会存在一种即使没有事件到达方法也会返回的情况,虽然是一个概率事件,但是这种情况一旦发生就会使程序进入一个死循环,从而使CPU使用率飙升。这个bug的解决思路是一旦识别这种情况就重新创建一个Selector。那么这里就重点关注一下Netty是如何实现这一思路的并解决这一bug的。

首先看下时如何识别这种情况的发生的。默认情况下Netty在1s内select(long timeout)休眠失败的次数大于等于512时就认为进入了JDK的bug,可以看到select休眠后的nanoTime(变量time)减去select休眠前的nanoTime大于等于应当休眠的时长(变量timeoutMillis)则说明休足了timeoutMillis毫秒的时长,把selectCnt置为1;否则就是没有休足时长,selectCnt加1;还有一种情况是如果在休眠的过程中Selector被唤醒了此时也不会休足时长,但是前面做了判断,意思是如果(有事件到达)||(Selector本就应该为唤醒状态)|| (Selector又被唤醒了) ||(有任务了)||(有定时任务了)就跳出循环。

在执行select操作的过程中如果进入JDK bug则select()函数会很快返回,for循环进入快速死循环,selectCnt也随之快速累加,当selectCnt大于等于阈值SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)时就重建Selector并跳出循环;如果没有进入JDK bug,select()函数休足指定时长(1秒)正常跳出for循环。

方法rebuildSelector0()实现了Selector的重建,围观一下其源码:

private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        // 1. 建一个新的Selector
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }

            int interestOps = key.interestOps();
            // 取消此key
            key.cancel();
            
            // 2. 使用新的Selector重新注册原有通道
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }

    // 3. 新Selector替换旧Selector
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // 4. 关闭旧的Selector
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }
}

可以看到此方法首先建一个新的Selector,然后使用新的Selector重新注册原有通道,接着替换旧的Selector,最后关闭旧的Selector。

这里稍微总结一下switch case语句块的逻辑:当taskQueue队列有任务时,因为需要执行任务所以采用非阻塞的方式检测是否有事件到达,当taskQueue队列没有任务时采用间歇阻塞的方式检测是否有事件到达,每次阻塞时间为1秒,1秒内如果select休眠失败次数大于等于512则认为进入JDK bug,此时就会重建Selector

3.2 任务执行的时间分配

接下来ioRatio把程序分成了两个分支,其实分支中做的事情是一样的,都是调用processSelectedKeys()方法和runAllTasks()方法,分别用于执行I/O操作和运行taskQueue队列中的任务,区别就在于如何分配执行时间,毕竟这里只有一个线程。从上面run()的源码可以看到,当ioRatio等于100时,按顺序执行了processSelectedKeys()方法和runAllTasks()方法,也就是说先执行I/O操作,完了再执行任务,完全忽略它们执行的时长。当ioRatio(默认50)不等于100时,首先调用processSelectedKeys()方法执行I/O操作并记录其执行时长,然后根据此时长按照ioRatio比率计算执行任务时间片的时长,但是runAllTasks(long timeoutNanos)方法并没有严格按照此时长限制任务的执行,而是一个粗粒度的限制,其源码如下:

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

据上面的注释:因为nanoTime()函数相当昂贵所以每执行64个任务检查一次是否到截止时间。一旦到截止时间则不再继续执行taskQueue队列中的任务,任务会在下个for循环中执行。

这里虽然通过ioRate分配了执行任务的时间片,控制了执行任务的时长,但是还有有问题的,问题就出在一旦任务执行时间过长就会阻塞select操作和处理key的操作,此时一般有两种解决方法:

  1. 在添加用户ChannelHandler时添加一个EventExecutorGroup作为线程池用来处理任务,这是一种Netty官方建议的方式。
  2. 在用户ChannelHandler中把任务提交至自定义线程池中,这种方式比较灵活,所以使用中这种方式比较常见。

3.3 I/O处理

方法processSelectedKeys()是I/O操作的入口,如下是其相关源码:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

可以看到processSelectedKeys()方法当selectedKeys不为空时调用了一个优化的处理SelectionKey的函数processSelectedKeysOptimized(),此函数遍历SelectionKey数组处理到达的事件,针对每个SelectionKey调用processSelectedKey()方法,processSelectedKey()方法则根据不同事件做不同处理,processSelectedKey()方法方法源码如下:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

可以看到这里NioUnsafe接口发挥了核心作用,不管哪个事件都会调用到此接口的方法,SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件处理的时序图如下:

Read-Write.png

3.4 SelectionKey.OP_ACCEPT事件处理

当事件为SelectionKey.OP_ACCEPT时,此时的NioEventLoop为bossGroup中的NioEventLoop,这里unsafe指向NioServerSocketChannel中关联的AbstractNioMessageChannel#NioMessageUnsafe对象。
AbstractNioMessageChannel#NioMessageUnsafe的read()方法通过调用NioServerSocketChannel的doReadMessages()方法获取了SocketChannel对象并创建了NioSocketChannel对象,相关方法源码如下:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // 获取SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // 创建NioSocketChannel对象
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
// SocketUtils.accept()方法
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    // 获取SocketChannel
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

NioSocketChannel的属性配置以及初始化和NioServerSocketChannel类似:

bossGroup的NioEventLoop的DefaultChannelPipeline的ChannelHandler链是HeadContext(HeadContext)-DefaultChannelHandlerContext(ServerBootstrapAcceptor)-TailContext(TailContext)

从上面时序图可以看到在DefaultChannelPipeline的调用链中调用到了ServerBootstrapAcceptor的channelRead()方法,此方法源码如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    
    // 用户ChannelHandler添加至通道的DefaultPipeline中
    child.pipeline().addLast(childHandler);

    // 设置通道参数和属性
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 通道注册到workerGroup(NioEventLoopGroup)中
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

可以看到此方法首先把用户childHandler(例子中第4步设置的匿名ChannelHandler)添加至通道的DefaultPipeline中,接着设置通道参数和属性,最后把通道注册给了workerGroup(NioEventLoopGroup)中

此方法是非常核心的一个方法,它衔接了bossGroup和workerGroup,完成了把通道由bossGroup的NioEventLoop转交给workerGroup的使命。

接下来的流程和NioServerSocketChannel的类似,workerGroup从其持有的NioEventLoop数组中选择一个NioEventLoop处理通道,给通道注册0事件并获取SelectionKey;而后childHandler的initChannel()方法被执行,用户ChannelHandler(对应上面例子中的EchoServerHandler)被添加至通道的DefaultPipeline中,同时childHandler被从通道的DefaultPipeline中移除;随后用户ChannelHandler如果实现了channelActive()则channelActive()方法被调用,表示通道已经建立;最后SelectionKey被设置为感兴趣SelectionKey.OP_READ事件。

3.5 SelectionKey.OP_READ事件处理以及连接关闭

当事件为SelectionKey.OP_READT时,NioSocketChannelUnsafe的read()方法被调用,此方法源码如下:

@Override
public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 1. 创建ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            
            // 2. 读数据到ByteBuf
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                // 3. 检查对方是否关闭了连接
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            // 4. ByteBuf交给管道处理读事件,会调用到用户ChannelHandler的channelRead()方法
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        // 5. 关闭连接
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

可以看到此方法是读操作的核心实现,其首先创建了ByteBuf,接着把数据读到ByteBuf中,并检查连接是否关闭了,数据读完后调用用户ChannelHandler的channelRead()方法,如果对方关闭了连接则调用closeOnRead()方法关闭连接。

具体读数据读操作在NioSocketChannel的doReadBytes()方法中,此方法源码如下:

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

可以看到这里又调用了ByteBuf的writeBytes()方法,此处ByteBuf的实现类为PooledUnsafeDirectByteBuf,writeBytes方法在其父类AbstractByteBuf中实现,writeBytes()方法又调用了setBytes()方法,相关源码如下:

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}
    
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    checkIndex(index, length);
    ByteBuffer tmpBuf = internalNioBuffer();
    index = idx(index);
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}

可以看到在setBytes()中调用了SocketChannel的read()方法,把字节读到ByteBuffer中去。

Channel以及NioUnsafe相关接口类图如下:

Channel-Unsafe.png

3.5 Netty写入和刷新

下面是Netty写和刷新的调用时序图:


Write-Flush.png

可以看到DefaultChannelHandlerContext的write()方法最终调用了ChannelOutboundBuffer的addMessage()方法,此方法源码如下:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

可以看到这里只是把消息以一种Entry的形式连接在一起从而形成一个链表。

flush()方法首先调用ChannelOutboundBuffer的addFlush()把之前添加的消息标记为flushed,如下:

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}

接着调用NioSocketChannel的doWrite()方法,此方法调用nioBuffers()方法把之前添加的消息转换为ByteBuffer[],最后将这些ByteBuffer[]写入SocketChannel,相关源码如下:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            return;
        }

        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        
        // 1. 把之前添加的消息转换为ByteBuffer[]
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        switch (nioBufferCnt) {
            case 0:
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                long attemptedBytes = in.nioBufferSize();
                // 2. ByteBuffer[]写入SocketChannel
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}

4 对象缓存池-Recycler

为了性能,Netty对频繁使用的对象进行了缓存,当一个对象经历创建、使用后并没有直接销毁,而是缓存起来,下次使用时直接从缓存中获取,例如ByteBuf对象就是其缓存的重点。

Netty中缓存模块主要由类Recycler、Stack、DefaultHandle、WeakOrderQueue实现,这些类的关系图如下:

Recycler-class.png

缓存模块中一个比较重要的操作是对象的回收,其实现逻辑为:如果对象的回收和对象的获取发生在同一个线程内,则对象回收时直接把对象放回线程所关联的Stack对象的DefaultHandle[]数组(默认长度256)中;如果不在同一个线程内则把对象放回Stack对象的WeakOrderQueue链表中的某个WeakOrderQueue中,这样就避免了取对象和回收对象时的线程安全问题。

类Recycler此类有重要的属性和方法:

当调用Recycler的get()方法从缓存中获取一个对象时,get()方法首先会从属性threadLocal中获取一个和线程相关的Stack对象,然后调用其pop()方法回去对象,如果pop()返回null则调用newObject()方法创建一个对象。

Stack的pop()方法首先会判断其DefaultHandle数组中是否有对象,如果有就返回一个,如果没有则从其持有的WeakOrderQueue链表中拉去一批并放入数组中。

当对象使用完需要回收时,Stack的push()方法被调用,此方法首先判断其持有的线程引用是否等于当前线程,据此走了两个方法:pushNow()和pushLater(),pushNow()方法把对象放回DefaultHandle[],而pushLater()则把对象放回WeakOrderQueue链表中的某个WeakOrderQueue中,如下为相关源码:

void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    if (threadRef.get() == currentThread) {
        pushNow(item);
    } else {
        pushLater(item, currentThread);
    }
}

private void pushNow(DefaultHandle<?> item) {
    if ((item.recycleId | item.lastRecycledId) != 0) {
        throw new IllegalStateException("recycled already");
    }
    item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

    int size = this.size;
    if (size >= maxCapacity || dropHandle(item)) {
        return;
    }
    if (size == elements.length) {
        elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
    }

    elements[size] = item;
    this.size = size + 1;
}

private void pushLater(DefaultHandle<?> item, Thread thread) {
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    WeakOrderQueue queue = delayedRecycled.get(this);
    if (queue == null) {
        if (delayedRecycled.size() >= maxDelayedQueues) {
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
        if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
            // drop object
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        // drop object
        return;
    }

    queue.add(item);
}

例如某一时刻某一线程创建了A、B、C三个对象,最后这个三个对象分别在Thread1、Thread2、Thread3被回收,那么其逻辑图如下:


Recycler.png

从存储数据的结构可以看出,这种设计特别适合一个线程创建对象然后这些对象在不同的线程中回收的场景,这样一来就极大地避免的线程冲突的情况。Netty中NioEventLoop的线程就是这种场景,Netty中NioEventLoop的线程会一直运行并处理I/O事件,期间会多次使用到PooledUnsafeDirectByteBuf对象,类PooledUnsafeDirectByteBuf的静态对象newInstance()负责创建此对象,下面就是其使用Recycler的代码:

final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override
        protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
            return new PooledUnsafeDirectByteBuf(handle, 0);
        }
    };

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }
    
    // 略
}
上一篇下一篇

猜你喜欢

热点阅读