BIO、NIO、AIO笔记

2020-03-26  本文已影响0人  EmonH

同步:任务一的完成需要依赖任务二,只有等待任务二完成,任务一才算完成。
异步:任务一会通知任务二完成什么任务,但是两个任务是互不等待,都会进行。任务二完成之后会告诉任务一。
阻塞:CPU停下来等待一个慢的操作完成才继续后面的工作。
非阻塞:CPU遇到这个慢的操作会先去执行其他的命令,等慢的动作完成之后在处理慢操作对应的命令。
接下来我们说说同步阻塞,同步非阻塞和异步非阻塞
之前看过一位大牛的博客,他举了个例子来解释三个概念,我觉得收益匪浅。小时候妈妈让去烧水,然后自己拿着水壶去了,在烧水的过程中一直等水烧开,这个就是同步阻塞。后来发现烧水需要很长时间,便在烧水的过程中去干别的事,时不时的来看看水是不是烧开了,这个模型就是同步非阻塞。再后来水壶有了烧开水之后发声的功能,那么烧水的时候,我可以不用时不时的去查看,只要听到声音了就知道水烧开了,这个模型就是异步非阻塞。接下来我们用代码看看下三种模型的具体实现。

BIO:同步阻塞

数据的读取写入必须阻塞在一个线程内等待其完成,在java中这样的模型简单容易理解,每次来一个请求,服务器都会开启一个线程去处理,当在连接数小于1000时,可以让每一个连接专注于自己的 I/O,不用过多考虑系统的过载、限流等问题。在搭配线程池的使用,可以很好的解决服务端连接异常的问题。但是当连接数达到万级别之后,线程之间切换带来请求处理慢的问题逐渐体现。
服务端

public class BioServer {
    final static int PROT = 7788;
    public static void main(String[] args) throws IOException {
        ServerSocket server = null;
        server = new ServerSocket(PROT);
        System.out.println(" server start .. ");
        while(true) {
            //进行阻塞
            socket = server.accept();
            //新建一个线程执行客户端的任务
            HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
            executorPool.execute(new ServerHandler(socket));
        }
    }
}
class HandlerExecutorPool {
    private ExecutorService executor;
    public HandlerExecutorPool(int maxPoolSize, int queueSize){
        this.executor = new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors(),
                maxPoolSize,
                120L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
    }

    public void execute(Runnable task){
        this.executor.execute(task);
    }
}
class ServerHandler implements Runnable {
    private Socket socket;

    public ServerHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null) break;
                System.out.println("Server :" + body);
                out.println("服务器端回送响的应数据.");
            }
        }catch (Exception e){

        }
    }
}

客户端

public class Client implements Runnable {
    final static String ADDRESS = "127.0.0.1";
    final static int PORT = 8088;
    public static void main(String[] args) throws IOException {
        new Thread(new Client()).start();
    }

    @Override
    public void run() {
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(ADDRESS, PORT);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            //向服务器端发送数据
            while (true) {
                out.println("接收到客户端的请求数据...");
                out.println("接收到客户端的请求数据1111...");
                String response = in.readLine();
                System.out.println("Client: " + response);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

NIO:同步非阻塞

jdk1.7以后引入了NIO的变成模式。首先有三个概念需要了解。

buffer缓存区:NIO是将所有数据都用到缓冲区数组中,

InetSocketAddress address = new InetSocketAddress("127.0.0.1", 7788);//创建连接的地址
        SocketChannel sc = null;//声明连接通道
        ByteBuffer buf = ByteBuffer.allocate(1024);//建立缓冲区
            sc = SocketChannel.open();//打开通道
            sc.connect(address);//进行连接
            while(true){
                //定义一个字节数组,然后使用系统录入功能:
                byte[] bytes = new byte[1024];
                System.in.read(bytes);
                buf.put(bytes);//把数据放到缓冲区中
                buf.flip();//对缓冲区进行复位
                sc.write(buf);//写出数据
                buf.clear();//清空缓冲区数据
            }
            ···

Channel 通道

NIO支持网络数据从Channel中读取,Channel是区别与传统的输入输出流的,传统输入输出流只支持单向数据流动,而Channel同时支持读取和写入,有多种状态位可以被识别。

Selector 多路复用选择器

NIO模型中一个连接就是一个Channel,所有的Channel都注册在Selector 中,Selector多路复用器选择器轮询查看Channel的状态位,当Channel发生读写操作时。便处于就绪状态,selector多路选择复用器会将所有处于就绪状态的Channel轮询出来,以继续后面的io操作,一个Selector可以负责上万级别的Channel,没有上限,这也是JDK使用epoll代替了传统的selector实现。
服务端代码

public class NioServer implements Runnable {
    private Selector seletor;
    //2 建立缓冲区
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    //3
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);

    public NioServer(int port) {
        try {
            //1 打开多路复用器
            this.seletor = Selector.open();
            //2 打开服务器通道
            ServerSocketChannel ssc = ServerSocketChannel.open();
            //3 设置服务器通道为非阻塞模式
            ssc.configureBlocking(false);
            //4 绑定地址
            ssc.bind(new InetSocketAddress(port));
            //5 把服务器通道注册到多路复用器上,并且监听阻塞事件
            ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
            System.out.println("Server start, port :" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                //1 必须要让多路复用器开始监听
                this.seletor.select();
                //2 返回多路复用器已经选择的结果集
                Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                //3 进行遍历
                while (keys.hasNext()) {
                    //4 获取一个选择的元素
                    SelectionKey key = keys.next();
                    //5 直接从容器中移除就可以了
                    keys.remove();
                    //6 如果是有效的
                    if (key.isValid()) {
                        //7 如果为阻塞状态
                        if (key.isAcceptable()) {
                            this.accept(key);
                        }
                        //8 如果为可读状态
                        if (key.isReadable()) {
                            this.read(key);
                        }
                        //9 写数据
                        if (key.isWritable()) {
                            this.write(key); //ssc
                        }
                    }

                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private void write(SelectionKey key) throws ClosedChannelException {
        ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
        ssc.register(this.seletor, SelectionKey.OP_WRITE);
    }

    private void read(SelectionKey key) {
        try {
            //1 清空缓冲区旧的数据
            this.readBuf.clear();
            //2 获取之前注册的socket通道对象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 读取数据
            int count = sc.read(this.readBuf);
            //4 如果没有数据
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
            this.readBuf.flip();
            //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
            byte[] bytes = new byte[this.readBuf.remaining()];
            //7 接收缓冲区数据
            this.readBuf.get(bytes);
            //8 打印结果
            String body = new String(bytes).trim();
            System.out.println("Server : " + body);

            // 9..可以写回给客户端数据

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private void accept(SelectionKey key) {
        try {
            //1 获取服务通道
            ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
            //2 执行阻塞方法
            SocketChannel sc = ssc.accept();
            //3 设置阻塞模式
            sc.configureBlocking(false);
            //4 注册到多路复用器上,并设置读取标识
            sc.register(this.seletor, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        new Thread(new NioServer(8088)).start();;
    }
}

客户端

class NioClient{
    private static String DEFAULT_HOST = "127.0.0.1";
    private static int DEFAULT_PORT = 8088;
    private static ClientHandle clientHandle;
    public static void start(){
        start(DEFAULT_HOST,DEFAULT_PORT);
    }
    public static synchronized void start(String ip,int port){
        if(clientHandle!=null)
            clientHandle.stop();
        clientHandle = new ClientHandle(ip,port);
        new Thread(clientHandle,"Server").start();
    }
    //向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception{
        if(msg.equals("q")) return false;
        clientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args){
        start();
    }
}
class ClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean started;

    public ClientHandle(String ip,int port) {
        this.host = ip;
        this.port = port;
        try{
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            socketChannel = SocketChannel.open();
            //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
            socketChannel.configureBlocking(false);//开启非阻塞模式
            started = true;
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
    public void stop(){
        started = false;
    }
    @Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
        //循环遍历selector
        while(started){
            try{
                //无论是否有读写事件发生,selector每隔1s被唤醒一次
                selector.select(1000);
                //阻塞,只有当至少一个注册的事件发生的时候才会继续.
//              selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector关闭后会自动释放里面管理的资源
        if(selector != null)
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
    }
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                if(sc.finishConnect());
                else System.exit(1);
            }
            //读消息
            if(key.isReadable()){
                //创建ByteBuffer,并开辟一个1M的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                    buffer.flip();
                    //根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客户端收到消息:" + result);
                }
                //没有读取到字节 忽略
//              else if(readBytes==0);
                //链路已经关闭,释放资源
                else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }
        }
    }
    //异步发送消息
    private void doWrite(SocketChannel channel,String request) throws IOException{
        //将消息编码为字节数组
        byte[] bytes = request.getBytes();
        //根据数组容量创建ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //将字节数组复制到缓冲区
        writeBuffer.put(bytes);
        //flip操作
        writeBuffer.flip();
        //发送缓冲区的字节数组
        channel.write(writeBuffer);
        //****此处不含处理“写半包”的代码
    }
    private void doConnect() throws IOException{
        if(socketChannel.connect(new InetSocketAddress(host,port)));
        else socketChannel.register(selector, SelectionKey.OP_CONNECT);
    }
    public void sendMsg(String msg) throws Exception{
        socketChannel.register(selector, SelectionKey.OP_READ);
        doWrite(socketChannel, msg);
    }
}

AIO:异步非阻塞

AIO基于事件和回调机制,不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。
服务端

public class AioServer {
    public static void main(String[] args) {
        // AIO线程复用版
        Thread sThread = new Thread(new Runnable() {
            @Override
            public void run() {
                AsynchronousChannelGroup group = null;
                try {
                    group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
                    AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(InetAddress.getLocalHost(), 8088));
                    server.accept(null, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
                        @Override
                        public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
                            server.accept(null, this); // 接收下一个请求
                            try {
                                Future<Integer> f = result.write(Charset.defaultCharset().encode("你好,世界"));
                                f.get();
                                System.out.println("服务端发送时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                                result.close();
                            } catch (InterruptedException | ExecutionException | IOException e) {
                                e.printStackTrace();
                            }
                        }

                        @Override
                        public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
                        }
                    });
                    group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        sThread.start();
    }
}

客户端

class AioClient {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        // Socket 客户端
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        Future<Void> future = client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8088));
        future.get();
        ByteBuffer buffer = ByteBuffer.allocate(100);
        client.read(buffer, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) {
                System.out.println("客户端打印:" + new String(buffer.array()));
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread.sleep(10 * 1000);
    }
}
上一篇下一篇

猜你喜欢

热点阅读