Java网络编程:Netty框架学习(二)---Java NIO

2020-12-03  本文已影响0人  singleZhang2010

概述

上篇中已经讲到Java中的NIO类库,Java中也称New IO,类库的目标就是要让Java支持非阻塞IO,基于这个原因,更多的人喜欢称Java NIO为非阻塞IO(Non-Block IO),称“老的”阻塞式Java IO为OIO(Old IO)。
总体上说,NIO弥补了原来面向流的OIO同步阻塞的不足,它为标准Java代码提供了高速的、面向缓冲区的IO。
了解上一篇讲到的四种I/O模型的话,我们可以很容易看出Java NIO采用的是IO多路复用(IO Multiplexing)模型。
NIO特征:

  1. NIO中引入了Channel(通道)和Buffer(缓冲区)的概念。读取和写入,只需要从通道中读取数据到缓冲区中,或将数据从缓冲区中写入到通道中,读取Buffer的数据可以是无序的读取。
  2. NIO使用了通道和通道的多路复用技术 来实现非阻塞的操作,当我们调用read方法时,如果此时有数据,则read读取数据并返回;如果此时没有数据,则read直接返回,而不会阻塞当前线程。
  3. NIO有选择器,NIO的选择器实现,是基于底层的选择器的系统调用,需要底层操作系统提供支持

Java NIO 核心组件

Java NIO由以下三个核心组件组成:

1. 通道(Channel)
在NIO中,同一个网络连接使用一个通道表示,所有NIO的IO操作都是从通道开始的。一个通道类似于OIO中的两个流的结合体,既可以从通道读取,也可以向通道写入。

2. 缓冲区(Buffer)
应用程序与通道(Channel)主要的交互操作,就是进行数据的read读取和write写入,这里就需要依赖NIO Buffer(NIO缓冲区),它是数据的载体。
通道的读取,就是将数据从通道读取到缓冲区中;通道的写入,就是将数据从缓冲区中写入到通道中
3. 选择器(Selector)
在上一篇中提到过文件句柄数,这里的文件句柄其实就是文件描述符,它标识的就是一个网络连接。
一个进程/线程可以同时监视多个文件描述符。在NIO中通过选择器(Selector)对这些文件描述符进行监视,监视哪些文件描述符是可读或者可写的。

selector
IO多路复用,从具体的开发层面来说,首先把通道注册到选择器中,然后通过选择器内部的机制,可以查询(select)这些注册的通道是否有已经就绪的IO事件(例如可读、可写、网络连接完成等)。
一般来说,一个单线程处理一个选择器,一个选择器可以监控很多通道。通过选择器,一个单线程可以处理数百、数千、数万、甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销。
由于Java NIO的Selector组件和操作系统底层的IO多路复用的支持,我们可以很简单地使用一个线程,通过选择器去管理多个通道。

NIO Buffer(缓冲区)

在NIO中有8种缓冲区类,分别如下:

这些Buffer类在其内部,有一个byte[]数组内存块,作为内存缓冲区。
查看其中源码,如下

    //ByteBuffer类 代码片段

    //
    final byte[] hb;                  // Non-null only for heap buffers

Buffer类的重要成员属性:capacity(容量)、position(读写位置)、limit(读写的限制)、mark(标记)
※说明:capacity容量不是指内存块byte[]数组的字节的数量。capacity容量指的是写入的数据对象的数量。

通过简单地使用Buffer示例加深对这四个属性的印象,创建BufferTest.java

package com.zhxin.nettylab.nio.chapter1;

import java.nio.CharBuffer;

/**
 * @ClassName BufferTest
 * @Description //BufferTest
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/3 0003 下午 4:03
 **/
public class BufferTest {

    public static void main(String[] args){

        //创建Buffer,capacity为10
        CharBuffer cbf = CharBuffer.allocate(10);

        System.out.println(cbf.capacity()); //容量:10
        System.out.println(cbf.limit());    //读写限制:10
        System.out.println(cbf.position()); //读写位置:0 起始值

        cbf.put("a");
        cbf.put("b");
        cbf.put("c");
        System.out.println(cbf.position()); //输出3

        cbf.flip(); //buffer从写入转换成读取,把limit设置为position,把position还原成0
        System.out.println(cbf.position());
        System.out.println(cbf.limit());

        //取值
        System.out.println(cbf.get()); //取第一个元素 a
        System.out.println(cbf.position()); //读写位置变为1

        cbf.clear(); //clear方法将limit设置成capacity,position设置成0
        System.out.println(cbf.limit());
        System.out.println(cbf.position());
        System.out.println(cbf.get(2)); //读取第三个元素c

        System.out.println(cbf.position());//读写位置不变,get方法加了索引值,根据索引来取值不影响position

        System.out.println(cbf.get());
        System.out.println(cbf.get());
        cbf.mark(); //标记
        System.out.println(cbf.position());  //标记后的位置为2
        System.out.println(cbf.get());
        System.out.println(cbf.position());
        cbf.reset();//返回标记
        System.out.println(cbf.position()); //返回标记的位置2

        cbf.clear();//读取完成后,调用Buffer.clear() 或Buffer.compact()方法,将缓冲区转换为写入模式
        System.out.println(cbf.limit());
        System.out.println(cbf.position());
        System.out.println(cbf.capacity());
    }
}

除了前面的3个属性,第4个属性mark(标记)比较简单。就是相当一个暂存属性,暂时保存position的值,方便后面的重复使用position值


Buffer四个重要成员属性

NIO Channel(通道)

NIO中一个连接就是用一个Channel(通道)来表示。更广泛的层面来说,一个通道可以表示一个底层的文件描述符。
JavaNIO的通道还可以更加细化。例如,对应不同的网络传输协议类型,在Java中都有不同的NIO Channel(通道)实现。
四种重要的Channel类型,分别如下:

package com.zhxin.nettylab.nio.chapter2;

import java.io.*;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * @ClassName BufferTest
 * @Description //BufferTest FileChannel文件通道 demo
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/3 0003 下午 2:43
 **/
public class ChannelTest {

    public static void main(String[] args){
        File bt = new File("E:/project/nettylab/src/main/resources/buffer.txt");

        //当try语句块运行结束时,FileInputStream 会被自动关闭
        // 这是因为FileInputStream 实现了java中的java.lang.AutoCloseable接口
        // 所有实现了这个接口的类都可以在try-with-resources结构中使用
        // 以FileInputStream、FileOutputStream 文件输入流和文件输出流来创建FileChannel
        try(FileChannel inCnl = new FileInputStream(bt).getChannel();
            FileChannel outCnl = new FileOutputStream("E:/project/nettylab/src/main/resources/buffer1.txt").getChannel()){
            MappedByteBuffer bf = inCnl.map(FileChannel.MapMode.READ_ONLY,0,bt.length()); //从Channel获取数据
            Charset crt = Charset.forName("UTF-8");
            outCnl.write(bf); //向Channel写数据
            bf.clear();
            CharsetDecoder cd = crt.newDecoder();

            //decode 把ByteBuffer 转 CharBuffer
            CharBuffer cb = cd.decode(bf);
            System.out.println(cb);

        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

NIO Selector(选择器)

选择器是NIO中非常重要的角色,选择器的使命是完成IO的多路复用。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系,是监控和被监控的关系。

通道和选择器之间的关系,通过register(注册)的方式完成。
调用通道的Channel.register(Selector sel, int ops)方法,可以将通道实例注册到一个选择器中。
register方法有两个参数:

//SelectionKey.java 部分源码

public static final int OP_READ = 1 << 0;

public static final int OP_WRITE = 1 << 2;

public static final int OP_CONNECT = 1 << 3;

public static final int OP_ACCEPT = 1 << 4;

SelectionKey选择键
通道和选择器的监控关系注册成功后,就可以选择就绪事件。这些IO事件类型指的就是通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件。
例如,某个SocketChannel通道,完成了和服务端的握手连接,则处于“连接就绪”(OP_CONNECT)状态;
某个ServerSocketChannel服务器通道,监听到一个新连接的到来,则处于“接收就绪”(OP_ACCEPT)状态。
SelectableChannel类
※FileChannel文件通道就不能被选择器监控或选择,判断一个通道能否被选择器监控或选择,有一个前提:判断它是否继承了抽象类SelectableChannel(可选择通道)
Java NIO中所有网络链接Socket套接字通道,都继承了SelectableChannel类,都是可选择的。
选择器使用流程
使用选择器,主要有以下三步:

  1. 获取选择器实例;
  2. 将通道注册到选择器中;
  3. 轮询感兴趣的IO就绪事件(选择键集合)
    通过示例加深一下印象,创建服务端demo SelectorTest.java
package com.zhxin.nettylab.nio.chapter3;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @ClassName SelectorTest
 * @Description //选择器 使用  服务器端示例
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/4 0004 上午 9:17
 **/
public class SelectorTest {
    private static Selector selector;
    public static void main(String[] args){

        try{
            /*
             * 获取选择器示例
             * Selector选择器的类方法open()的内部,是向选择器SPI(SelectorProvider)发出请求,
             * 通过默认的SelectorProvider(选择器提供者)对象,获取一个新的选择器实例。
             * Java中SPI全称为(Service Provider Interface,服务提供者接口),是JDK的一种可以扩展的服务提供和发现机制。
             * Java通过SPI的方式,提供选择器的默认实现版本
             */
            selector = Selector.open();

            /*
            * 将通道注册到选择器实例
            * */
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打开ServerSocketChannel,获取通道
            serverSocketChannel.configureBlocking(false); //设为非阻塞
            serverSocketChannel.bind(new InetSocketAddress(8989)); //将该通道对于的serverSocket绑定到port端口
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//将通道注册到选择器上,监听"接收连接"事件

            /*
             * 选出感兴趣的IO就绪事件(选择键集合)
             * 通过Selector选择器的select()方法,选出已经注册的、已经就绪的IO事件,保存到SelectionKey选择键集合中
             * 遍历这些IO事件,进行对应的处理
             */
            while (selector.select() > 0){
                Set<SelectionKey> selectKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectKeys.iterator();

                while(keyIterator.hasNext()){
                    SelectionKey key = keyIterator.next();
                    if(key.isAcceptable()){
                        // ServerSocketChannel服务器监听通道有新连接
                        handleAccept(key);
                    } else if(key.isReadable()){
                        // 传输通道可读
                        handleRead(key);
                    } else if(key.isWritable()){
                        //传输通道可读
                        handleWrite(key);
                    }

                    //移除处理完的选择键
                    keyIterator.remove();
                }

            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 处理客户端新连接事件
     */
    private static void handleAccept(SelectionKey key) throws IOException {
        // 获取客户端连接通道
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = server.accept();
        socketChannel.configureBlocking(false);

        // 信息通过通道发送给客户端
        String msg = "Hello Client!";
        socketChannel.write(ByteBuffer.wrap(msg.getBytes()));

        // 给通道设置读事件,客户端监听到读事件后,进行读取操作
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /**
     * 监听到可读,处理客户端发送过来的信息
     */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();

        // 从通道读取数据到缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(128);
        channel.read(buffer);

        // 输出客户端发送过来的消息
        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("server received msg from client:" + msg);
    }

    private static void handleWrite(SelectionKey key){
    }
}

创建客户端demo ClientTest.java

package com.zhxin.nettylab.nio.chapter3;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @ClassName ClientTest
 * @Description //客户端
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/4 0004 上午 9:52
 **/
public class ClientTest {

    private static Selector selector;

    public static void main(String[] args) throws IOException {

        // 创建通道管理器(Selector)
        selector = Selector.open();

        // 创建通道SocketChannel
        SocketChannel channel = SocketChannel.open();
        // 将通道设置为非阻塞
        channel.configureBlocking(false);

        // 客户端连接服务器,其实方法执行并没有实现连接,需要在handleConnect方法中调channel.finishConnect()才能完成连接
        channel.connect(new InetSocketAddress("localhost", 8989));

        /**
         * 将通道(Channel)注册到通道管理器(Selector),并为该通道注册selectionKey.OP_CONNECT
         * 注册该事件后,当事件到达的时候,selector.select()会返回,
         * 如果事件没有到达selector.select()会一直阻塞。
         */
        channel.register(selector, SelectionKey.OP_CONNECT);

        while (selector.select() > 0) {

            Set<SelectionKey> selectKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectKeys.iterator();

            while(keyIterator.hasNext()){
                SelectionKey key = keyIterator.next();
                if(key.isConnectable()){
                    // 传输通道连接成功 一般用在客户端
                    handleConnect(key);
                } else if(key.isReadable()){
                    // 传输通道可读
                    handleRead(key);
                } else if(key.isWritable()){
                    //传输通道可读
                    handleWrite(key);
                }

                //移除处理完的选择键
                keyIterator.remove();
            }

        }

    }

    /**
     * 处理 和服务器端连接成功事件
     * */
    private static void handleConnect(SelectionKey key) throws IOException {

        // 获取与服务端建立连接的通道
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.isConnectionPending()) {
            // channel.finishConnect()才能完成连接
            channel.finishConnect();
        }

        channel.configureBlocking(false);

        // 数据写入通道
        String msg = "Hello Server!";
        channel.write(ByteBuffer.wrap(msg.getBytes()));

        // 通道注册到选择器,并且这个通道只对读事件感兴趣
        channel.register(selector, SelectionKey.OP_READ);
    }

    /**
     * 监听到可读,处理服务端发送过来的信息
     */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();

        // 从通道读取数据到缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(128);
        channel.read(buffer);

        // 输出服务端响应发送过来的消息
        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("client received msg from server:" + msg);
    }

    private static void handleWrite(SelectionKey key){

    }
}

总结

到这里已经算是踏入了JAVA NIO的大门了,以上都是比较简单的demo实践,没有看到“粘包”和“拆包”等复杂问题,后续会接触到。
Java NIO编程大致的特点如下:

  1. 在NIO中,服务器接收新连接的工作,是异步进行的。不像Java的OIO那样,服务器监听连接,是同步的、阻塞的。NIO可以通过选择器(也可以说成:多路复用器),后续不断地轮询选择器的选择键集合,选择新到来的连接。
  2. 在NIO中,SocketChannel传输通道的读写操作都是异步的。如果没有可读写的数据,负责IO通信的线程不会同步等待。这样,线程就可以处理其他连接的通道;不需要像OIO那样,线程一直阻塞,等待所负责的连接可用为止。
  3. 在NIO中,一个选择器线程可以同时处理成千上万个客户端连接,性能不会随着客户端的增加而线性下降。

代码示例地址:
https://gitee.com/kaixinshow/java-nionetty-learning

上一篇下一篇

猜你喜欢

热点阅读