Netty学习笔记(一)组件

2018-10-28  本文已影响8人  云师兄

Channel,EventLoop和ChannelFuture类构成了Netty网络抽象的代表:

Channel接口

Channel是对Socket的封装,大大降低了直接使用Socket的复杂性。

EventLoop接口

EventLoop用于处理连接的生命周期中所发生的事件。在服务端编程中,EventLoop起到了监听端口连接和数据读取的工作。

ChannelFuture接口

Netty中的所有IO操作都是异步的,一个操作不会立即返回,我们需要在执行操作之后的某个时间点确定其结果的方法,即ChannelFuture接口,其addListener方法注册了一个ChannelFutureListener,一遍在某个操作完成时得到通知。

ChannelHandler和ChannelPipeline

ChannelHandler充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler常常实现了传入数据包拆分以及业务逻辑控制的功能。

ChannelPipeline接口

ChannelPipeline为ChannelHandler链提供容器,并定义了用于在该链上传播入站和出站事件流的API。即ChannelPipeline其实就是一个保存很多ChannelHandler的链表。

BootStrap接口

Netty的引导类为应用程序的网络层配置提供了容器。有两种类型的引导:一种用户客户端,称为Bootstrap,另一种称为用于服务器,称为ServerBootstrap。

示例

上面提到的这些接口在实际使用过程中并不是挨个使用的,而是使用BootStrap接口来将需要的接口组织在一起,下面是个基于netty的服务端的例子:

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                    .handler(new ServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                        }
                    });
            ChannelFuture f = b.bind(8888).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channelActive");
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("channelRegistered");
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println("handlerAdded");
    }
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 耗时的操作
                String result = loadFromDB();
                ctx.channel().writeAndFlush(result);
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        // ...
                    }
                }, 1, TimeUnit.SECONDS);
            }
        }).start();
    }
    private String loadFromDB() {
        return "hello world!";
    }
}

下面就针对上面使用的ServerBootstrap类来分析Netty是如何来封装java nio相关的操作的。

Netty 如何获取NIO中需要的Channel

首先从ChannelFuture f = b.bind(8888).sync();中的bind方法开始分析。
这个bind方法经过层层调用,最终会调用initAndRegister方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
  final ChannelFuture regFuture = initAndRegister();
  ...
}
final ChannelFuture initAndRegister() {
  Channel channel = null;
  try {
    channel = channelFactory.newChannel();
    ...

从上述源码中可以看到channel来自channelFactory的newChannel方法。ChannelFactory是一个接口,我们只能从它的实现类ReflectiveChannelFactory来看下具体实现了:

    private final Class<? extends T> clazz;
  
    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

可以看出这个工厂方法的newChannel其实就是根据clazz属性来反射出具体的对象。那当前代码中的clazz属性值是多少呢?回到一开始示例执行的channel(NioServerSocketChannel.class)这句代码,这个channel方法实现如下:

public B channel(Class<? extends C> channelClass) {
   return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

所以ServerBootstrapy引导类中使用的通道就是channel方法中设置的NioServerSocketChannel通道。

NioServerSocketChannel实现

接下来就要来看下Netty的NioServerSocketChannel内部是怎么调用java NIO中的ServerSocketChannel的。
NioServerSocketChannel内部实现步骤如下:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
   this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
     return provider.openServerSocketChannel();
    } catch (IOException e) {
     throw new ChannelException("Failed to open a server socket.", e);
    }

从NioServerSocketChannel的构造函数可以看出java NIO的SelectorProvider.provider().openServerSocketChannel()生成了ServerSocketChannel通道。
接下来看第二个点,即在哪设置通道的非阻塞模式,NioServerSocketChannel构造函数如下:

public NioServerSocketChannel(ServerSocketChannel channel) {
  super(null, channel, SelectionKey.OP_ACCEPT);
  ...
}

super方法调用了父类AbstractNioChannel的构造函数如下:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
   super(parent);
   ...
   ch.configureBlocking(false);
   ...

所以NioServerSocketChannel的构造函数中将通道设置了非阻塞模式。除了设置为非阻塞模式外,还调用了父类的构造函数super(parent),具体如下:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

从上面这段代码中可以看出在构造函数中创建了一个新的pipeline,并且这个链表一开始是空的。

在介绍了ServerBootstrap中如何调用java NIO中的ServerSocketChannel的实现后,我们接下来看ServerBootstrap是如何初始化服务器Channel的。还是从示例中ChannelFuture f = b.bind(8888).sync();中bind方法中的initAndRegister方法中的代码开始分析:

final ChannelFuture initAndRegister() {
  Channel channel = null;
  try {
    channel = channelFactory.newChannel();
    init(channel);
  }
  ...
}
abstract void init(Channel channel) throws Exception;

之前我们分析了从channelFactory中获取ServerChannel通道的逻辑,这里再分析下ServerBootstrap中这个init方法:

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

init方法首先将传入的childOption,childAttr属性保存到ServerBootstrap对象中;另外又把用户通过handler方法设置的handler对象添加到pipeline尾部,然后再在尾部添加一个ServerBootstrapAcceptor对象,里面包装了用户通过childHandler方法传入的对象。

注册Selector

上面主要学习了netty内部是如何封装了ServerSocketChannel等NIO相关的操作的,接下来再继续看下netty内部是如何封装NIO中注册选择器selector的。
回到我们之前提到的initAndRegister方法内部:

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            ......
        }

        ChannelFuture regFuture = config().group().register(channel);
        ......
    }

我们已经讲过获取ServerSocketChannel和初始化通道这两个部分,接下来再看下面config().group().register(channel);内部是如何实现注册Selector的。
由于EventLoopGroup是接口,在本例中实现类为NioEventLoop。

EventLoopGroup接口继承关系
NioEventLoop类实现的register方法:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
        // 参数校验......
        try {
            ch.register(selector, interestOps, task);
        } catch (Exception e) {
            throw new EventLoopException("failed to register a channel", e);
        }
    }

可以看出最终执行的SeverSocketChannel.register(selector,interestOps,task);方法,即还是执行了java NIO中通道注册到选择器selector的方法。

端口绑定

上面我们已经分析了AbstractBootstrap类里面doBind方法中initAndRegister方法的内部实现:创建ServerSocketChannel通道,初始化,注册通道到选择器selector上这三步。下面再继续分析doBind方法中后面关于端口绑定的实现。
从下面代码中的doBind0方法开始解析:

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
            .......
}

private static void doBind0(
      final ChannelFuture regFuture, final Channel channel,
      final SocketAddress localAddress, final ChannelPromise promise) {
            channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

从上面源码中可以看出,在doBind0方法内部执行了ServerSocketChannel的bind方法,即进行端口的绑定操作。

上一篇下一篇

猜你喜欢

热点阅读