深入浅出Netty源码剖析Java 杂谈

【第5篇】Netty的NioEventLoopGroup与Ser

2019-05-16  本文已影响3人  爱学习的蹭蹭

1、NioEventLoopGroup

NioEventLoop、EventLoopGroup、NioEventLoopGroup关系图

2、 ChannelFuture与ChannelPromise

ChannelFuture与ChannelPromise关系图

3、SelectorProvider

4、 static关键字

5、EventExecutor

6、Netty中的bossGroup为什么使用线程池的原因大家众所纷纭

the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps, but I don’t see the reason for it.
意思就是说:netty作者说:我们在不同的服务器引导之间共享NioEventLoopGroup,多个boss线程是有用的,但我没有看到它的

7、NioEventLoopGroup源码剖析笔录

   //创建一个实例,使用默认线程数、默认ThreadFactory和SelectorProvider()返回的SelectorProvider.provider()创建一个新实例。
 public NioEventLoopGroup() {
      this(0);
 }
// 创建一个实例,指定线程数、默认ThreadFactory和SelectorProvider()返回的SelectorProvider.provider()创建一个新实例。
 public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
 }

8、MultithreadEventLoopGroup源码剖析笔录

  //默认事件循环线程
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
        //根据可用进程的CPU进行相乘
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        //启动debug就打印log信息
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
  }
  /**
     * 创建一个实例
     * @param nThreads         该实例将使用的线程数
     * @param executor           将要使用的executor, 默认为null
     * @param args  参数将传递给每个newChild(Executor, Object…)调用
     */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
     this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
 }

/**
 * 最终的创建实例构造器
 *
 * @param nThreads          该实例将使用的线程数
 * @param executor          将要使用的executor, 默认为null
 * @param chooserFactory    将要使用的EventExecutorChooserFactory
 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    /** 1.初始化线程池 */
    //参数校验nThread合法性,
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    //executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接口
    // 这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从
    //NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    //这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现线程池;
    //数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。
    children = new EventExecutor[nThreads];

    //for循环实例化children数组,NioEventLoop对象
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            //newChild(executor, args) 函数在NioEventLoopGroup类中实现了, 
            // 实质就是就是存入了一个 NIOEventLoop类实例
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            //如果构造失败, 就清理资源
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }//end foreach

    /** 2.实例化线程工厂执行器选择器: 根据children获取选择器 */
    chooser = chooserFactory.newChooser(children);

    /** 3.为每个EventLoop线程添加 线程终止监听器*/
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    /** 4. 将children 添加到对应的set集合中去重, 表示只可读。*/
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

9、ServerBootstrap类的bind绑定端口的源码详解

//1、从bind点击进去就跟踪到ChannelFuture 的bind方法
 ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080)).sync();

//2、从bind方法的doBind看到处理SocketAddress 
  public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

//3、 doBind方法里面有initAndRegister初始化和注册方法可以跟踪到
private ChannelFuture doBind(final SocketAddress localAddress) {
       //初始化和注册ChannelFuture 
        final ChannelFuture regFuture = initAndRegister();
       // 从ChannelFuture 获取通道
        final Channel channel = regFuture.channel(); 
        if (regFuture.cause() != null) {
            return regFuture;
        }
 if (regFuture.isDone()) {
            // 此时,我们知道注册已经完成并成功。
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {   
        //但只是以防万一,注册的期望(未来)几乎总是已经完成了。
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            //这里是用到监听器进行检测ChannelFuture (管道期望完成状态结果)
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // EventLoop上的注册失败,因此通道承诺失败,直接导致
                        //一旦我们试图访问通道的EventLoop,就会出现IllegalStateException。
                        promise.setFailure(cause);
                    } else {
                        // 注册成功,因此设置要使用的正确执行程序。
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();//调用promise已经注册
                        //处理通道,ChannelFuture,SocketAddress,ChannelPromise
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
}

//4、从第3步doBind方法里面的initAndRegister跟踪到
final ChannelFuture initAndRegister() {
     ChannelFuture regFuture = config().group().register(channel);
}

// `此config()方法是Bootstrap类的config() ,而config() .group()是返回 AbstractBootstrapConfig#EventLoopGroup的对象`
//从 config().group().register(channel);此时跟踪到EventLoopGroup这个类
public final BootstrapConfig config() {
        return config;
}

EmbeddedEventLoop
MultithreadEventLoopGroup
SingleThreadEventLoop
ThreadPerChannelEventLoopGroup
//跟到注册
 @Override
    public ChannelFuture register(Channel channel) {
        //此返回的register是下面的代码的 ChannelFuture register(ChannelPromise promise)
        return register(new DefaultChannelPromise(channel, this));
    }

   @Override
    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

*此行代码 promise.channel().unsafe().register(this, promise);跟踪到如图

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 AbstractChannel.this.eventLoop = eventLoop;
 //判断事件循环是否在AbstractEventExecutor事件执行器里面
if (eventLoop.inEventLoop()) {
   //进行注册ChannelPromise(管道期望)
    register0(promise);
} else {
   try {
     //第一次调用事件循环执行eventLoop.execute()
      eventLoop.execute(new Runnable() {
      @Override
        public void run() {
               register0(promise);
             }
        });
   } catch (Throwable t) {
    logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);
      closeForcibly();
      closeFuture.setClosed();
      safeSetFailure(promise, t);
                }
            }
        }
上一篇 下一篇

猜你喜欢

热点阅读