I/O-NIO-多路复用器

2020-11-30  本文已影响0人  麦大大吃不胖

by shihang.mai

1. 概述

通过一个系统调用,获取多个IO状态,叫多路复用器。在Linux下多路复用器都是同步模型。(只要程序自己读写IO,那么IO模型就是同步的

多路复用器
只关注IO:不关注从IO读写完之后的事情

同步:app自己R/W
异步:kernel完成R/W 只有win:iocp
阻塞:blocking
非阻塞:non-blocking

linux以及成熟的框架netty:同步非阻塞 同步阻塞

2. select poll

select poll

系统调用

  1. 调用socket()返回一个fd
  2. 调用bind(fd,9090),将fd和端口绑定起来
  3. 调用listen(),监听端口
  4. 调用select(fds)查找有状态的IO
  5. 调用recv(有状态的fd)

其实无论NIO SELECT POLL都需要遍历所有的IO询问状态,只不过:

2.1 弊端

  1. 每次重复传递fds
  2. 每次内核被调用之后,针对这次调用,触发一次遍历fds全量的复杂度

3. epoll

3.1 概述

epoll

现象调用过程

  1. 首先,执行无论BIO NOI SELECT poll都有的socket->bind->listen,如listen得到fd4->socket连接
  2. 调用epoll_create,创建fd6(叫epfd)->红黑树
  3. 调用epoll_ctl(fd6,add,fd4,accept),把fd4加入到fd6红黑树中
  4. 调用epoll_wait等待一个链表,这时链表没数据
  5. 当客户端通过网卡发送消息时,会在fd4的buffer中写数据,并且做一个延伸处理(原来的中断中加入延伸逻辑),将fd4在fd6中查找,并改变状态复制到链表中,这时链表含有fd4。
  6. 这时调用epoll_wait的话直接得到有状态的IO。

系统调用过程

  1. 调用socket()返回一个fd,例如fd1
  2. 调用bind(fd1,9090),将fd和端口绑定起来
  3. 调用listen(),监听端口
  4. 调用epoll_create(),产生一个红黑树,当然也会产生一个fd代表这个红黑树,例如fd2
  5. 调用epoll_ctl(fd2,add,fd1,accept),意思是在红黑树添加fd1,并且关注的是accept事件
  6. 调用epoll_wait(),它等待的是一个链表
  7. 当有一个连接过来,即是fd1的accept事件,就会将fd1转移到链表中,epoll_wait拿到了后accept产生一个fd3,调用epoll_ctl(fd2,add,fd3,recv)重新加入到红黑树
  8. 当这个链接发消息过来,就会将这个fd3移动到链表里
  9. 那么epoll_wait拿到的都是有状态的IO

优势:

  1. epoll直接调用epoll_ctl增量加入新的fd,解决重复传入fds问题,
  2. 在内核中做了将有状态的IO直接copy到链表,调用epoll_wait直接拿到有状态的IO,解决了遍历fds问题

3.2 举例

public class SocketMultiplexingSingleThreadv1 {

    private ServerSocketChannel server = null;
    private Selector selector = null;
    int port = 9090;

    public void initServer() {
        try {
            //这个server相当于listen状态的fd4
            server = ServerSocketChannel.open();
            //设置non-blocking
            server.configureBlocking(false);
            //绑定端口
            server.bind(new InetSocketAddress(port));
            /*
             * 创建多路复用器 优先使用epoll
             * select poll:创建一个数组用来存放fds
             * epoll:调用epoll_create==>fd3
             */
            selector = Selector.open();
            /*
             * select poll:把fd4放入数组
             * epoll:调用epoll_ctl(fd3,add,fd4,epoll_in) 将fd4放入 上面epoll_create得到的内存空间fd3(实际运行时在selector.select时才放进去,懒加载),注册accept事件
             */
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("服务器启动了。。。。。");
        try {
            while (true) {
                /*
                 * select poll:查看数组中有的fds
                 * epoll:查看fd3 红黑树中的fds
                 */
                Set<SelectionKey> keys = selector.keys();
                System.out.println(keys.size()+"   size");
                /*
                 * select poll:传入fds,查找有状态的IO
                 * epoll:epoll_wait
                 */
                while (selector.select(500) > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    //无论基于那种多路复用器,得到有状态的IO后,都要自行R/W
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        //是否需要accept。服务端未accept前,客户端建立连接并发消息,就会进到这里
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                            //是否可读。即是否已经分配进程处理socket连接
                        } else if (key.isReadable()) {
                            readHandler(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            //accept得到client
            SocketChannel client = ssc.accept();
            //设置client non-blocking 为了读不阻塞
            client.configureBlocking(false);
            //创建缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            /*
             *注册当前fd对应的read buffer
             */
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客户端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
        service.start();
    }
}

这个红黑树观察的文件描述符是有限的,属性max_user_watch

4. epoll单线程/多线程

场景:服务端接收到客户端数据,写回给客户端

单线程_多线程epoll

4.1 单线程

  1. 主线程通过selector查找有状态的IO,然后调用readHandler(),readHandler读取后直接write

  2. 主线程通过selector查找有状态的IO,然后调用readHandler(),readHandler里将当前fd.register selector OP_WRITE,那么再查找有状态的IO时,就会触发writeHandler(),再去write

4.2 多线程

  1. 主线程通过selector查找有状态的IO,然后new thread()去执行readHandler(),readHandler里将当前fd.register selector OP_WRITE,但这个操作需要时间,当抛出线程,立刻返回,再一次通过selector查找有状态的IO,仍然会有当前这个fd。所有会重复调起readHandler()
  2. 当1中注册了写事件,而写事件是看send-q有没空位,所以当主线程再次通过selector查找有状态的IO,会重复new thread()调起writeHandler

解决重复调用办法:在调用readHandler()和writeHandler()前加入key.cancle(epoll_ctl(del))

4.2.1 多线程弊端:

  1. 考虑资源利用,充分利用cpu核数。

  2. 考虑有一个fd执行耗时长,在一个线程里会阻塞后续的fd的处理

    但是会重复register和cancle系统调用。

4.3 解决

上一篇下一篇

猜你喜欢

热点阅读