深入浅出Netty源码剖析

Netty学习 - Bootstrap引导

2018-07-19  本文已影响10人  buzzerrookie

引导

在Netty中,有两种引导,一是Bootstrap,用于引导客户端或者无连接服务器;另一种便是ServerBootstrap,用于引导面向连接的服务器。Bootstrap整个类层次如下图所示,本文将依次分析AbstractBootstrap、Bootstrap和ServerBootstrap。


Bootstrap类层次.png

AbstractBootstrap类

AbstractBootstrap类的javadoc说明如下:

AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. It support method-chaining to provide an easy way to configure the AbstractBootstrap.
When not used in a ServerBootstrap context, the bind() methods are useful for connectionless transports such as datagram (UDP).

上述文字表明:AbstractBootstrap类支持方法的链式调用,当不使用ServerBootstrap时,bind方法可以用于无连接的协议如UDP等,这与日常用法相一致。

成员变量和构造函数

volatile EventLoopGroup group;
@SuppressWarnings("deprecation")
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile ChannelHandler handler;

AbstractBootstrap() {
    // Disallow extending from a different package.
}

AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
    group = bootstrap.group;
    channelFactory = bootstrap.channelFactory;
    handler = bootstrap.handler;
    localAddress = bootstrap.localAddress;
    synchronized (bootstrap.options) {
        options.putAll(bootstrap.options);
    }
    synchronized (bootstrap.attrs) {
        attrs.putAll(bootstrap.attrs);
    }
}

以上代码有以下几点需要注意:

成员方法

AbstractBootstrap类是一种构建者模式(Builder)

bind方法

bind方法会在内部调用doBind方法,首先会调用initAndRegister方法初始化并注册通道,接下来按注册是否结束分情况讨论,都是交由doBind0方法处理。

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

initAndRegister方法

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

abstract void init(Channel channel) throws Exception;

initAndRegister方法是一个模板方法

  1. 利用channelFactory新建通道,以前述的ReflectiveChannelFactory为例,其newChannel方法会根据传入的Channel类型调用对应的无参构造函数返回新建的通道;
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
    
    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
    
  2. 调用抽象的init方法初始化新建的通道,子类需要重写该方法;
  3. 将新建的通道注册到与该引导类关联的EventLoopGroup上。

doBind0方法

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

doBind0方法使得在Channel绑定的EventLoop上执行具体的通道绑定操作。注意从doBind0被调用的位置可以看到其一定是在注册操作完成之后被调用:

Bootstrap类

Bootstrap类继承了AbstractBootstrap类,新增加了remoteAddress和resolver成员变量,与之对应有remoteAddress和resolver成员方法。

成员方法

ServerBootstrap类

ServerBootstrap类继承了AbstractBootstrap类,新增加了childHandler、childGroup、childOptions和childAttrs成员变量,并增加了与之对应的成员方法。

成员方法

ServerBootstrapAcceptor类

ServerBootstrapAcceptor类是ServerBootstrap类的私有静态内部类,用于充当Reactor模式中的Acceptor角色,它继承了ChannelInboundHandlerAdapter类:

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;

    ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
        // 省略一些代码
    }

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler);
        setChannelOptions(child, childOptions, logger);
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

    // 省略一些代码
}

注意channelRead方法:

ServerBootstrap引导的结果是将ServerSocketChannel注册到group变量(即所谓的bossGroup)表示的EventLoopGroup里的一个EventLoop上,即ServerBootstrapAcceptor只运行于一个EventLoop里。

上一篇 下一篇

猜你喜欢

热点阅读