linux的底层支持以及几种io对比

2019-05-12  本文已影响0人  无聊之园

1. nio的底层系统的支持

liunx的io模型:(只是大概一个感性的认识,liunx的网络原理不深究。)

堵塞io:数据准备好,并且复制到应用内存,才返回。期间一直堵塞。
非堵塞io:io命令并不堵塞,数据没准备好,则返回一个标识错误,轮询检查数据是否准备好并复制到应用内存。
i/o复用模型:一个select或poll轮询检查所有的socket io是否准备好数据,用一个select处理了所有socket io,liunx的epoll性能更高,一个select检查的socket io没有限制,基于事件驱动,而不是轮询扫描,数据准备好则回调方法。
信号驱动io模型:通过发送信号的方式,socket io发送信号,数据准备好发送信号回调。
异步io:当数据准备并且复制到应用内存之后,内核发送通知给应用可以进行io操作了。

nio使用的就是io多路复用原理
linux针对多路复用采用的select、poll等都存在缺陷,一个select轮询的socket数量有限制,效率低。之后采用epoll,epoll的优点有:轮询的socket数量没有限制,效率高:基于socket的callback回调,所以不会把性能浪费在非活跃socket上。epoll和内核mmap共享同一块内存区域,减少了一次复制过程。

nio的缺点:针对文件系统的处理方法能力有点不足。

2.bio

bio的缺点:

  1. 线程个数和客户连接数是1对1关系。(可以用线程池解决)
  2. read和write操作都是堵塞的,数据没准备好,则线程一直堵塞,浪费资源。

3. nio

new io或者no block io。
nio基于的主要的对象是:Buffer、Channel、Selector
Buffer:buffer可以开辟堆内存,堆内存就是普通的java堆,堆外内存则是直接在java堆之外,开辟和回收代价大,但是进行比如socket数据交换的时候少了一次java堆到操作系统的内存复制。
Channel:channel是一个双向操作的通道。
Select:多路复用select,select轮询注册到select上的channel,如果channel有状态变更,用户再去处理这个channel。
关键代码:

服务端:

public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
                        // select轮询捕捉servChannel的accept状态
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

public void run() {
        while (!stop) {
            try {
// select轮询捕捉状态变更,如果没有channel状态发生变更,则一直堵塞,直到传入的1000毫秒超时了,返回捕捉数0。
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
  // 把这个selectKey移除,不移除selectedKeys中永远存在这个selectkey
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }

    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // 处理新接入的请求消息
            if (key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // socketchannel也注册到select中去,select捕捉其read状态变更
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : "
                            + body);
                    String currentTime = "QUERY TIME ORDER"
                            .equalsIgnoreCase(body) ? new java.util.Date(
                            System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else
                    ; // 读到0字节,忽略
            }
        }
    }

    private void doWrite(SocketChannel channel, String response)
            throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }

客户端

public class TimeClientHandle implements Runnable {

    private String host;
    private int port;

    private Selector selector;
    private SocketChannel socketChannel;

    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

    }

    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // 判断是否连接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else
                    System.exit(1);// 连接失败,进程退出
            }
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Now is : " + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else
                    ; // 读到0字节,忽略
            }
        }

    }

    private void doConnect() throws IOException {
        // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    }

    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining())
            System.out.println("Send order 2 server succeed.");
    }

}

nio总结:代码其实和啰嗦繁琐,而且很容易出问题,比如例子中的nio会有粘包拆包现象,所以nio一般不直接使用。

4.Aio

nio 2引入的概念。真正的异步非堵塞io,基于事件驱动的,而不是nio一样轮询select。

服务端

public class AsyncTimeServerHandler implements Runnable {

    private int port;

    CountDownLatch latch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    public AsyncTimeServerHandler(int port) {
    this.port = port;
    try {
            // 这是nio包下的类
        asynchronousServerSocketChannel = AsynchronousServerSocketChannel
            .open();
        asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
        System.out.println("The time server is start in port : " + port);
    } catch (IOException e) {
        e.printStackTrace();
    }
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        // countDownlatch只是为了不让线程结束
    latch = new CountDownLatch(1);
    doAccept();
    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    }

    public void doAccept() {
        // 传入一个accept的处理类
    asynchronousServerSocketChannel.accept(this,
        new AcceptCompletionHandler());
    }

}
public class AcceptCompletionHandler implements
    CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
  // 数据准备好并且复制到应用内存之后,回调这个方法
    @Override
    public void completed(AsynchronousSocketChannel result,
        AsyncTimeServerHandler attachment) {
    attachment.asynchronousServerSocketChannel.accept(attachment, this);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    result.read(buffer, buffer, new ReadCompletionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
    exc.printStackTrace();
    attachment.latch.countDown();
    }

}

客户端

public class AsyncTimeClientHandler implements
    CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch latch;

    public AsyncTimeClientHandler(String host, int port) {
    this.host = host;
    this.port = port;
    try {
        client = AsynchronousSocketChannel.open();
    } catch (IOException e) {
        e.printStackTrace();
    }
    }

    @Override
    public void run() {

    latch = new CountDownLatch(1);
    client.connect(new InetSocketAddress(host, port), this, this);
    try {
        latch.await();
    } catch (InterruptedException e1) {
        e1.printStackTrace();
    }
    try {
        client.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    }

    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
    byte[] req = "QUERY TIME ORDER".getBytes();
    ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
    writeBuffer.put(req);
    writeBuffer.flip();
    client.write(writeBuffer, writeBuffer,
        new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
            if (buffer.hasRemaining()) {
                client.write(buffer, buffer, this);
            } else {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                client.read(
                    readBuffer,
                    readBuffer,
                    new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result,
                        ByteBuffer buffer) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer
                            .remaining()];
                        buffer.get(bytes);
                        String body;
                        try {
                        body = new String(bytes,
                            "UTF-8");
                        System.out.println("Now is : "
                            + body);
                        latch.countDown();
                        } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Throwable exc,
                        ByteBuffer attachment) {
                        try {
                        client.close();
                        latch.countDown();
                        } catch (IOException e) {
                        // ingnore on close
                        }
                    }
                    });
            }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                client.close();
                latch.countDown();
            } catch (IOException e) {
                // ingnore on close
            }
            }
        });
    }

    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
    exc.printStackTrace();
    try {
        client.close();
        latch.countDown();
    } catch (IOException e) {
        e.printStackTrace();
    }
    }

}

那么为什么netty不使用aio而使用nio呢?netty作者说,在unix系统上aio不会更快比nio,netty已经有一个稳定的nio的封装了。

Not faster than NIO (epoll) on unix systems (which is true)
There is no daragram suppport
Unnecessary threading model (too much abstraction without usage)

几种io对比:

image.png
上一篇下一篇

猜你喜欢

热点阅读