javaJava 杂谈程序员

第十六节 netty源码分析之 server端的源码分析

2019-01-30  本文已影响3人  勃列日涅夫

netty server端

以netty官方EchoServer服务器端的启动代码分析:

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);//用于处理客户端的连接请求
        EventLoopGroup workerGroup = new NioEventLoopGroup();//用于处理与各个客户端连接的 IO 操作
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
                    //负责处理客户端的连接请求
             .handler(new LoggingHandler(LogLevel.INFO))
                    //负责和客户端的连接的 IO 交互
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

和客户端的代码相比, 没有很大的差别, 基本上也是进行了如下几个部分的初始化:

    EventLoopGroup: 不论是服务器端还是客户端, 都必须指定 EventLoopGroup. 在这个例子中, 指定了 NioEventLoopGroup, 表示一个 NIO 的EventLoopGroup, 不过服务器端需要指定两个 EventLoopGroup, 一个是 bossGroup, 用于处理客户端的连接请求; 另一个是 workerGroup, 用于处理与各个客户端连接的 IO 操作.

    ChannelType: 指定 Channel 的类型. 因为是服务器端, 因此使用了 NioServerSocketChannel.

    Handler: 设置数据的处理器.
  1. Channel 的初始化

group.channel(NioServerSocketChannel.class)
根据源码以及在分析客户端源码很容易看出来服务端channel的初始化

 public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

这是会将ServerBootstrap属性ChannelFactory初始化ReflectiveChannelFactory且clazz为NioServerSocketChannel
至于类型也即NioServerSocketChannel
NioServerSocketChannel的类图如下:


图片.png

unsafe 字段其实是一个 AbstractNioMessageChannel#AbstractNioUnsafe 的实例.
我们来总结一下, 在 NioServerSocketChannsl 实例化过程中, 所需要做的工作:

调用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO ServerSocketChannel

AbstractChannel(Channel parent) 中初始化 AbstractChannel 的属性:

    parent 属性置为 null

    unsafe 通过newUnsafe() 实例化一个 unsafe 对象, 它的类型是 AbstractNioMessageChannel#AbstractNioUnsafe 内部类

    pipeline 是 new DefaultChannelPipeline(this) 新创建的实例.

AbstractNioChannel 中的属性:

    SelectableChannel ch 被设置为 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.

    readInterestOp 被设置为 SelectionKey.OP_ACCEPT

    SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)

NioServerSocketChannel 中的属性:

    ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())
  1. ChannelPipeline 初始化和客户端一致,在创建channel时会创建pipeline

  2. bossGroup 与 workerGroup

这里的bossGroup和workerGroup。就是前面介绍nio reactor模式的多线程版本 可参考


图片.png

bossGroup 不断地监听是否有客户端的连接, 当发现有一个新的客户端连接到来时, bossGroup 就会为此连接初始化各项资源, 然后从 workerGroup 中选出一个 EventLoop 绑定到此客户端连接中. 那么接下来的服务器与客户端的交互过程就全部在此分配的 EventLoop 中

源码:
1、 初始化EventLoopGroup

/**
     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
     * {@link Channel}'s.
     */
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    //和客户端的相同
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

2、 group是如何和channel关联

AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister
源码:

//  实例化channel和 channel 的注册过程
  final ChannelFuture initAndRegister() {
      Channel channel = null;
      try {
          channel = channelFactory.newChannel();
          init(channel);
      } catch (Throwable t) {
          if (channel != null) {
              // channel can be null if newChannel crashed (eg SocketException("too many open files"))
              channel.unsafe().closeForcibly();
              // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
              return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
          }
          // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
          return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
      }
//1、config()方法为子类Bootstrap实现返回BootstrapConfig对象。
// 2、BootstrapConfig中group()其实调用的为其父类 AbstractBootstrapConfig中的group()方法,该方法中返回bootstrap.group()即bootstrap中我们添加的group
//3、也就是最终调用NioEventLoopGroup的父类MultithreadEventExecutorGroup的register方法。该方法返回 next().register(channel);
// 4、 next()方法MultithreadEventExecutorGroup的父类MultithreadEventExecutorGroup实现的。该方法返回  chooser.next(); 这里的
//这里的chooser是DefaultEventExecutorChooserFactory由方法chooserFactory.newChooser(children)返回;可参考(NioEventLoopGroup的父类MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup类的构造器this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);)
//我们还记得最初NioEventLoopGroup的构造器最终会调用MultithreadEventExecutorGroup的构造器MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)
// 在构造器中有一个children[i] = newChild(executor, args);方法格外引起我们的注意。因为这里的前面的next方法返回children数组中的值,newChild方法的实现类在NioEventLoopGroup中
//它返回return new NioEventLoop(this, executor, (SelectorProvider) args[0],
//            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);最后NioEventLoop的父类SingleThreadEventLoop 父类中的注册方法register,该方法调用到第四步
// 5、promise.channel().unsafe().register(this, promise);获取 channel 的 unsafe() 底层操作对象, 然后调用它的 register.
//6、在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel。AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法
//7、AbstractNioChannel.doRegister 方法通过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.
      ChannelFuture regFuture = config().group().register(channel);
      if (regFuture.cause() != null) {
          if (channel.isRegistered()) {
              channel.close();
          } else {
              channel.unsafe().closeForcibly();
          }
      }

      // If we are here and the promise is not failed, it's one of the following cases:
      // 1) If we attempted registration from the event loop, the registration has been completed at this point.
      //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
      // 2) If we attempted registration from the other thread, the registration request has been successfully
      //    added to the event loop's task queue for later execution.
      //    i.e. It's safe to attempt bind() or connect() now:
      //         because bind() or connect() will be executed *after* the scheduled registration task is executed
      //         because register(), bind(), and connect() are all bound to the same thread.

      return regFuture;
  }
}

这里 group() 方法返回的是上面 bossGroup, 至于channel 是一个 NioServerSocketChannsl 实例, 因此我们可以知道, group().register(channel) 将 bossGroup 和 NioServerSocketChannsl 关联.
剩下的workerGroup 是在哪里与 NioSocketChannel 关联的呢?
我们继续看 init(channel) 方法,在子类ServerBootstrap中实现:

//这里初始化的为nioserverchannel
    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
//currentChildGroup、currentChildHandler客户端的连接的 IO 交互
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

上面代码中init 方法在 ServerBootstrap 中重写了, 从上面的代码片段中我们看到, 它为 pipeline 中添加了一个 ChannelInitializer, 而这个 ChannelInitializer 中添加了一个关键的 ServerBootstrapAcceptor handler.
关于 handler 的添加与初始化的过程, 我们留待下一小节中分析, 我们现在关注一下 ServerBootstrapAcceptor 类.ServerBootstrapAcceptor 中重写了 channelRead 方法, 其主要代码

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    ...
    childGroup.register(child).addListener(...);
}

ServerBootstrapAcceptor 中的 childGroup 是构造此对象是传入的 currentChildGroup, 即我们的 workerGroup,
而 Channel 是一个 NioSocketChannel 的实例, 因此这里的 childGroup.register 就是将 workerGroup 中的摸个 EventLoop 和 NioSocketChannel 关联了.
, ServerBootstrapAcceptor.channelRead 方法是怎么被调用的呢? 其实当一个 client 连接到 server 时,Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    ... 省略异常处理
    buf.add(new NioSocketChannel(this, ch));
    return 1;
}

在 doReadMessages 中, 通过 javaChannel().accept() 获取到客户端新连接的 SocketChannel, 接着就实例化一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .
接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦

简单用一个图展示下:

图片.png

handler 的添加
服务器端的 handler 的添加过程和客户端的有点区别
EventLoopGroup 一样, 服务器端的 handler 也有两个, 一个是通过 handler() 方法设置 handler 字段, 另一个是通过 childHandler() 设置 childHandler 字段
分析上面ServerBootstrap 重写了 init 方法
pipeline中由

head<->ChannelInitializer<->tail
在客户端分析ChannelInitializer,当channel绑定到eventLoop后(在这里是 NioServerSocketChannel 绑定到 bossGroup)中时, 会在pipeline中发出fireChannelRegistered 事件, 接着就会触发 ChannelInitializer.initChannel 方法的调用.
然后变成
head<->LoggingHandler<->ServerBootstrapAcceptor<->tail

上面是server的handler,那么childHandler 是在哪里添加的呢?还记得ServerBootstrapAcceptor这个链接的handler,很明显这里是和客户端交互的地方。

先说结论,根据源码在ServerBootstrapAcceptor中channelRead方法将我们childhandler添加。
那么就带来两个问题。1 child channle是如何从channelRead获取。2、channelRead是何时调用
不过这两个问题通过源码可以一并解决:

首先是知道从那里来。ServerBootstrapAcceptor继承ChannelInboundHandlerAdapter,作为inbound的handler,并且重写channelRead
那么当客户端发送数据到客户端时,对于服务端就是接收读取的io事件,那么就会执行channelRead这个方法(这里就很清楚io一般时阻塞,所以在channelRead
方法中childHandler一般是我们自己实现的io操作handler,将这个handler绑定childChannel上处理,且处理他的线程为childGroup)这么做的用途很明显,

 //inbound事件到来时,这里就是客户端IO
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
            try {
             //将操作io的handler绑定到childGroup,执行完成后断开childchannel
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
图片.png
上一篇下一篇

猜你喜欢

热点阅读