javaJava 杂谈程序员

第十七节 netty源码分析之pipeline的来源

2019-01-31  本文已影响9人  勃列日涅夫

pipeline的来源

Bootstrap中channel(NioSocketChannel.class)
-》NioSocketChannel的初始化时创建-》NioSocketChannel this(DEFAULT_SELECTOR_PROVIDER)
-》NioSocketChannel中this(newSocket(provider)) newSocket方法中的参数provider会根据操作系统选择不同的SelectorProvider(SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();)。 创建socketchannel方法provider.openSocketChannel(),SelectorProvider的用法可参考nio中的
-》继续调用构造器this(null, socket);其中的socket为newSocket方法创建的
-》接下来调用父类的构造器和初始化cofnig

查看NioSocketChannel的构造代码:

 public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

-》 转到父类的构造器中

   /**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     */
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
--》 继续调用父类构造器会发现又一次掉用父类的构造器 super(parent);
--》 下面找到我们费尽心机的pipeline的创建如下:
    /**
    * Creates a new instance.
    *
    * @param parent
    *        the parent of this channel. {@code null} if there's no parent.
    */
   protected AbstractChannel(Channel parent) {
       this.parent = parent;
       id = newId();
       unsafe = newUnsafe();
       pipeline = newChannelPipeline();
   }
--》 上面的newChannelPipeline方法其实调用DefaultChannelPipeline的构造器我们跟踪一下piple的创建
 protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
* 从上面的构造方法中可以推算Pipeline包含为一个双向链表,且会有一个头和尾,类图如下

总结:每创建要给channel中会创建一个pipeline. pipeline为一个双向链表,会有一个头和尾
 头尾,从两者继承关系来看基本一致。区别在于实现接口不同,head实现ChannelOutboundHandler, ChannelInboundHandler而tail实现ChannelInboundHandler
图片.png 图片.png

再看下他们的父类 AbstractChannelHandlerContext 的构造器, 分别以参数 inbound , outbound .来区分head和tail
结合他们实现的接口,header 是一个 outboundHandler, 而 tail 是一个inboundHandler。

2 piple是如何将handler起到作用的呢?
1、 ChannelInitializer 的添加
初始化 Bootstrap, 会添加我们自定义的 ChannelHandler, 就以我们熟悉的 EchoClient 来举例

Bootstrap b = new Bootstrap();
            b.group(group)
            //初始化工厂ReflectiveChannelFactory为后续链接connect方法创建NioSocketChannel对象
             .channel(NioSocketChannel.class)
                    //将选项添加到AbstractBootstrap属性options. 实现类中Bootstrap的init(Channel channel)方法设置channel的类型
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });
            // Start the client.
            //包含channel(注意channel可抽象为socket链接来理解)实例化Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister(最终channelFactory.newChannel();这里的factory就是前面设置的ReflectiveChannelFactory)
           //  NioSocketChannel最终newSocket 来打开一个新的 Java NIO SocketChannel, 最后调用父类AbstractChannel(Channel parent)
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();

在调用 handler 时, 传入了 ChannelInitializer 对象, 它提供了一个 initChannel 方法供我们初始化 ChannelHandler. 那么这个初始化过程是怎样的呢
源码追踪:首先 Bootstrap中调用handler方法 .handler(new ChannelInitializer<SocketChannel>(),将ChannelInitializer父类AbstractBootstrap中方法

  /**
     * the {@link ChannelHandler} to use for serving the requests.
     */
    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return self();
    }

初始化后的使用过程: 其实再ChannelFuture f = b.connect(HOST, PORT).sync();中的connect方法中,一路看下去会在父类AbstractBootstrap找到initAndRegister()初始化这楼里
是一个很重要的方法。暂时我们只关注 init(channel);这里是我们要关注的地方因为这里是channel和pipeline交互的关键,而且他的具体实现在Bootstrap中

  @Override
  @SuppressWarnings("unchecked")
  void init(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      // config使用bootstrap.handler(),就是最初ChannelInitializer,可参考上面分析
      p.addLast(config.handler());

      final Map<ChannelOption<?>, Object> options = options0();
      synchronized (options) {
          //设置channel类型
          setChannelOptions(channel, options, logger);
      }

最终我们要关注 p.addLast(config.handler());这里将我们出事的handler添加到了pipleline的末端(pipleline的结构借本介绍在前面已介绍)
为了添加一个 handler 到 pipeline 中, 必须把此 handler 包装成 ChannelHandlerContext. 因此在上面的代码中我们可以看到新实例化了一个 newCtx 对象, 并将 handler 作为参数传递到构造方法中
最后追踪到DefaultChannelPipeline中方法

@Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //检查是否重名
            checkMultiplicity(handler);
//创建DefaultChannelHandlerContext对象
            newCtx = newContext(group, filterName(name, handler), handler);
//            将生成的newCtx插入handlercontex链表中,addLast0方法会将newCtx插入tail之前
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
 DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            //isInbound  isOutbound 方法返回true false 
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

ChannelInitializer的继承关系如下图:

图片.png

下一节再具体分析pipeline和handler的具体实现细节:

上一篇 下一篇

猜你喜欢

热点阅读