nio

netty系列之(一)——netty介绍

2018-11-26  本文已影响0人  康康不遛猫

IO模型

阻塞式IO.png
非阻塞式IO.png
IO复用.png
信号驱动式.png
异步IO.png 图片.png
上述5种IO模型,前4种模型-阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步I/O模型,因为其中真正的I/O操作(recvfrom)将阻塞进程,在内核数据copy到用户空间时都是阻塞的。

一、NIO原理

Netty 是基于Java NIO 封装的网络通讯框架,只有充分理解了 Java NIO 才能理解好Netty的底层设计。Java NIO 由三个核心组件组件:
Buffer:固定数量的数据的容器。在 Java NIO 中,任何时候访问 NIO 中的数据,都需要通过缓冲区(Buffer)进行操作。NIO 最常用的缓冲区则是 ByteBuffer。
Channel:是一个通道,它就像自来水管一样,网络数据通过 Channel 这根水管读取和写入。传统的 IO 是基于流进行操作的,Channle 和流类似,但又有些不同:

图片.png
#传统IO:FileInputStream
public static void method2(){
       InputStream in = null;
       try{
           in = new BufferedInputStream(new FileInputStream("src/nomal_io.txt"));
 
           byte [] buf = new byte[1024];
           int bytesRead = in.read(buf);
           while(bytesRead != -1)
           {
               for(int i=0;i<bytesRead;i++)
                   System.out.print((char)buf[i]);
               bytesRead = in.read(buf);
           }
       }catch (IOException e)
       {
           e.printStackTrace();
       }finally{
           try{
               if(in != null){
                   in.close();
               }
           }catch (IOException e){
               e.printStackTrace();
           }
       }
   }
#NIO
public static void method1(){
        RandomAccessFile aFile = null;
        try{
            aFile = new RandomAccessFile("src/nio.txt","rw");
            FileChannel fileChannel = aFile.getChannel();
            ByteBuffer buf = ByteBuffer.allocate(1024);
 
            int bytesRead = fileChannel.read(buf);
            System.out.println(bytesRead);
 
            while(bytesRead != -1)
            {
                buf.flip();
                while(buf.hasRemaining())
                {
                    System.out.print((char)buf.get());
                }
 
                buf.compact();
                bytesRead = fileChannel.read(buf);
            }
        }catch (IOException e){
            e.printStackTrace();
        }finally{
            try{
                if(aFile != null){
                    aFile.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
//使用Buffer一般遵循下面几个步骤:
//分配空间(ByteBuffer buf = ByteBuffer.allocate(1024); 还有一种allocateDirector后面再陈述)
//写入数据到Buffer(int bytesRead = fileChannel.read(buf);)
//调用filp()方法( buf.flip();)
//从Buffer中读取数据(System.out.print((char)buf.get());)
//调用clear()方法或者compact()方法

Channel 必须要配合 Buffer 一起使用,通过从 Channel 读取数据到 Buffer 中或者从 Buffer 写入数据到 Channel 中,如下:

图片.png
Selector
多路复用器 Selector,它是 Java NIO 编程的基础,Selector 提供了询问Channel是否已经准备好执行每个 I/O 操作的能力。简单来讲,Selector 会不断地轮询注册在其上的 Channel,如果某个 Channel 上面发生了读或者写事件,这个 Channel 就处于就绪状态,会被 Selector 轮询出来,然后通过 SelectionKey 可以获取就绪 Channel 的集合,进行后续的 I/O 操作。
image.png

TCP服务端实例-NIO实现

NIO客户端代码(连接)
//获取socket通道
SocketChannel channel = SocketChannel.open();        
channel.configureBlocking(false);
//获得通道管理器
selector=Selector.open();        
channel.connect(new InetSocketAddress(serverIp, port));
//为该通道注册SelectionKey.OP_CONNECT事件
channel.register(selector, SelectionKey.OP_CONNECT);

NIO客户端代码(监听)
while(true){
    //选择注册过的io操作的事件(第一次为SelectionKey.OP_CONNECT)
   selector.select();
   while(SelectionKey key : selector.selectedKeys()){
       if(key.isConnectable()){
           SocketChannel channel=(SocketChannel)key.channel();
           if(channel.isConnectionPending()){
               channel.finishConnect();//如果正在连接,则完成连接
           }
           channel.register(selector, SelectionKey.OP_READ);
       }else if(key.isReadable()){ //有可读数据事件。
           SocketChannel channel = (SocketChannel)key.channel();
           ByteBuffer buffer = ByteBuffer.allocate(10);
           channel.read(buffer);
           byte[] data = buffer.array();
           String message = new String(data);
           System.out.println("recevie message from server:, size:"
               + buffer.position() + " msg: " + message);
       }
   }
}

NIO服务端代码(连接)
//获取一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
//获取通道管理器
selector = Selector.open();
//将通道管理器与通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

NIO服务端代码(监听)
while(true){
    //当有注册的事件到达时,方法返回,否则阻塞。
   selector.select();
   for(SelectionKey key : selector.selectedKeys()){
       if(key.isAcceptable()){
           ServerSocketChannel server =
                (ServerSocketChannel)key.channel();
           SocketChannel channel = server.accept();
           channel.write(ByteBuffer.wrap(
            new String("send message to client").getBytes()));
           //在与客户端连接成功后,为客户端通道注册SelectionKey.OP_READ事件。
           channel.register(selector, SelectionKey.OP_READ);
       }else if(key.isReadable()){//有可读数据事件
           SocketChannel channel = (SocketChannel)key.channel();
           ByteBuffer buffer = ByteBuffer.allocate(10);
           int read = channel.read(buffer);
           byte[] data = buffer.array();
           String message = new String(data);
           System.out.println("receive message from client, size:"
               + buffer.position() + " msg: " + message);
       }
   }
}

二、netty

1、netty特点

2、netty线程模型

netty基于Reactor模型,是对NIO模型的一种改进。

3、netty核心组件

netty服务端 代码示例

EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
//EventLoopGroup继承线程池ScheduledExecutorService
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);//利用反射构造NioServerSocketChannel实例
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//backlog指定了内核为此套接口排队的最大连接个数
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new LoggingServerHandler());//handler与childHandler不同
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new MyChannelHandler1());
        ch.pipeline().addLast(new MyChannelHandler2());
        ch.pipeline().addLast(new MyChannelHandler3());
    }
});
ChannelFuture f = bootstrap.bind(port).sync();//bind方法实现
f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                //启动成功
            }
});
f.channel().closeFuture().sync();

class MyChannelHandler1 extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
    }
}

Channel
Channel 是 Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能。
EventLoop
Netty 基于事件驱动模型,使用不同的事件来通知我们状态的改变或者操作状态的改变。它定义了在整个连接的生命周期里当有事件发生的时候处理的核心抽象。
Channel 为Netty 网络操作抽象类,EventLoop 主要是为Channel 处理 I/O 操作,两者配合参与 I/O 操作。

图片.png
上图为Channel、EventLoop、Thread、EventLoopGroup之间的关系。一个 EventLoop 在它的生命周期内只能与一个Thread绑定,一个 EventLoop 可被分配至一个或多个 Channel ,轮流处理。

ChannelFuture
Netty 为异步非阻塞,即所有的 I/O 操作都为异步的,因此,我们不能立刻得知消息是否已经被处理了。Netty 提供了 ChannelFuture 接口,通过该接口的 addListener() 方法注册一个 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果。

ChannelFuture f = bootstrap.bind(port).sync();
f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                //启动成功
            }
});
f.channel().closeFuture().sync();

ChannelHandler
ChannelHandler 为 Netty 中最核心的组件,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelHandler 有两个核心子类 ChannelInboundHandler 和 ChannelOutboundHandler,其中 ChannelInboundHandler 用于接收、处理入站数据和事件,而 ChannelOutboundHandler 则相反。

ChannelPipeline
ChannelPipeline 为 ChannelHandler 链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API。一个数据或者事件可能会被多个 Handler 处理,在这个过程中,数据或者事件经流 ChannelPipeline,由 ChannelHandler 处理。在这个处理过程中,一个 ChannelHandler 接收数据后处理完成后交给下一个 ChannelHandler,或者什么都不做直接交给下一个 ChannelHandler。

图片.png
当一个数据流进入 ChannlePipeline 时,它会从 ChannelPipeline 头部开始传给第一个 ChannelInboundHandler ,当第一个处理完后再传给下一个,一直传递到管道的尾部。与之相对应的是,当数据被写出时,它会从管道的尾部开始,先经过管道尾部的 “最后” 一个ChannelOutboundHandler,当它处理完成后会传递给前一个 ChannelOutboundHandler 。

附录


TCP.png
上一篇下一篇

猜你喜欢

热点阅读