NioEventLoopGroup

2021-04-21  学学学q

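NioEventLoopGroup is a multithreaded event loop that handles I/O operations. Netty provides several EventLoopGroup implementations for different kinds of transports.
The earlier example implemented a simple server-side application and used two NioEventLoopGroup instances. The first, usually called boss, accepts incoming connections. The second, usually called worker, handles the traffic of a connection once the boss has accepted it and registered it with the worker.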

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
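The division of labor between the two groups can be illustrated without Netty at all. The following is only a rough sketch using plain JDK executors: `BossWorkerSketch`, the `worker-N` thread names, and the modulo hand-off are all hypothetical stand-ins, not Netty's implementation. The boss thread does nothing but hand each "connection" to a worker, and the worker does the actual handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the boss/worker split; this is not Netty code.
class BossWorkerSketch {
    // Returns a map from connection id to the name of the worker thread that handled it.
    static Map<Integer, String> run(int workerCount, int connections) {
        ExecutorService boss = Executors.newSingleThreadExecutor();
        List<ExecutorService> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            final int id = i;
            // One single-threaded executor per worker, so each worker is one thread.
            workers.add(Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-" + id)));
        }
        Map<Integer, String> handledBy = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(connections);
        for (int c = 0; c < connections; c++) {
            final int conn = c;
            // The boss only "accepts" the connection and hands it to a worker.
            boss.execute(() -> workers.get(conn % workerCount).execute(() -> {
                handledBy.put(conn, Thread.currentThread().getName());
                done.countDown();
            }));
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boss.shutdown();
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        return handledBy;
    }
}
```

Calling `BossWorkerSketch.run(2, 4)` spreads four connections over two workers; a given connection is always handled by the same worker thread.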

If the number of threads is not specified, the value passed along is 0:

    public NioEventLoopGroup() {
        this(0);
    }

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

So how many threads are created when the value defaults to 0?

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
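The fallback in this constructor can be tried out in plain Java. This is a sketch only: `DefaultThreadsSketch` and `resolveThreads` are hypothetical names, and the JDK calls `Integer.getInteger` and `Runtime.availableProcessors()` stand in for Netty's `SystemPropertyUtil.getInt` and `NettyRuntime.availableProcessors()`.

```java
// Hypothetical helper that mirrors the "0 means use the default" rule with JDK calls only.
class DefaultThreadsSketch {
    static int resolveThreads(int nThreads) {
        // Default: the io.netty.eventLoopThreads system property if set,
        // otherwise available processors x 2, and never less than 1.
        int defaults = Math.max(1, Integer.getInteger(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
        // An explicit, non-zero thread count always wins over the default.
        return nThreads == 0 ? defaults : nThreads;
    }
}
```

`resolveThreads(3)` returns 3, while `resolveThreads(0)` returns the machine-dependent default, so the result of `new NioEventLoopGroup()` varies with the hardware unless the property is set.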

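As DEFAULT_EVENT_LOOP_THREADS shows, the default number of threads is the number of available CPU processors x 2, unless the io.netty.eventLoopThreads system property overrides it: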

   DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

Next, the executor that will run tasks on multiple threads is created:

  if (executor == null) {
      executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  }

Now look at ThreadPerTaskExecutor:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

Two types are involved here, Executor and ThreadFactory. ThreadPerTaskExecutor runs each task on a new thread rather than on the caller's thread.
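That a thread-per-task executor never runs the task on the calling thread is easy to check with the JDK alone. A minimal sketch, assuming a lambda-based executor of the same shape (`ThreadPerTaskSketch` and `runAndReportThread` are hypothetical names):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ThreadPerTaskSketch {
    // Same shape as ThreadPerTaskExecutor: every execute() call starts a fresh thread.
    static final Executor EXECUTOR = command -> new Thread(command).start();

    // Runs one task and returns the thread that actually executed it.
    static Thread runAndReportThread() {
        AtomicReference<Thread> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        EXECUTOR.execute(() -> {
            seen.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```

The returned thread is different from `Thread.currentThread()` in the caller, which is the property the event loops rely on when they are started through this executor.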

/**
An object that creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread, enabling applications to use special thread subclasses, priorities, etc.
The simplest implementation of this interface is just:
  
 class SimpleThreadFactory implements ThreadFactory {
   public Thread newThread(Runnable r) {
     return new Thread(r);
   }
 }
The Executors.defaultThreadFactory method provides a more useful simple implementation, that sets the created thread context to known values before returning it.
*/
public interface ThreadFactory {
    Thread newThread(Runnable r);
}

Here DefaultThreadFactory is the ThreadFactory implementation:

protected ThreadFactory newDefaultThreadFactory() {
       return new DefaultThreadFactory(getClass());
}
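Passing `getClass()` lets the factory derive thread names from the group type. The sketch below shows one way such a factory might look; `NamingThreadFactorySketch` and its `SimpleName-N` naming scheme are assumptions for illustration and are not Netty's exact format.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: names each thread after a class plus a running counter.
class NamingThreadFactorySketch implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger nextId = new AtomicInteger();

    NamingThreadFactorySketch(Class<?> poolType) {
        // Assumed naming scheme: "<SimpleClassName>-".
        this.prefix = poolType.getSimpleName() + "-";
    }

    @Override
    public Thread newThread(Runnable r) {
        // The counter makes every thread name unique within this factory.
        return new Thread(r, prefix + nextId.incrementAndGet());
    }
}
```

Named threads make thread dumps and logs much easier to read than the JDK's default `Thread-N` names.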

The next step in the constructor is children = new EventExecutor[nThreads];

private final EventExecutor[] children;

The descriptions and inheritance relationship of EventExecutor and EventExecutorGroup are as follows:

/**
The EventExecutor is a special EventExecutorGroup 
which comes with some handy methods to see if a Thread is executed in a event loop. Besides this, it also extends the EventExecutorGroup to allow for a generic way to access methods.
*/
public interface EventExecutor extends EventExecutorGroup {
}

/**
 * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use
 * via its {@link #next()} method. Besides this, it is also responsible for handling their
 * life-cycle and allows shutting them down in a global fashion.
 *
 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
}

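EventExecutorGroup provides EventExecutor instances through its next() method. It is also responsible for their life cycle and allows them to be shut down globally.
Next, NioEventLoop instances are created in a loop, one per thread: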

    for (int i = 0; i < nThreads; i++) {
        // Set to true once newChild succeeds; the cleanup on failure is omitted from this excerpt.
        boolean success = false;
        children[i] = newChild(executor, args);
    }

newChild() is implemented as follows:

  @Override
  protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
  }
/*
    private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
*/

After the for loop finishes, take another look at the children object.

[Figure: the elements in children]
Next, the code reaches:

    chooser = chooserFactory.newChooser(children);

This call goes into DefaultEventExecutorChooserFactory, the implementation of EventExecutorChooserFactory:

   @Override
   public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
   }

  private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

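When the number of executors is a power of two, this returns PowerOfTwoEventExecutorChooser, an implementation of EventExecutorChooser. Its next() method picks the next EventExecutor in round-robin fashion.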
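The bit mask works as a cheap modulo only because the array length is a power of two: for length 4, `i & 3` cycles through 0, 1, 2, 3. A self-contained sketch of both selection strategies follows; `ChooserSketch` is a hypothetical class, strings stand in for executors, and `Math.floorMod` is used here as a stand-in for the modulo-based selection of the generic chooser.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser over plain strings instead of EventExecutor instances.
class ChooserSketch {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] executors;
    private final boolean powerOfTwo;

    ChooserSketch(String... executors) {
        if (executors.length == 0) {
            throw new IllegalArgumentException("at least one executor is required");
        }
        this.executors = executors;
        int n = executors.length;
        // A positive n is a power of two exactly when its lowest set bit is n itself.
        this.powerOfTwo = (n & -n) == n;
    }

    String next() {
        int i = idx.getAndIncrement();
        return powerOfTwo
                ? executors[i & (executors.length - 1)]          // mask, valid for powers of two
                : executors[Math.floorMod(i, executors.length)]; // general case
    }
}
```

With four executors the mask branch is taken and with three the modulo branch, but both hand out the elements in order and wrap around after the last one.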

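Finally, a listener is added to each child so that termination is handled asynchronously through a callback. The Javadoc of GenericFutureListener, which FutureListener extends, describes it like this:

Listens to the result of a Future. The result of the asynchronous operation is notified once this listener is added by calling Future.addListener(GenericFutureListener).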

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

   for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
   }
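The counting pattern used by this listener, where the group's future completes only when the last child has terminated, can be reproduced with the JDK's `CompletableFuture`. This is an analogy under stated assumptions: `TerminationSketch` is a hypothetical class and `CompletableFuture` stands in for Netty's own `Future` and `Promise` types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical group whose termination future completes after all children have completed.
class TerminationSketch {
    final List<CompletableFuture<Void>> children = new ArrayList<>();
    final CompletableFuture<Void> groupTermination = new CompletableFuture<>();
    private final AtomicInteger terminatedChildren = new AtomicInteger();

    TerminationSketch(int n) {
        for (int i = 0; i < n; i++) {
            CompletableFuture<Void> child = new CompletableFuture<>();
            // The same callback logic is attached to every child.
            child.whenComplete((value, error) -> {
                // Only the callback that observes the final count completes the group future.
                if (terminatedChildren.incrementAndGet() == n) {
                    groupTermination.complete(null);
                }
            });
            children.add(child);
        }
    }
}
```

With three children, `groupTermination.isDone()` stays false after two of them complete and becomes true once the third completes.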

All elements of children are then added to the read-only set readonlyChildren:

  /**
   *  private final Set<EventExecutor> readonlyChildren;
   */
  Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
  Collections.addAll(childrenSet, children);
  readonlyChildren = Collections.unmodifiableSet(childrenSet);
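The effect of wrapping the set can be checked in isolation. A small sketch with strings in place of EventExecutor instances (`ReadonlyChildrenSketch` and `readonlyView` are hypothetical names): the LinkedHashSet keeps insertion order, and the unmodifiable wrapper rejects any attempt to change it.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

class ReadonlyChildrenSketch {
    // Builds an insertion-ordered set and exposes it only through a read-only view.
    static Set<String> readonlyView(String... children) {
        Set<String> childrenSet = new LinkedHashSet<>(children.length);
        Collections.addAll(childrenSet, children);
        return Collections.unmodifiableSet(childrenSet);
    }
}
```

Calling `add` or `remove` on the returned set throws `UnsupportedOperationException`, so callers can iterate over the children but cannot alter the group's membership.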