Netty学习

NIO编程基本概念

2021-07-08  本文已影响0人  攻城老狮

1 NIO概述

1.1 NIO基本概念

  1. Java NIO 全称 Java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 NewIO),是同步非阻塞的。
  2. NIO 有三大核心部分:Channel(通道)、Buffer(缓冲区)、Selector(选择器)
  3. NIO面向缓冲区,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
  4. Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
  5. NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。

1.2 NIO与BIO的比较

  1. BIO 是以流的方式处理数据,而 NIO 是以块的方式处理数据,块的效率比流的效率高很多。
  2. BIO是同步阻塞的(客户端发送请求后,服务端不能确定数据是否有效,此时的线程会一直处于阻塞状态,等待有正确的数据后才会执行,此时其他的任何操作都无法进行。如果有大量请求,前方的阻塞,后方的请求也会阻塞,进入一个队列排队。可以使用多线程的方式做优化,但服务器的线程是有限的,此时的CPU利用率没有合理利用,当需要处理大量客户端时,性能急剧下降),NIP是同步非阻塞的(NIO会把每个通道注册到选择器上,监控这些通道的I/O状况,当任务完全准备就绪后,会把任务分配给服务器端的一个或多个线程进行处理,若未准备就绪,则不会吧任务分配给服务器上,此时可以进一步利用CPU资源,不必一直阻塞等待,而是当有任务时才进行处理)。
  3. BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。
  4. Buffer和Channel之间的数据流向是双向的。

1.3 NIO三大组件关系

三大组件:Selector、Channel、Buffer

image-20210704145200524.png

2 缓冲区(Buffer)

2.1 基本概念

  1. 缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。
  2. NIO 中,Buffer 是一个顶层父类,它是一个抽象类。Buffer的子类:ByteBuffer、ShortBuffer、CharBuffer、IntBuffer、LongBuffer、DoubleBuffer、FloatBuffer
  3. Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息。【mark(标记),position(位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变该值,为下次读写作准备),limit(表示缓冲区的当前终点,不能对缓冲区超过limit的位置进行读写操作,limit可以修改),capcity(容量,可以容纳的最大数据量,缓冲区创建时设定并且不能改变)】

2.2 Buffer类的相关方法

public class BufferTest {

    @Test
    public void test1(){
        String str = "abcde";
        //1.分配指定大小的缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);
        //2.查看当前缓冲区的变量信息
        System.out.println("------allocate()------");
        System.out.println(buf.capacity()); //1024
        System.out.println(buf.limit()); //1024
        System.out.println(buf.position()); //0

        //3.存放数据到缓冲区
        buf.put(str.getBytes());
        System.out.println("------put()------");
        System.out.println(buf.capacity()); //1024
        System.out.println(buf.limit()); //1024
        System.out.println(buf.position()); //5

        //4.切换读写数据模式
        buf.flip();
        System.out.println("------flip()------");
        System.out.println(buf.capacity()); //1024
        System.out.println(buf.limit()); //5
        System.out.println(buf.position()); //0

        //5.获取缓冲区的数据
        byte[] dst = new byte[buf.limit()];
        buf.get(dst);
        System.out.println(new String(dst,0,dst.length)); //abcde
        System.out.println("------get()------");
        System.out.println(buf.capacity()); //1024
        System.out.println(buf.limit()); //5
        System.out.println(buf.position()); //5

        //6.可重复读
        buf.rewind();
        System.out.println("------rewind()------");
        System.out.println(buf.capacity()); //1024
        System.out.println(buf.limit()); //5
        System.out.println(buf.position()); //0

        //7.清空缓冲区,缓冲区中的数据依然存在,但是处在被遗忘状态
        buf.clear();
        System.out.println("------clear()------");
        System.out.println(buf.capacity()); //1024
        System.out.println(buf.limit()); //1024
        System.out.println(buf.position()); //0
        System.out.println((char) buf.get()); //a
    }

    @Test
    public void test2(){
        String str = "abcde";
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.put(str.getBytes());
        buf.flip();
        byte[] dst = new byte[buf.limit()];
        buf.get(dst,0,2);
        System.out.println(new String(dst,0,2)); //ab
        System.out.println(buf.position()); //2

        //标记位置
        buf.mark();

        buf.get(dst,2,2);
        System.out.println(new String(dst,2,2)); //cd
        System.out.println(buf.position()); //4

        //恢复到mark位置
        buf.reset();
        System.out.println(buf.position()); //2

        //判断缓冲区是否还有剩余数据
        if (buf.hasRemaining()){
            //获取缓冲区可以操作的数量
            System.out.println(buf.remaining()); //3
        }
    }

    @Test
    public void test3(){
        //分配直接缓冲区
        ByteBuffer buf = ByteBuffer.allocateDirect(1024);
        //是否是直接缓冲区
        System.out.println(buf.isDirect()); //true
    }
}

3 通道(Channel)

3.1 通道基本概念

  1. NIO 的通道类似于流,主要的区别是:

    • 通道可以同时进行读写,而流只能读或者只能写
    • 通道可以实现异步读写数据
    • 通道可以从缓冲读数据,也可以写数据到缓冲
  2. BIO的流是单向的,NIO的通道是双向的,可以读也可以写操作

  3. Channel 在 NIO 中是一个接口

  4. 常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel

    、SocketChannel

  5. FIleChannel用于文件的数据读写,DatagramChannel用于UDP数据的读写,ServerSocketChannel 和 SocketChannel 用于TCP的数据读写

  6. 通道用于源节点与目标节点的连接。在java NIO中负责缓冲区中数据的传输。Channel本身不存储数据,需要配合缓冲区进行传输

3.2 Channel的相关方法

public class ChannelTest {

    //利用通道实现文件的复制(非直接缓冲区)
    @Test
    public void test1(){
        FileInputStream fis = null;
        FileOutputStream fos = null;
        FileChannel inChannel = null;
        FileChannel outChannel = null;
        try{
            fis = new FileInputStream("code.png");
            fos = new FileOutputStream("code2.png");
            //1.获取通道
            inChannel = fis.getChannel();
            outChannel = fos.getChannel();
            //2.分配指定大小的缓冲区
            ByteBuffer buf = ByteBuffer.allocate(1024);
            //3.将通道中的数据存入缓冲区
            while (inChannel.read(buf) != -1){
                buf.flip(); //切换读取数据模式
                //4.将缓冲区的数据写入通道
                outChannel.write(buf);
                buf.clear(); //清空缓冲区
            }
        }catch (IOException e){
            e.printStackTrace();
        }finally {
            if (inChannel!=null){
                try {
                    inChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(outChannel!=null){
                try {
                    outChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (fis!=null){
                try {
                    fis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (fos!=null){
                try {
                    fos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //利用通道实现文件的复制(直接缓冲区)
    @Test
    public void test2() throws IOException {
        FileChannel inChannel = FileChannel.open(Paths.get("code.png"), StandardOpenOption.READ);
        FileChannel outChannel = FileChannel.open(Paths.get("code2.png"),StandardOpenOption.WRITE,
                StandardOpenOption.READ,StandardOpenOption.CREATE);
        //内存映射文件
        MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
        MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
        //直接对缓冲区进行数据的读写操作
        byte[] dst = new byte[inMappedBuf.limit()];
        inMappedBuf.get(dst);
        outMappedBuf.put(dst);

        inChannel.close();
        outChannel.close();
    }

    //利用通道实现文件的复制(直接缓冲区)
    @Test
    public void test3() throws IOException {
        FileChannel inChannel = FileChannel.open(Paths.get("code.png"),StandardOpenOption.READ);
        FileChannel outChannel = FileChannel.open(Paths.get("code2.png"),StandardOpenOption.WRITE,
                                                    StandardOpenOption.READ,StandardOpenOption.CREATE);
        //inChannel.transferTo(0,inChannel.size(),outChannel);
        outChannel.transferFrom(inChannel,0,inChannel.size());
        inChannel.close();
        outChannel.close();
    }

    //分散与聚集
    @Test
    public void test4() throws IOException {
        FileChannel inChannel = FileChannel.open(Paths.get("code.png"),StandardOpenOption.READ);
        FileChannel outChannel = FileChannel.open(Paths.get("code2.png"),StandardOpenOption.WRITE,
                StandardOpenOption.READ,StandardOpenOption.CREATE);
        //分配指定大小的缓冲区数组
        ByteBuffer[] byteBuffers = new ByteBuffer[2];
        byteBuffers[0] = ByteBuffer.allocate(100);
        byteBuffers[1] = ByteBuffer.allocate(512);

        //分散读取
        while (inChannel.read(byteBuffers) != -1){
            Arrays.asList(byteBuffers).forEach(buffer->buffer.flip());
            //聚集写入
            outChannel.write(byteBuffers);
            Arrays.asList(byteBuffers).forEach(buffer->buffer.clear());
        }

        inChannel.close();
        outChannel.close();
    }

    //字符集转换
    @Test
    public void test5(){
        String str = "你好,netty";
        //获取编码集
        Charset charset = StandardCharsets.UTF_8;

        CharBuffer charBuf = CharBuffer.allocate(1024);
        charBuf.put(str);
        charBuf.flip();

        //编码
        ByteBuffer byteBuf = charset.encode(charBuf);
        //展示编码结果
        for (int i = 0; i < byteBuf.limit(); i++) {
            System.out.println(byteBuf.get());
        }

        //解码
        byteBuf.flip();
        charBuf = charset.decode(byteBuf);
        System.out.println(charBuf.toString());
    }
}

4 选择器(Selector)

4.1 选择器基本概念

  1. JavaNIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)。
  2. Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个 Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
  3. 只有在连接/通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。
  4. 避免了多线程之间的上下文切换导致的开销。

4.2 Selector示意图

image-20210708094410626.png

5 NIO非阻塞网络编程

5.1 NIO编程步骤

  1. 当客户端连接时,会通过 ServerSocketChannel 得到SocketChannel
  2. Selector 进行监听 select 方法,返回有事件发生的通道的个数
  3. 将 socketChannel 注册到 Selector 上,register(Selector sel,int ops),一个 Selector 上可以注册多个 SocketChannel
  4. 注册后返回一个 SelectionKey,会和该 Selector 关联(集合)
  5. 监听到事件发生后,得到各个发生事件对应的 SelectionKey
  6. 再通过 SelectionKey 反向获取 SocketChannel,方法 channel()
  7. 可以通过得到的 channel,完成业务处理

5.2 NIO实例

实现一个NIO实例,完成服务器和客户端之间的数据简单通讯(非阻塞)

public class NioServer {
    public static void main(String[] args) throws IOException {
        //创建ServerSocketChannel 
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //得到一个Selecor对象
        Selector selector = Selector.open();
        //绑定一个端口6666, 在服务器端监听
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        //把 serverSocketChannel 注册到  selector 关心 事件为 OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //循环等待客户端连接
        while (true){
            //这里我们等待1秒,如果没有事件发生, 返回
            if (selector.select(1000) == 0 ){
                System.out.println("do not have connect...");
                continue;
            }
            //如果返回的>0, 就获取到相关的 selectionKey集合
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()){
                //获取到SelectionKey
                SelectionKey key = it.next();
                //如果是 OP_ACCEPT, 有新的客户端连接
                if (key.isAcceptable()){
                    //该该客户端生成一个 SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    //将  SocketChannel 设置为非阻塞
                    socketChannel.configureBlocking(false);
                    //将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel关联一个
                  BuffersocketChannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
                    System.out.println("客户端注册成功,当前选择器中的通道个数:"+ selector.keys().size());
                }
                //发生 OP_READ
                if (key.isReadable()){
                    //通过key 反向获取到对应channel
                    SocketChannel channel = (SocketChannel) key.channel();
                    //获取到该channel关联的buffer
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    channel.read(buf);
                    System.out.println("from client: "+ new String(buf.array()));
                }
                //手动从集合中移动当前的selectionKey, 防止重复操作
                it.remove();
            }
        }

    }
}
public class NioClient {
    public static void main(String[] args) throws IOException {
        //得到一个网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //提供服务器端的ip 和 端口
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        //连接服务器
        if (!socketChannel.connect(inetSocketAddress)){
            while (!socketChannel.finishConnect()){
                System.out.println("连接需要时间,客户端做其他工作");
            }
        }
        String str = "hello,Server!";
        //将数据传到buf中
        ByteBuffer buf = ByteBuffer.wrap(str.getBytes());
        socketChannel.write(buf);
        //发送数据,将 buffer 数据写入 channel
        System.in.read();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读