Bootstrap --- 客户端

2017-11-23  本文已影响0人  水欣
  1. 简述
    这一章是netty源码分析系列的第一章,在这一章中只展示Netty的客户端和服务端的初始化和启动的过程,给读者一个对Netty源码有一个大致的框架上的认识,而不会深入每个功能模块。
    本章会从Bootstrap/ServerBootstrap类入手,分析Netty程序的初始化和启动的流程。
  2. Bootstrap
    Bootstrap 是netty提供的一个便利的工厂类,我们可以通过他来完成Netty的客户端或服务端的Netty初始化。
    下面以Netty源码例子中的Echo服务器作为例子,从客户端和服务端分别分析一下Netty的程序是如何启动的。
  3. 客户端部分
EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             p.addLast(new EchoClientHandler());
         }
     });

    // Start the client.
    ChannelFuture f = b.connect(HOST, PORT).sync();

    // Wait until the connection is closed.
    f.channel().closeFuture().sync();
} finally {
    // Shut down the event loop to terminate all threads.
    group.shutdownGracefully();
}

上面的客户端代码虽然简单,但是却展示了Netty客户端初始化时所需的所有内容
(1) EventLoopGroup:不论是服务器端还是客户端,都必须指定EventLoopGroup,在这个例子中,指定了NioEventLoopGroup,表示一个Nio的EventLoopGroup.
(2)ChannelType:指定Channel的类型。因为是客户端,因此使用了NioSocektChannel
(3) Handler:设置数据的处理器。

  1. NioSocketChannel的初始化过程
    在Netty中,Channel是一个Socket的抽象,它为用户提供了关于Socket状态(是否是连接还是断开)以及对Socket的读写等操作。每当Netty建立了一个连接后,都会有一个对应的Channel实例。
    NioSocketChannel的类层次结构如下


    1.png

下面着重分析一个Channel的初始化过程

  1. ChannelFactory 和Channel类型的确定
    除了TCP协议以外,Netty还支持很多其他的连接协议,并且每种协议还有NIO(异步IO)和OIO(Old-IO,即传统的阻塞IO)版本的区别,不同协议不同的阻塞类型的连接都有不同的Channel类型与之对应,下面是写常用的Channel类型:
public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

而ReflectiveChannelFactory 实现了ChannelFactory接口,它提供了唯一的方法,即newChannel。ChannelFactory,顾名思义,就是产生Channel的工厂类。
进入到ReflectiveChannelFactory.newChannel中,我们看到其实现代码如下

public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

根据上面代码的提示,我们就可以确定:

  1. Channel实例化
    前面我们已经知道了如何确定一个Channel的类型,并且了解到Channel是通过工厂方法ChannelFacotry.newChannel()来实例化的,那么ChannelFactory.newChannel()方法在哪儿调用?
Bootstrap.connect -> Bootstrap.doResolveAndConnect ->AbstractBootrap.initAndRegister

在AbstractBootstrap.initAndRegister中就调用了channelFactory.newChannel()来获取一个新的NioSocketChannel实例,其源码如下:

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

在newChannel中,通过类对象newInstance来获取一个新的Channel实例,因而会调用NioSocketChannel的默认构造器NioSocketChannel 默认构造器代码如下

public NioSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

这里的代码比较关键,我们看到,在这个构造器中,会调用newSocket来打开一个新的 java NIO socketChannel:

private static SocketChannel newSocket(SelectorProvider provider) {
    ...
    return provider.openSocketChannel();
}

接着会调用父类,即AbstractNioByteChannel的构造器

AbstractNioByteChannel(Channel parent, SelectableChannel ch)

并传入参数parent为null,ch为刚才使用newSocket创建的Java Nio SocketChannel,因此生成的NioSocketChannel的parent channel为Null

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

接着会继续调用父类的AbstractNioChannel的构造器,并传入了参数readInterestOp = SelectionKey.OP_READ:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    // 省略 try 块
    // 配置 Java NIO SocketChannel 为非阻塞的.
    ch.configureBlocking(false);
}

然后继续调用父类AbstractChannel的构造器

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

到这里,一个完整的NioSocketChannel就初始化完成了,我们可以稍微总结一下构造一个NioSocketChannel所需要做的工作:

  1. 关于unsafe 字段的初始化
    我们简单地提到,在实例化NioSocketChannel的过程中,会在父类AbstractChannel的构造器中,调用newUnsafe()来获取一个unsafe实例。那么unsafe是怎么初始化的呢?它的作用是什么?
    其实unsafe特别关键,它封装了对java底层Socket的操作,因此实际上是连通Netty上层和java底层的重要桥梁。
    Unsafe接口所提供的方法
interface Unsafe {
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();
    ChannelPromise voidPromise();
    ChannelOutboundBuffer outboundBuffer();
}

一看便知,这些方法其实都会对应到相关的java底层的Socket的操作。
回到AbstractChannel的构造方法中,在这里调用了newUnsafe()获取一个新的unsafe对象,而newUnsafe方法在NioSocketChannel中被重写了

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioSocketChannelUnsafe();
}

NioSocketChannel.newUnsafe方法会返回一个NioSocketChannelUnsafe实例,从这里我们就可以确定了,在实例化的NioSocketChannel中的unsafe字段,其实就是一个NioSocketChannelUnsafe的实例。

  1. 关于pipeline的初始化
    上面我们分析了一个Channel(在这个例子中时NioSocketChannel)的大体初始化过程,但是我们漏掉了一个关键的部分,即ChannnelPipeline的初始化。
    根据Each channel has its own pipeline and it is created automatically when a new channel is created。我们知道,在实例化一个Channel时,必然伴随着实例化的一个ChannelPipeline.而我们确实在AbstractChannel的构造器看到了pipeline字段被初始化为DefaultChannelPipeline的实例,那么看下DefaultChannelPipeline构造器做了那些工作吧:
public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

我们调用DefaultChannelPipeline的构造器,传入了一个channel,而这个channel其实就是我们实例化的NioSocketChannel,DefaultChannelPipeline会将这个NioSocketChannel对象保存在channel字段中。DefaultChannelPipeline中,还有两个特殊的字段,即head和tail,而这两个字段是一个双向链表的头和尾,其实在DefaultChannelPipeline中,维护了一个以AbstractChannelHandlerContext为节点的双向链表,这个链表是Netty实现pipeline机制的关键。关于DefaultChannelPipeline中的双向链表以及它所起的作用,下面介绍
HeadContext的继承层次结果如下所示:


1.png

TailContext 的继承层次结构如下所示:


2.png
我们可以看到,链表中head是一个ChannelOutboundHandler ,而tail则是一个ChannelInboundHandler,接着我们看一下headContext的构造器
HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}

它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound=false,outbound=true
TailContext的构造器与HeadContext的相反,它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound=true,outbound=false
即header是一个outboundHandler,而tail是一个inboundHandler

  1. 关于EventLoop初始化
    回到最开始的EchoClient.java代码中,我们在一开始实例化了一个NioEventLoopGroup对象,因此我们就从它的构造器中追踪一下EventLoop的初始化过程。
    首先来看一下NioEventLoopGroup的类继承层次


    3.png

    NioEventLoop有几个重载的构造器,不过内容都没什么大的区别,最终都是调用父类MultithreadEventLoopGroup构造器

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

其中一点是,如果我们传入的线程数nThreads是0,那么Netty会为我们设置默认的线程数DEFAULT_EVENT_LOOP_THREADS,而是这个默认的线程数是怎么确定的呢?
其实很简答,在静态代码块中,会首先确定DEFAULT_EVENT_LOOP_THREADS的值

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}

Netty会首先从系统属性中回去"io.netty.eventLoopThreads"的值,如果我们没有设置它的话,那么就返回默认值:处理器核心数*2.
回到MultithreadEventLoopGroup构造器中,这个构造器会继续调用父类MultithreadEventExecutorGroup的构造器:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    // 去掉了参数检查, 异常处理 等代码.
    children = new EventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        children[i] = newChild(threadFactory, args);
    }
}

根据代码,我们就很清楚MultithreadEventExecutorGroup中的处理逻辑了:

  @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

其实就是实例化一个NioEventLoop对象,然后返回它。
最后总结以下整个EventLoopGroup的初始化过程

  1. channel的注册过程
    在前面的分析中,我们提到,channel会在Bootrap.initAndRegister中进行初始化,但是这个方法还会将初始化好的Channel注册到EventLoop中。接下来我们就来分析一些Channel注册的过程
    回顾一下AbstractBootstrap.initAndRegister方法:
final ChannelFuture initAndRegister() {
    // 去掉非关键代码
    final Channel channel = channelFactory().newChannel();
    init(channel);
    ChannelFuture regFuture = group().register(channel);
}

当Channel初始化后,会紧接着调用group.register()方法来注册Channel,我们继续跟踪的话,会发现其调用链如下:
AbstractBootrap.initAndRegister -> MultithreadEventLoopGroup.register ->SingleThreadEventLoop.register -> AbstractUnsafe.register
通过跟踪调用链,最终我们发现是调用到了unsafe的register方法,那么接下来我们就仔细看一下AbstractUnsafe.register方法

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略条件判断和错误处理
    AbstractChannel.this.eventLoop = eventLoop;
    register0(promise);
}

首先,将eventLoop赋值给Channel的eventLoop属性,而我们知道这个eventLoop对象其实就是MultithreadEventLoopGroup.next()方法获取的,根据我们前面关于EventLoop初始化小节中,我们可以确定next()方法返回的eventLoop对象是NioEventLoop实例,register方法接着调用register0()

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
}

register0又调用了AbstractNioChannel.doRegister

@Override
protected void doRegister() throws Exception {
    // 省略错误处理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

javaChannel()这个方法在前面我们已经知道了,它返回的是一个 Java Nio SocketChannel,这里我们将这个SocketChannel注册到eventLoop关联的selector上了
我们总结一下Channel的注册过程:

  1. handler的添加过程
    Netty的一个强大和灵活之处就是基于Pipeline的自定义handler机制。基于此,我们可以像添加插件一样自由组合各种各样的handler来完成业务逻辑。例如我们需要处理HTTP数据,那么就可以在pipeline前添加一个Http的编解码的Handler,然后接着添加我们自己的业务逻辑的handler,这样网络上的数据流就像通过一个管道一样,从不同的handler中流过并进行编解码,最终在到达我们自定义的handler中。
    如何将自定义的handler添加到pipeline中
...
.handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         if (sslCtx != null) {
             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
         }
         //p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoClientHandler());
     }
 });

这段代码就是实现了handler的添加功能,我们看到,Bootstrap.handler()接受一个ChannelHandler,而我们传递的是一个派生于ChannelInitializer的匿名类,他正好也实现了ChannelHandler接口,我们来看一下,ChannelInitializer类到底有什么

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
    protected abstract void initChannel(C ch) throws Exception;

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        initChannel((C) ctx.channel());
        ctx.pipeline().remove(this);
        ctx.fireChannelRegistered();
    }
    ...
}

ChannelInitializer是一个抽象类,它有一个抽象的方法initChannel,我们正是实现了这个方法,并在这个方法中添加的自定义的handler,那么initChannel是那么被调用呢?在ChannelInitializer.channelRegistered()中
我们来关注一下channelRegister(),从上面的源码中,我们可以看出,在channelRegister()中,会调用initChannel(),将自定义的handler添加到ChannelPipeline中,然后调用ctx.pipeline().remove(this)将自己从ChannelPipeline中删除,上面的分析可以用如下图片展示:
一开始,ChannelPipeline中只有三个handler,head,tail和我们添加的ChannelIntializer


1.png

接着initChannel方法调用后,添加了自定义的handler


2.png
最后将ChannelInitializer删除
3.png
  1. 客户端连接分析
    通过上面的各种分析后,我们大致了解了Netty初始化,所做的工作,那么接下来我们就分析下客户端是如何发起TCP连接的.
    首先,客户端通过调用Bootstrap的connect()进行连接
    在connect中,会进行一些参数检查后,最终调用的是doConnect0(),其实现如下:
private static void doConnect0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                if (localAddress == null) {
                    channel.connect(remoteAddress, promise);
                } else {
                    channel.connect(remoteAddress, localAddress, promise);
                }
                promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

在doConnect0()中,会在event loop线程中调用Channel的connect(),而这个Channel的具体类型是什么呢?我们在Channel初始化的这一小节已经分析过了,这里channel的类型是NioSocketChannel
进行跟踪到channel.connect中,我们发现它调用的是DefaultChannelPipeline#connect,而pipeline的connect代码如下:

 @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

而tail字段,我们已经分析过了,是一个TailContext的实例,而TailContext又是AbstractChannelHandlerContext 的子类,而且没有实现connect(),因此这里调用的其实是AbstractChanneHandlerContext.connect,我们看一下这个方法的实现

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    // 删除的参数检查的代码
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new OneTimeTask() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }

    return promise;
}

上面的代码中有一个关键的地方,即final AbstractChannelHandlerContext next = findContextOutbound(),这里调用findContextOutbound(),从DefaultChannelPipeline内的双向链表的tail开始,不断向前寻找第一个outbound为true的AbstractChannelHandlerContext,然后调用它的invokeConnect(),其代码如下:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    // 忽略 try 块
    ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}

在之前提到,在DefaultChannelPipeline的构造器中,会实例化两个对象:head和tail,并形成了双向链表的头和尾。head是HeadContext的实例,它实现了ChannelOutboundHandler接口,并且他的outbound字段为true,因此在findContextOutbound中,找到了AbstractChannelHandlerContext对象其实就是head,进而在invokeConnect(),我们向上转换为ChannelOutboundHandler就是没问题了。
而又因为HeadContext重写了connect方法,因此实际上调用的是HeadContext.connect,我们接着跟踪HeadContext.connect,其代码如下

@Override
public void connect(
        ChannelHandlerContext cox,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

这个connect()很简单,仅仅调用了unsafe的connect(),而unsafe又是什么呢?
回顾一下HeadContext的构造器,我们发现unsafe是pipeline.channel().unsafe()返回的,而Channel的unsafe字段,在这个例子中,我们已经知道了,其实是AbstractNioByteChannel.NioByteUnsafe内部类.兜兜转转了一大圈,我们找到了创建Socket连接的关键diamante
NioByteUnsafe -> AbstractNioUnsafe.connect

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    if (doConnect(remoteAddress, localAddress)) {
        fulfillConnectPromise(promise, wasActive);
    } else {
        ...
    }
}

AbstractNioUnsafe.connect的实现如上代码所示,在这个connect方法中,调用了doConnect,注意,这个方法并不是AbstractNioUnsafe的方法,而是AbstractNioChannel 的抽象方法。doConnect()是在NioSocketChannel中实现的,因此进入NioSocketChannel.doConnect中:

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
  if (localAddress != null) {
      javaChannel().socket().bind(localAddress);
  }

  boolean success = false;
  try {
      boolean connected = javaChannel().connect(remoteAddress);
      if (!connected) {
          selectionKey().interestOps(SelectionKey.OP_CONNECT);
      }
      success = true;
      return connected;
  } finally {
      if (!success) {
          doClose();
      }
  }
}

首先是获取Java Nio SocketChannel,即我们已经分析过的,从NioSocketChannel.newSocket返回的SocketChannel对象,然后调用SocketChannel.connect()完成java NIO层面的Socket的连接


4.png
上一篇 下一篇

猜你喜欢

热点阅读