IO、BIO、NIO、AIO、多路复用

2022-02-17  本文已影响0人  请叫我平爷

概念

同步、异步

阻塞、非阻塞

同步阻塞、同步非阻塞、异步非阻塞

a给b发起一个请求:

image.png

IO

只要具有输入输出类型的交互系统都可以认为是I/O模型

Input\Output
磁盘I/O模型、网络I/O模型、内存I/O模型、Direct I/O、数据库I/O

解决IO慢:

  1. SSD替代机械硬盘
  2. NIO替换BIO

BIO

Block IO:同步阻塞IO


image.png

优:

实现简单,一个线程一个任务

缺:

一个线程只能做一件事
创建大量线程支持高并发,等待时间过长会影响系统性能,只能创建少量线程来支持。

image.png

BIOServer

public class BIOServer {
    public static void main(String[] args) {
        /**
         * int corePoolSize,
         * int maximumPoolSize,
         * long keepAliveTime,
         * TimeUnit unit,
         * BlockingQueue<Runnable> workQueue
         */
        ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(1,3,0,TimeUnit.SECONDS,new LinkedBlockingQueue<>());

        Thread acceptor = new Thread(()->{
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(9000);
                int i=0;
                while (true){
                    Socket socket = serverSocket.accept();
                    Thread thread = new Thread(()->{
                        int len;
                        byte[] bytes = new byte[1024];
                        try {
                            InputStream inputStream = socket.getInputStream();
                            while ((len=inputStream.read(bytes))!=-1){
                                System.out.println(new String(bytes, 0, len));
                            }
                            Thread.sleep(2000L);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
                    threadPoolExecutor.execute( thread);
                    i++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        acceptor.start();
    }
}

BIOClient

public class BIOClient {

    public static void main(String[] args) {
        for (int i=0;i<100;i++){
            int finalI = i;
            new Thread(()->{
                try {
                    Socket socket = new Socket("127.0.0.1",9000);

                    for (int j=0;j<3;j++) {
                        String str = new Date() + "第" + finalI + "个链接发送第" +j+ "个消息";
                        socket.getOutputStream().write(str.getBytes());
                        Thread.sleep(2000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();

        }
    }

}

NIO

New I/O:同时支持阻塞与非阻塞,使用多路复用器机制
No-Blocking I/O:同步非阻塞IO

image.png

Channel:通道,跟IO中Stream(流)差不多一个等级

Buffer:ByteBuffer(byte)、CharBuffer(char)、DoubleBuffer(double)、FloatBuffer(float)、IntBuffer(int)、LongBuffer(long)、ShortBuffer(short)、MappedByteBuffer、HeapByteBuffer、DirectByteBuffer、HeapByteBuffer在JVM的Heap(堆)上分配

  1. 本质上是一块可以读写的内存,这块内存被包装成NIO Buffer对象,并提供一组方法,用来方便访问该内存
  2. capacity:Buffer的固定大小就叫capacity,只能写byte、long、char、short、int、double、float类型,Buffer满了就需要清空才能继续写
  3. position:
    • 写的时候表示当前位置,初始的position为0,最大为capacity-1
    • 读的时候表示从特定位置读,从写切换为读,会重置为0
  4. limit:
    • 写模式下,最多能往Buffer中写多少数据,limit=capacity
    • 读模式,最多能读到多少数据,写切换为读,limit初始为position,能读到之前写入的所有数据
  5. 初始化
```java
//初始化Buffer
ByteBuffer buf = ByteBuffer.allocate(1024);
CharBuffer buffer = CharBuffer.allocate(48);
//写入到Buffer
int bytesRead = inChannel.read(buf);
//写入到Buffer
buffer.put('a');
//写切换到读,limit=position,position=0,读取所有写入的数据
buf.flip();
//从Buffer读数据
buf.get()
//从Buffer读数据
int bytesWrite =  inChannel.write(buf);
//重读所有数据
Buffer.rewind();
 //清空Buffer,position=0,limit=capacity,还能读到之前的记录
Buffer.clear()
//清空Buffer,彻底清空
Buffer.compact()
//标记Buffer的position
buffer.mark();
//获取刚刚标记的position
buffer.reset();
```
  1. euqals:只比较Buffer中剩余的元素(position到limit之间的元素)
    1. 相同的类型
    2. 剩下的个数相同
    3. 剩下的byte、char相同
  2. compareTo:
    1. 第一个不相等的元素小于另一个Buffer中对应的元素
    2. 所有元素都相等,但是其中一个Buffer中的元素先耗尽

Selector

  1. 用单个线程处理多个Channel
```
//创建Selector
Selector selector = Selector.open();
//注册Channel,channel必须为非阻塞状态
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,SelectionKey.OP_READ);
```
- Connet:channel成功连接
- Accept:serverSocketChannel准备接收
- Read:channel有数据可读
- Write:channel等待写入数据

零拷贝

传统copy


image.png

零拷贝


image.png

模仿tomcat

public class NIOServer {

    public static final String SEPARATOR = "\r\n";
    public static final int BACK_LOG = 1024;

    public static void main(String[] args) {
        ServerSocketChannel serverSocketChannel = null;
        SocketChannel channel = null;
        try{
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(9000),BACK_LOG);
            ExecutorService executorService = Executors.newFixedThreadPool(50);
            System.out.println("服务器启动成功");
            for (;;){
                channel = serverSocketChannel.accept();
                System.out.println(channel.getRemoteAddress());
                SocketChannel finalChannel = channel;
                executorService.execute(()->{
                    try {
                        Thread.sleep(2000);
                        finalChannel.write(ByteBuffer.wrap(HttpRequest().getBytes(StandardCharsets.UTF_8)));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        catch (Exception e){
            e.printStackTrace();
        }
        finally {
            try {
                channel.close();
                serverSocketChannel.close();
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static String HttpRequest() {
        String str = "<h1>tomcat</h1>";
        StringBuilder builder = new StringBuilder();
        builder.append("HTTP/1.1 200 OK").append(SEPARATOR);
        builder.append("Connection: Close").append(SEPARATOR);
        builder.append("Content-Type: text/html;charset=utf-8").append(SEPARATOR);
        builder.append("Content-Length: " + str.length()).append(SEPARATOR);
        builder.append(SEPARATOR);
        builder.append(str);
        return builder.toString();
    }
}

AIO

Asynchronous I/O:异步非阻塞I/O模型

image.png

当任务完成了,会通知线程来处理

  1. 进程向操作系统请求数据
  2. 操作系统把外部数据加载到内核的缓冲区
  3. 操作系统把内核的缓冲区拷贝到进程的缓冲区
  4. 进程获得数据完成自己的功能

第2、3步是挂起等待状态,就是同步IO,反之就是异步IO,AIO

AIOServer

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AIOServer {
    private ExecutorService executorService;          // 线程池
    private AsynchronousChannelGroup threadGroup;      // 通道组
    public AsynchronousServerSocketChannel asynServerSocketChannel;  // 服务器通道
    public void start(Integer port){
        try {
            // 1.创建一个缓存池
            executorService = Executors.newCachedThreadPool();
            // 2.创建通道组
            threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            // 3.创建服务器通道
            asynServerSocketChannel = AsynchronousServerSocketChannel.open(threadGroup);
            // 4.进行绑定
            asynServerSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("server start , port : " + port);
            // 5.等待客户端请求
            asynServerSocketChannel.accept(this, new AIOServerHandler());
            // 一直阻塞 不让服务器停止,真实环境是在tomcat下运行,所以不需要这行代码
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        AIOServer server = new AIOServer();
        server.start(9000);
    }
}

AIOServerHandler

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
public class AIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServer> {
    private final Integer BUFFER_SIZE = 1024;
    @Override
    public void completed(AsynchronousSocketChannel asynSocketChannel, AIOServer attachment) {
        // 保证多个客户端都可以阻塞
        attachment.asynServerSocketChannel.accept(attachment, this);
        read(asynSocketChannel);
    }
    //读取数据
    private void read(final AsynchronousSocketChannel asynSocketChannel) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        asynSocketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer resultSize, ByteBuffer attachment) {
                //进行读取之后,重置标识位
                attachment.flip();
                //获取读取的数据
                String resultData = new String(attachment.array()).trim();
                System.out.println("Server -> " + "收到客户端的数据信息为:" + resultData);
                String response = resultData + " = " + resultData;
                write(asynSocketChannel, response);
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }
    // 写入数据
    private void write(AsynchronousSocketChannel asynSocketChannel, String response) {
        try {
            // 把数据写入到缓冲区中
            ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
            buf.put(response.getBytes());
            buf.flip();
            // 在从缓冲区写入到通道中
            asynSocketChannel.write(buf).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void failed(Throwable exc, AIOServer attachment) {
        exc.printStackTrace();
    }
}

AIOClient

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Random;

public class AIOClient implements Runnable{

    private static Integer PORT = 9000;
    private static String IP_ADDRESS = "127.0.0.1";
    private AsynchronousSocketChannel asynSocketChannel ;
    public AIOClient() throws Exception {
        asynSocketChannel = AsynchronousSocketChannel.open();  // 打开通道
    }
    public void connect(){
        asynSocketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT));  // 创建连接 和NIO一样
    }
    public void write(String request){
        try {
            asynSocketChannel.write(ByteBuffer.wrap(request.getBytes())).get();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            asynSocketChannel.read(byteBuffer).get();
            byteBuffer.flip();
            byte[] respByte = new byte[byteBuffer.remaining()];
            byteBuffer.get(respByte); // 将缓冲区的数据放入到 byte数组中
            System.out.println(new String(respByte,"utf-8").trim());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        while(true){
        }
    }
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            AIOClient myClient = new AIOClient();
            myClient.connect();
            new Thread(myClient, "myClient").start();
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            myClient.write(expression);
        }
    }
}

多路复用IO

select

image.png
  1. 使用copy_from_user从用户空间拷贝fd_set到内核空间
  2. 注册回调函数__pollwait
  3. 遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
  4. 以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
  5. __pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
  6. poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
  7. 如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
  8. 把fd_set从内核空间拷贝到用户空间。

poll

epoll

epoll的高效就在于,当我们调用epoll_ctl往里塞入百万个句柄时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的句柄给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。
  而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已,如何能不高效。
  epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。
  对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。
  对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。
  对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

综上,在选择select,poll,epoll时要根据具体的使用场合以及这三种方式的自身特点。

  1. select、poll的实现需要本身不断轮询全部的fd集合,直到设备就绪,期间可能要睡眠和唤醒屡次交替。而epoll其实也须要调用epoll_wait不断轮询就绪链表,期间也可能屡次睡眠和唤醒交替,可是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,可是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就好了,这节省了大量的CPU时间。这就是回调机制带来的性能提高。
  2. select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,而且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,并且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并非设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省很多的开销。
  3. 表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。
  4. select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善
类型 select poll epoll
操作方式 遍历 遍历 回调
底层实现 数组 链表 红黑树
IO效率 每次都进行线性遍历,O(n) 每次都进行线性遍历,O(n) 事件通知方式,当fd就绪,系统注册的回调函数被调用,fd放入readyList中,O(1)
最大连接数 1024(X86)或2048(X64) 无上限 无上限
fd拷贝 每次调用select,都需要把fd集合从用户态拷贝到内核态 每次调用poll,都需要把fd集合从用户态拷贝到内核态 调用epoll_ctl时拷贝进内核并保存,之后epoll_wait不拷贝,epoll通过内核和用户空间共享一块内存来实现的。

对比

image.png

目前操作系统对AIO的支持并没有特别完善

上一篇 下一篇

猜你喜欢

热点阅读