NIO基础

2020-09-06  本文已影响0人  笔记本一号

不多BB开门见山

NIO的核心概念:

Buffer

Buffer实质是一个字节数组,包含这些类型:CharBuffer、DoubleBuffer、IntBuffer、LongBuffer、ByteBuffer、ShortBuffer、FloatBuffer

buffer的结构:

Buffer读写模式的切换

Buffer利用flip() 进行读写模式的切换,在写模式下position 会记录写入的字节的位置,limit会等于capacity表示允许最大可写容量,读的时候调用flip(),Buffer切换至读模式,limit会被至为position的位置表示最大的可读容量,而position会被至为0,一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。
compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

下面是compact(),会把未读的数据移动到Buffer的最前端,而position会放置在未读数据的一个位置,这样有利于我们清除数据后还能读取未来得及读的数据

操作buffer时的步骤为:
1.写入数据到 Buffer;
2.调用 flip() 方法;
3.从 Buffer 中读取数据;
4.调用 clear() 方法或者 compact() 方法。
5.当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 .flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。

Selector

Selector通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。通道和缓冲区的机制,使得 Java NIO 实现了同步非阻塞 IO 模式,在此种方式下,用户进程发起一个 IO 操作以后便可返回做其它事情,而无需阻塞地等待 IO 事件的就绪,但是用户进程需要时不时的询问 IO 操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的 CPU 资源浪费。鉴于此,需要有一个机制来监管这些 IO 事件,如果一个 Channel 不能读写(返回 0),我们可以把这件事记下来,然后切换到其它就绪的连接(channel)继续进行读写。在 Java NIO 中,这个工作由 selector 来完成,

Selector 是一个对象,它可以接受多个 Channel 注册,监听各个 Channel 上发生的事件,并且能够根据事件情况决定 Channel 读写。这样,通过一个线程可以管理多个 Channel,从而避免为每个 Channel 创建一个线程,节约了系统资源。如果你的应用打开了多个连接(Channel),但每个连接的流量都很低,使用 Selector 就会很方便。

要使用 Selector,就需要向 Selector 注册 Channel,然后调用它的 select() 方法。这个方法会一直阻塞到某个注册的通道有事件就绪,这就是所说的轮询。一旦这个方法返回,线程就可以处理这些事件。

Selector说白了就是能帮我们把注册在上面的各个通道并且是我们感兴趣的事件轮询出来,这个轮询不是普通的循环遍历,而是一种系统级别的操作,性能比普通的轮询高,所以轮询通道连接最好不要自己用代码实现,而是使用多路复用器

一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统BIO一连接一线程模型,性能、弹性伸缩能力和可靠性都得到了极大的提升.

NIO编码

整个NIO的步骤如下:

server端


public class ChatServer {

    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private ServerSocketChannel server;
    private Selector selector;
    private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
    private Charset charset = Charset.forName("UTF-8");
    private int port;

    public ChatServer() {
        this(DEFAULT_PORT);
    }

    public ChatServer(int port) {
        this.port = port;
    }

    private void start() {
        try {
            //打开ServerSocketChannel,这是所有客户端连接的父管道,所有的客户端连接都要通过它 
            server = ServerSocketChannel.open();
            //设置非阻塞通道
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(port));
            //多路复用器
            selector = Selector.open();
            //将ServerSocketChannel注册到多路复用器上,并监听客户端连接事件
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("启动服务器, 监听端口:" + port + "...");

            while (true) {
                //表示没有就绪的通道,跳过本次循环
                if (selector.selectNow()==0){//非阻塞监听,也可以设置为阻塞监听:selector.select
                 continue;
                }
                //selector.select如果设置为阻塞监听就不需要上面的if判断,因为设置阻塞监听如果通道没有事件,它会将自己阻塞住,有事件才玩往下走
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    // 处理被触发的事件
                    handles(key);
                }
                //清理通道,避免事件重复处理
                selectionKeys.clear();

              //另一种李大佬书上的写法
       /**       selector.select();
                selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    handles(iterator.next());
                    //清理事件,避免事件重复处理
                    iterator.remove();
                }
          **/
  }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(selector);
        }

    }

    private void handles(SelectionKey key) throws IOException {
        // ACCEPT事件 - 和客户端建立了连接
        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
//将客户端连接进来的通道
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            //注册通道的读监听事件
            client.register(selector, SelectionKey.OP_READ);
            System.out.println(getClientName(client) + "已连接");
        }
        // READ事件 - 客户端发送了消息
        else if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            //读取客户端消息
            String fwdMsg = receive(client);
            if (fwdMsg.isEmpty()) {
                // 取消这个通道的监听事件
                key.cancel();
                //通知多路复用器重新调整监听,selector.wakeup()在selector是阻塞模式的时候可以用,可以唤醒阻塞
                //selector.wakeup();
            } else {
                System.out.println(getClientName(client) + ":" + fwdMsg);
                //消息转发
                forwardMessage(client, fwdMsg);
                // 检查用户是否退出
                if (readyToQuit(fwdMsg)) {
                    key.cancel();
                     //通知多路复用器重新调整监听,selector.wakeup()在selector是阻塞模式的时候可以用,可以唤醒阻塞
                //selector.wakeup();
                    System.out.println(getClientName(client) + "已断开");
                }
            }

        }
    }
    //将其他客户的消息转发给客户端
    private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
        for (SelectionKey key: selector.keys()) {
            Channel connectedClient = key.channel();
            if (connectedClient instanceof ServerSocketChannel) {
                continue;
            }
            //key.isValid()判断SelectionKey是否有效,
            //client.equals(connectedClient) 判断消息不是这个客户端发的,避免自己给自己发消息
            if (key.isValid() && !client.equals(connectedClient)) {
                wBuffer.clear();
                //写消息到Buffer中
                wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));
                //切换到读状态
                wBuffer.flip();
                //判断Buffer中是否还残存数据
                while (wBuffer.hasRemaining()) {
                    //将Buffer的数据写入通道
                    ((SocketChannel)connectedClient).write(wBuffer);
                }
            }
        }
    }

    //接收客户端消息
      private String receive(SocketChannel client,SelectionKey key) throws IOException {
        try {
            rBuffer.clear();
            //将通道的消息读到缓冲区
            while (client.read(rBuffer) > 0) ;
            //切换到读模式
            rBuffer.flip();
        }catch (Exception e){
            key.cancel();
            client.socket().close();
            client.close();
            System.out.println(e.toString());
            return StringUtils.EMPTY;
        }
        //从缓冲区读数据
        return String.valueOf(charset.decode(rBuffer));
    }

    private String getClientName(SocketChannel client) {
        return "客户端[" + client.socket().getPort() + "]";
    }

    private boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    private void close(Closeable closable) {
        if (closable != null) {
            try {
                closable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer(8888);
        chatServer.start();
    }
}

client

public class ChatClient {

    private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private static final int DEFAULT_SERVER_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private String host;
    private int port;
    private SocketChannel client;
    private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
    private Selector selector;
    private Charset charset = Charset.forName("UTF-8");

    public ChatClient() {
        this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
    }

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    private void close(Closeable closable) {
        if (closable != null) {
            try {
                closable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void start() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            selector = Selector.open();
            //注册连接监听事件
            client.register(selector, SelectionKey.OP_CONNECT);
            client.connect(new InetSocketAddress(host, port));
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    handles(key);
                }
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException e) {
            // 用户正常退出
        } finally {
            close(selector);
        }

    }
/*
由于我们使用的SocketChannel处于非阻塞模式,当调用connect()方法时,调用会立即返回,但是连接的过程还在进行,
需要后续调用finishConnect()方法来完成连接过程。在连接过程已经启动,但尚未完成之前,
isConnectionPending()会返回true,这就是我们此时在检测的状态。如果连接未能正常创建,
调用finishConnect()则会抛出IOException异常,标志着连接失败。
*/
    private void handles(SelectionKey key) throws IOException {
        // CONNECT事件 - 连接就绪事件
        if (key.isConnectable()) {
            SocketChannel client = (SocketChannel) key.channel();
            if (client.isConnectionPending()) {
                //调用finishConnect()方法来完成连接过程
                client.finishConnect();
                // 处理用户的输入
                new Thread(new UserInputHandler(this)).start();
            }
            client.register(selector, SelectionKey.OP_READ);
        }
        // READ事件 -  服务器转发消息
        else if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            String msg = receive(client);
            if (msg.isEmpty()) {
                close(selector);
            } else {
                System.out.println(msg);
            }
        }
    }

    public void send(String msg) throws IOException {
        if (msg.isEmpty()) {
            return;
        }

        wBuffer.clear();
        wBuffer.put(charset.encode(msg));
        wBuffer.flip();
        while (wBuffer.hasRemaining()) {
            client.write(wBuffer);
        }

        // 检查用户是否准备退出
        if (readyToQuit(msg)) {
            close(selector);
        }
    }

    private String receive(SocketChannel client) throws IOException {
        rBuffer.clear();
        while (client.read(rBuffer) > 0);
        rBuffer.flip();
        return String.valueOf(charset.decode(rBuffer));
    }

    class UserInputHandler implements Runnable {

        private ChatClient chatClient;

        public UserInputHandler(ChatClient chatClient) {
            this.chatClient = chatClient;
        }

        @Override
        public void run() {
            try {
                // 等待用户输入消息
                BufferedReader r =
                        new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    String input = r.readLine();
                    // 向服务器发送消息
                    chatClient.send(input);
                    // 检查用户是否准备退出
                    if (chatClient.readyToQuit(input)) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ChatClient client = new ChatClient("127.0.0.1", 8888);
        client.start();
    }
}
image.png image.png image.png image.png
上一篇下一篇

猜你喜欢

热点阅读