面试

Netty理论一:Java IO与NIO

2018-11-29  本文已影响52人  雪飘千里

1、Linux IO模型

各种I/O模型的比较

image.png

2、 Java BIO(Blocking I/O)

image.png

Demo

package com.demo.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MultiThreadedEchoServer {
    private int port;

    public MultiThreadedEchoServer(int port) {
        this.port = port;
    }

    public void startServer() {
        ServerSocket echoServer = null;
        Executor executor = Executors.newFixedThreadPool(5);
        int i = 0;
        System.out.println("服务器在端口[" + this.port + "]等待客户请求......");
        try {
            echoServer = new ServerSocket(8080);
            while (true) {
                Socket clientRequest = echoServer.accept();
                executor.execute(new ThreadedServerHandler(clientRequest, i++));
            }
        } catch (IOException e) {
            System.out.println(e);
        }
    }

    public static void main(String[] args) throws IOException {
        new MultiThreadedEchoServer(8080).startServer();

    }
}
package com.demo.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

public class ThreadedServerHandler implements Runnable {
    Socket clientSocket = null;
    int clientNo = 0;

    ThreadedServerHandler(Socket socket, int i) {
        if (socket != null) {
            clientSocket = socket;
            clientNo = i;
            System.out.println("创建线程为[" + clientNo + "]号客户服务...");
        }
    }

    @Override
    public void run() {
        PrintStream os = null;
        BufferedReader in = null;
        try {
            in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            os = new PrintStream(clientSocket.getOutputStream());
            String inputLine;
            while ((inputLine = in.readLine()) != null) {

                // 输入'Quit'退出
                if (inputLine.equals("Quit")) {
                    System.out.println("关闭与客户端[" + clientNo + "]......" + clientNo);
                    os.close();
                    in.close();
                    clientSocket.close();
                    break;
                } else {
                    System.out.println("来自客户端[" + clientNo + "]的输入: [" + inputLine + "]!");
                    os.println("来自服务器端的响应:" + inputLine);
                }
            }
        } catch (IOException e) {
            System.out.println("Stream closed");
        }
    }
}
package com.demo.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class EchoClient {
    public static void main(String[] args) {

        Socket echoSocket = null;
        PrintWriter out = null;
        BufferedReader in = null;

        try {

            echoSocket = new Socket("127.0.0.1", 8080);
            out = new PrintWriter(echoSocket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(
                    echoSocket.getInputStream()));
            System.out.println("连接到服务器......");
            System.out.println("请输入消息[输入\"Quit\"]退出:");
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(
                    System.in));
            String userInput;

            while ((userInput = stdIn.readLine()) != null) {
                out.println(userInput);
                System.out.println(in.readLine());

                if (userInput.equals("Quit")) {
                    System.out.println("关闭客户端......");
                    out.close();
                    in.close();
                    stdIn.close();
                    echoSocket.close();
                    System.exit(1);
                }
                System.out.println("请输入消息[输入\"Quit\"]退出:");
            }
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host");
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for "
                    + "the connection ");
            System.exit(1);
        }
    }
}

3、Java NIO简介

1、变迁

2、 Java IO vs NIO

image.png

4、 Java NIO组件之Buffer

Java NIO Buffer:一个Buffer本质上是内存中的一块,可以将数据写入这块内存,从这块内存中获取数据
java.nio定义了以下几种Buffer的实现

image.png

1、Buffer中有三个主要概念:

image.png image.png

2、Direct ByteBuffer VS. non-direct ByteBuffer

image.png image.png image.png

3、Buffer API:

public final Buffer rewind() {
    position = 0;
    mark = -1;  //取消标记
    return this;
    }
public final Buffer clear(){
      position = 0; //重置当前读写位置
      limit = capacity; 
      mark = -1;  //取消标记
      return this;
}
package com.demo.nio.buffers;

import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;

public class BufferAccess {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        printBuffer(buffer);
        
        buffer.put((byte)'H').put((byte)'e').put((byte)'l').put((byte)'l').put((byte)'o');
        printBuffer(buffer);
        // 翻转缓冲区
        buffer.flip();
        printBuffer(buffer);

        //取buffer
        System.out.println("" + (char) buffer.get() + (char) buffer.get());
        printBuffer(buffer);

        buffer.mark();
        printBuffer(buffer);

        //读取两个元素后,恢复到之前mark的位置处
        System.out.println("" + (char) buffer.get() + (char) buffer.get());
        printBuffer(buffer);

        buffer.reset();
        //buffer.rewind();

        printBuffer(buffer);

        //压缩,将已读取了的数据丢弃,
        // 保留未读取的数据并将保留的数据重新填充到缓冲区的顶部,然后继续向缓冲区写入数据
        buffer.compact();
        printBuffer(buffer);

        buffer.clear();
        printBuffer(buffer);

    }
    
    private static void printBuffer(Buffer buffer) {
        System.out.println("[limit=" + buffer.limit() 
                +", position = " + buffer.position()
                +", capacity = " + buffer.capacity()
                +", array = " + new String((byte[]) buffer.array()) +"]");
    }
}

5、 Java NIO组件之 Channel

所有的 NIO 操作始于通道,通道是数据来源或数据写入的目的地
java.nio 包中主要实现的以下几个 Channel:
FileChannel:文件通道,用于文件的读和写;
DatagramChannel:用于 UDP 连接的接收和发送;
SocketChannel:TCP 连接通道,简单理解就是 TCP 客户端;
ServerSocketChannel:TCP 对应的服务端,用于监听某个端口进来的请求;

image.png image.png image.png

6、 Java NIO组件之 Selector

java.nio.channels.Selector,支持IO多路复用的抽象实体;
用于检查一个或多个NIO Channel的状态是否处于可读、可写;
这样的话,可以实现单线程管理多个channels,也就是可以管理多个网络链接,如下图;

image.png
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

//register的第二个参数,这个参数是一个“关注集合”,代表关注的channel状态,
//有四种基础类型可供监听, 用SelectionKey中的常量表示如下:
//SelectionKey.OP_CONNECT
//SelectionKey.OP_ACCEPT
//SelectionKey.OP_READ
//SelectionKey.OP_WRITE
int select()
int select(long timeout)
int selectNow()

迭代Selected Key集合并处理就绪channel
在调用select并返回了有channel就绪之后,可以通过选中的key集合来获取channel,这个操作通过调用selectedKeys()方法:

Set<SelectionKey> selectedKeys = selector.selectedKeys();  

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
image.png

7、Demo

package com.demo.nio.demo;

import java.io.IOException;

 
public class NIOEchoServer {

 
    public static void main(String[] args) throws IOException {
    int port = 8080;
    if (args != null && args.length > 0) {
        try {
        port = Integer.valueOf(args[0]);
        } catch (NumberFormatException e) {
        // 采用默认值
        }
    }
    EchoHandler timeServer = new EchoHandler(port);
    new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}
package com.demo.nio.demo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class EchoHandler implements Runnable {

    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;
    private int num = 0;

    public EchoHandler(int port) {
        try {
            //创建Selector
            selector = Selector.open();
            //创建ServerSocketChannel并注册到Selector上,关注Accept事件
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器在端口[" + port + "]等待客户请求......");
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                //更新所有就绪的SelectionKey的状态,并返回就绪的channel个数
                if(selector.select(1000)==0)
                                    continue;
                 //迭代Selected Key集合并处理就绪channel
                //事件处理循环:遍历就绪的channel,分别进行处理
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }

    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // 处理新接入的请求消息,Accept事件
            if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = ssc.accept(); // Non blocking, never null
                socketChannel.configureBlocking(false);
                SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_READ);

                sk.attach(num++);
            }
             //处理read事件
            if (key.isReadable()) {
                // 读取数据
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("来自客户端[" + key.attachment() + "]的输入: [" + body.trim() + "]!");

                    if (body.trim().equals("Quit")) {
                        System.out.println("关闭与客户端[" + key.attachment() + "]......");
                        key.cancel();
                        sc.close();
                    } else {
                        String response = "来自服务器端的响应:" + body;
                        doWrite(sc, response);
                    }

                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {

                }
            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}
package com.demo.nio.demo;

public class NIOEchoClient {

    public static void main(String[] args) {

        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
            }
        }
        new Thread(new NIOEchoClientHandler("127.0.0.1", port), "NIOEchoClient-001").start();
    }
}
package com.demo.nio.demo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NIOEchoClientHandler implements Runnable {

    private String host;
    private int port;

    private Selector selector;
    private SocketChannel socketChannel;
    
    private ExecutorService executorService;

    private volatile boolean stop;

    public NIOEchoClientHandler(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        this.executorService= Executors.newSingleThreadExecutor();

        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress(host, port));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        if(executorService != null) {
            executorService.shutdown();
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 判断是否连接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    System.out.println("连接到服务器......");
                    
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    System.out.println("请输入消息[输入\"Quit\"]退出:");

                    executorService.submit(() -> {
                        while(true) {
                            try {
                                buffer.clear();
                                InputStreamReader input = new InputStreamReader(System.in);
                                BufferedReader br = new BufferedReader(input);
                                
                                String msg = br.readLine();
                                
                                if (msg.equals("Quit")) {
                                    System.out.println("关闭客户端......");
                                    key.cancel();
                                    sc.close();
                                    this.stop = true;
                                    break;
                                }
                                
                                buffer.put(msg.getBytes());
                                buffer.flip();
                                
                                sc.write(buffer);
                                
                                System.out.println("请输入消息[输入\"Quit\"]退出:");

                            } catch (Exception ex) {
                                ex.printStackTrace();
                            }
                        }                       
                    });
                    sc.register(selector, SelectionKey.OP_READ);
                } else {
                    System.exit(1); // 连接失败,进程退出
                }
            }
            
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println(body);
                    
                    if(body.equals("Quit"))
                    {
                        this.stop = true;
                    }
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else
                    ; // 读到0字节,忽略
            }
            
            if(key.isWritable()){
                 System.out.println("The key is writable");
            }
        }
    }
 

    private void doWrite(SocketChannel sc) throws IOException {
/*      System.out.println("请输入消息[输入\"Quit\"]退出:");
        BufferedReader stdIn = new BufferedReader(new InputStreamReader(
                System.in));
        String userInput;

        while ((userInput = stdIn.readLine()) != null) {
            out.println(userInput);
            System.out.println(in.readLine());*/
            byte[] req = "QUERY TIME ORDER".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            writeBuffer.put(req);
            writeBuffer.flip();
            sc.write(writeBuffer);
            if (!writeBuffer.hasRemaining())
                System.out.println("Send order 2 server succeed.");

/*          
            if (userInput.equals("Quit")) {
                System.out.println("Closing client");
                out.close();
                in.close();
                stdIn.close();
                echoSocket.close();
                System.exit(1);
            }
            System.out.println("请输入消息[输入\"Quit\"]退出:");

        }*/
        
    }
}

8、NIO错误认识

1、使用NIO = 高性能
其实并不一定,在一些场景下,使用NIO并不一定更快,比如

2、NIO完全屏蔽了平台差异
NO,NIO仍然是基于各个OS平台的IO系统实现的,差异仍然存在

3、使用NIO做网络编程很容易

上一篇下一篇

猜你喜欢

热点阅读