5.NIO阻塞与非阻塞

2018-05-17  本文已影响0人  xialedoucaicai

在讨论阻塞/非阻塞之前,我们先看看IO的两个阶段

对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:

  1. 等待数据准备 (Waiting for the data to be ready)
  2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
    ---------引用自IO - 同步,异步,阻塞,非阻塞 (亡羊补牢篇)

1.阻塞

阻塞:传统的IO操作是阻塞的,在等待数据和真正读写数据期间(即上述1 2两步),线程被阻塞,直到操作完成,在此期间不能做其他事情。
以传统的socket通信为例,看代码:

public void server() throws Exception{
    //获取服务端通道
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    //绑定端口号
    serverChannel.bind(new InetSocketAddress(9999));
    while(true){
        //获取客户端通道
        SocketChannel clientChannel = serverChannel.accept();
        //读取通道中数据,并回应客户端
            //缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while(clientChannel.read(buffer) != -1){
                buffer.flip();
                System.out.println("收到消息"+new String(buffer.array(),0,buffer.limit()));
                buffer.clear();
            }
            //通知客户端,服务器收到了
            buffer.put("服务端响应客户端!".getBytes());
            buffer.flip();
            clientChannel.write(buffer);
            //向客户端发送-1,否则客户端会一直阻塞
            clientChannel.shutdownOutput();
    }
}
public void client1() throws IOException, InterruptedException{
    //获取通道
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",9999));
    //分配缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //本地文件传输到服务器
    //for(int i=0;i<10;i++){
        String msg = "客户端1发消息"+System.currentTimeMillis();
        System.out.println(msg);
        buffer.put(msg.getBytes());
        //模拟网络很慢
        Thread.sleep(20000);
        buffer.flip();
        socketChannel.write(buffer);
        buffer.clear();
    //}
    System.out.println("客户端1消息发完了");
    //加入这一句,向服务端发送-1,告诉服务端我文件传完了
    //否则服务端会一直读取,导致阻塞,因为没有得到-1
    socketChannel.shutdownOutput();
    //获取服务端的反馈
    int len = 0;
    while((len = socketChannel.read(buffer)) != -1){
        buffer.flip();
        System.out.println(new String(buffer.array(),0,len));
    }
    socketChannel.close();
}
public void client2() throws IOException, InterruptedException{
    //获取通道
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",9999));
    //分配缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.put("我是client2".getBytes());
    buffer.flip();
    socketChannel.write(buffer);
    buffer.clear();
    //加入这一句,向服务端发送-1,告诉服务端我文件传完了
    //否则服务端会一直读取,导致阻塞,因为没有得到-1
    socketChannel.shutdownOutput();
    //获取服务端的反馈
    int len = 0;
    while((len = socketChannel.read(buffer)) != -1){
        buffer.flip();
        System.out.println(new String(buffer.array(),0,len));
    }
    
    socketChannel.close();
}
  1. 服务端启动,在while(true)中等待客户端连接。
  2. 客户端1连接,模拟网络很慢,20s后才发送数据给服务端。
  3. 在此期间,客户端2连接,但服务端未完成读取数据,所以无法响应客户端2
  4. 等客户端1数据读取结束后,才能处理客户端2的请求

为解决主线程被阻塞无法响应其他请求,一般会在accept()之后,结合线程池开启线程去处理客户端请求,这样即使阻塞子线程,主线程还能继续响应,但阻塞问题仍然存在,且线程切换在并发量大的时候会很频繁,影响服务器性能。

2.非阻塞

非阻塞:在等待数据期间是可以做其他操作的,但在真正读写数据时也是阻塞的。

解决思路:阻塞产生的主要原因是发起读请求后,数据并未真正可读,主线程一直在那里等,等到可读为止。那我们引入一个监控通知机制,让数据真正可读的时候再发起读请求,不可读时,先处理其他请求,不要干等着。

Selector:在NIO中通过Selector实现非阻塞通信。
服务端将Channel和它所感兴趣的事件注册到selector,然后轮询selector中感兴趣的事件是否发生,事件未发生时,主线程被操作系统阻塞(select()阻塞直到Channel感兴趣事件发生),事件发生时,select()返回,主线程遍历selector中所有事件,针对不同的事件做相应的处理。

看刘欣的<<Http Server : 一个差生的逆袭>>,认为这里事件未发生时,主线程是阻塞的,是操作系统在感兴趣的事件发生时,对事件进行标记,将主线程唤醒去处理的,唤醒后需要遍历所有注册在selector上的事件,再对被标记的事件进行处理。
每次主线程被唤醒还得遍历所有事件,一点也不智能。让操作系统直接把可处理的事件告诉主线程,免去了主线程遍历查找的步骤,这就是epoll。

还是上面的例子,我们看非阻塞方式是如何处理的:

@Test
public void server() throws IOException {
    try {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000));
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        // 注册 channel,并且指定感兴趣的事件是 Accept
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        ByteBuffer readBuff = ByteBuffer.allocate(1024);
        ByteBuffer writeBuff = ByteBuffer.allocate(128);
        writeBuff.put("服务端反馈".getBytes());
        writeBuff.flip();

        while (true) {
            // select会阻塞直到指定通道感兴趣的事件发生
            int nReady = selector.select();
            System.out.println("select()返回值:" + nReady);
            if (nReady == 0) {
                continue;
            }
            // 已选择的键集合 被select()判断为已准备好的键
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();

            while (it.hasNext()) {
                // SelectionKey表示通道和选择器的注册关系
                SelectionKey key = it.next();
                //System.out.println("移除监控事件" + key.interestOps());
                it.remove();

                if (key.isAcceptable()) {
                    System.out.println("服务端accept");
                    // 创建新的连接,并且把连接注册到selector上,而且,
                    // 声明这个channel只对读操作感兴趣。
                    SocketChannel socketChannel = ssc.accept();
                    socketChannel.configureBlocking(false);
                    System.out.println("注册读事件");
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    System.out.println("服务端readable");
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    socketChannel.read(readBuff);
                    readBuff.flip();
                    System.out.println("服务端received : " + new String(readBuff.array(),0,readBuff.limit()));
                    readBuff.clear();
                }
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
@Test
public void client1() throws IOException, InterruptedException {
    try {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));

        ByteBuffer writeBuffer = ByteBuffer.allocate(32);
        ByteBuffer readBuffer = ByteBuffer.allocate(32);

        writeBuffer.put("hello".getBytes());
        writeBuffer.flip();
        // 模拟网络延迟
        Thread.sleep(10000);
        socketChannel.write(writeBuffer);
        socketChannel.read(readBuffer);
        readBuffer.flip();
        System.out.println("客户端收到反馈:" + new String(readBuffer.array(), 0, readBuffer.limit()));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
@Test
public void client2() throws IOException, InterruptedException {
    try {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));

        ByteBuffer writeBuffer = ByteBuffer.allocate(32);
        ByteBuffer readBuffer = ByteBuffer.allocate(32);

        writeBuffer.put("hello2".getBytes());
        writeBuffer.flip();
        socketChannel.write(writeBuffer);
        socketChannel.read(readBuffer);
        readBuffer.flip();
        System.out.println("客户端收到反馈:" + new String(readBuffer.array(), 0, readBuffer.limit()));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  1. 服务端启动后,告诉selector帮我监控下ServerSocketChannel的accept()事件
  2. 客户端连接之后,accept()事件触发,告诉selector帮我监控下SocketChannel的read()事件。
  3. 由于客户端1的数据迟迟未到,主线程阻塞在select()。
  4. 在此期间,客户端2连接到了服务器,select()返回,主线程被操作系统唤醒,开始遍历selector中的就绪事件,发现accept()就绪,处理accept(),客户端数据可读,处理read()事件。
  5. 客户端1的数据终于到了,处理客户端1的read()事件。
    也就是主线程没有一直在哪里干等客户端1的数据,而是做了其他事(处理客户端2的accept()事件),这就实现了非阻塞的目的。

到这里,应该就能理解NIO与IO的第二点区别了吧:

2.借助Selector实现了非阻塞

3. 异步

异步IO等待数据和读取数据期间都不会发生阻塞。
对于read操作,主线程只负责将read的命令信息告诉内核,然后就返回。剩下的两步工作都由内核完成,然后通知主线程,任务完成了。

4. 总结

举一个不是非常贴切的例子类比一下:
阻塞:基层码农,增删改查
非阻塞:架构师,写框架
异步:老板,动嘴不动手

关于阻塞/非阻塞,再举一个服务员的例子:
阻塞:服务员,客人来了(accept事件),客人点菜(处理accept),厨师做菜(等待客户端数据),服务员一直等了10分钟,厨师做好了(读就绪),给客人端菜
非阻塞:服务员,不停地找活干,客人来了(accept),客人点菜(处理accept),厨师做菜,服务员并没有在哪里等10分钟,而是继续找其他的活,这期间还接待了很多组客人(处理了多个accept事件),发现厨师做好了(读就绪),给客人端菜

其实,Java NIO的细节还有很多,这里只是对其有一个简单了解,如果需要更深入学习,可以参考这些资料:

实际使用的时候,我们一般也不会直接用NIO来完成Socket通信,可以使用现成的易用的框架,比如Netty,后面有空了可以再研究下这个框架,感觉功能很强大啊

上一篇 下一篇

猜你喜欢

热点阅读