NIO入门(Buffer & Channel & Selecto

2019-12-08  本文已影响0人  宫新程

个人学习笔记,源码链接:https://github.com/GongXincheng/gxc-nio-netty

1:Buffer 缓冲区

1.1:Buffer 中的属性

规则:mark <= position <= limit <= capacity

capacity:容量,表示缓冲区中最大存储数据的容量,一旦声明无法更改。

limit:界限,表示缓冲区中可以操作数据的大小(limit后的数据不能读写)。

position:位置,表示缓存区中正在操作数据的位置。

mark:标记,表示当前position的位置,可以通过reset()恢复到 mark 位置

1.2:ByteBuffer中常用方法

/**
* 分配新的字节缓冲区
*/
ByteBuffer allocate(int cap) {
    capacity = cap; 
    limit = cap;
    position = 0;
    mark = -1;
} 
                
/**
* 将写模式转换为读模式
*/
Buffer flip() {
    limit = position; 
    position = 0;
    mark = -1;
}
                
/**
* 可重复读数据
*/
Buffer rewind() {
    position = 0;
    mark = -1;
}

/**
* 清空缓冲区,但是缓冲区中的数据都还在,处于"被遗忘"状态
*/
Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
}
               
/**
* 标记position的位置
*/
Buffer mark() {
    mark = position;
}
                
/**
* position恢复到mark的位置
*/
Buffer reset() {
    position = mark
}

/**
* 缓冲区中时候还有可读数据
*/
boolean hasRemaining() {
    return position < limit
}

/**
* 可读数据的个数
*/
int remaining() {
    return limit - position
}

1.3:直接缓冲区和非直接缓冲区

非直接缓冲区:通过allocate()方法分配缓冲区,将缓冲区建立在JVM的内存中

直接缓冲区:通过allocateDirect()方法分配直接缓冲区,将缓冲区建立在物理内存中

直接缓冲区
非直接缓冲区

2:Channel 通道

2.1:Channel简介

    用于 源节点 与 目标节点 的连接,在 Java NIO 中负责缓冲区中的数据传输。
    Channel 本身不存储数据,因此需要配合 Buffer 进行传输
Channel

2.2:Channel 的主要实现类

java.nio.channels.Channel 接口
    |-- FileChanel
    |-- SelectableChannel
        |-- SocketChanel
        |-- ServerSockerChanel
        |-- DatagramChanel

2.3:获取 Channel 的方式

1:Java 针对支持通道的类提供了 getChannel() 方法
eg: 
    本地IO:
        FileInputStream / FileOutputStream
        RandomAccessFile
    网络IO:
        Socket
        ServerSocket
        DatagramSocket
        
2:在 JDK 1.7 中的 NIO.2 针对各个通道提供了静态方法 open()

3:在 JDK 1.7 中的 NIO.2 的 Files 工具类的 newByteChannel()

2.4:利用通道完成文件复制(非直接缓冲区)

@Test
public void testUseChannelToCopyFile() throws Exception {
    FileInputStream fis = new FileInputStream("/Users/gxc/tmp/sl.mp4");
    FileOutputStream fos = new FileOutputStream("/Users/gxc/tmp/sl2.mp4");

    // 1:获取通道
    FileChannel inChannel = fis.getChannel();
    FileChannel outChannel = fos.getChannel();

    // 2:分配指定大小的缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);

    // 3:将通道中的数据存入到缓冲区中
    while(inChannel.read(buf) != -1) {
        // 将 buffer 的写模式切换成读模式
        buf.flip();
        // 4:将缓冲区中的数据写入到通道中
        outChannel.write(buf);
        // 清除缓冲区
        buf.clear();
    }

    // 5:关闭资源
    outChannel.close();
    inChannel.close();
    fos.close();
    fis.close();
}

2.5:使用直接缓冲区完成文件的复制,直接缓冲区(内存映射文件)

@Test
public void testUseChannelToCopyFile2() throws Exception {
    FileChannel inChannel = FileChannel.open(
            Paths.get("/Users/gxc/tmp/sl.mp4"),
            StandardOpenOption.READ);

    FileChannel outChannel = FileChannel.open(
            Paths.get("/Users/gxc/tmp/sl3.mp4"),
            StandardOpenOption.READ,
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE_NEW);

    // 内存映射文件
    MappedByteBuffer inMappedBuf = inChannel.map(
            FileChannel.MapMode.READ_ONLY, 0,
            inChannel.size());

    MappedByteBuffer outMappedBuf = outChannel.map(
            FileChannel.MapMode.READ_WRITE, 0,
            inChannel.size());

    // 直接对缓冲区 进行数据的读写操作
    byte[] dst = new byte[inMappedBuf.limit()];
    inMappedBuf.get(dst);
    outMappedBuf.put(dst);

    inChannel.close();
    outChannel.close();
}

2.6:通道之间的数据传输(直接缓冲区)

@Test
public void testUseChannelToCopyFile3() throws Exception {
    FileChannel inChannel = FileChannel.open(
            Paths.get("/Users/gxc/tmp/sl.mp4"),
            StandardOpenOption.READ);

    FileChannel outChannel = FileChannel.open(
            Paths.get("/Users/gxc/tmp/sl4.mp4"),
            StandardOpenOption.READ,
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE);

    //inChannel.transferTo(0, inChannel.size(), outChannel);
    outChannel.transferFrom(inChannel, 0, inChannel.size());

    inChannel.close();
    outChannel.close();
}

2.7:分散(Scatter)与聚集(Gather)

分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中

聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中
@Test
public void testUseChannelToCopyFile4() throws Exception {
    RandomAccessFile raf = new RandomAccessFile("1.txt", "rw");

    // 1:获取通道
    FileChannel channel = raf.getChannel();

    // 2:分配指定大小的缓冲区
    ByteBuffer buf1 = ByteBuffer.allocate(100);
    ByteBuffer buf2 = ByteBuffer.allocate(1024);

    // 3:分散读取
    ByteBuffer[] bufs = {buf1, buf2};
    channel.read(bufs);

    for (ByteBuffer buf : bufs) {
        buf.flip();
    }
    System.out.println(
    new String(bufs[0].array(), 0, bufs[0].limit()));
    System.out.println("----------------------");
    System.out.println(
    new String(bufs[1].array(), 0, bufs[1].limit()));

    // 4:聚集写入
    RandomAccessFile raf2 = new RandomAccessFile("1-1.txt", "rw");
    FileChannel channel2 = raf2.getChannel();

    channel2.write(bufs);
}

2.8:字符集 Charset

编码:字符串 -> 字节数组

解码:字节数组 -> 字符串
@Test
public void test() throws CharacterCodingException {
    Charset cs1 = StandardCharsets.UTF_8;

    // 获取编码器和解码器
    CharsetEncoder ce = cs1.newEncoder();
    CharsetDecoder cd = cs1.newDecoder();

    CharBuffer cb = CharBuffer.allocate(1024);
    cb.put("GongXincheng测试");
    cb.flip();

    // 编码
    ByteBuffer byteBuffer = ce.encode(cb);
    for (int i = 0; i < byteBuffer.limit(); i++) {
        System.out.println(byteBuffer.get());
    }

    byteBuffer.flip();

    // 解码
    System.out.println("------------");
    CharBuffer charBuffer = cd.decode(byteBuffer);
    System.out.println(charBuffer.toString());
}

3:Selector 选择器

是 SelectableChannel 的多路复用器。用于监控Selectable的 IO 状况

java.nio.channels.Channel 接口
    |-- FileChanel
    |-- SelectableChannel
        |-- SocketChanel        
        |-- ServerSockerChanel
        |-- DatagramChanel

        |-- Pipe.SinkChannel
        |-- Pipe.SourceChannel

3.1:阻塞式NIO网络通讯(传输图片)

/**
* 客户端
*   1:先创建Socket通道,并设置ip和端口号
*   2:分配指定大小的缓冲区
*   3:创建本地文件的通道,循环读取数据到缓冲区中
*   4:将缓冲区的数据写到Socket通道中去
*   5:关闭资源
*/
@Test
public void client() throws Exception {
    // 1:获取通道
    SocketChannel socketChannel = SocketChannel.open(
            new InetSocketAddress("127.0.0.1", 10000));

    // 2:分配指定大小的缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    // 3:读取本地文件, 并发送到服务端
    FileChannel inChannel = FileChannel.open(
        Paths.get("1.png"), StandardOpenOption.READ);
    while (inChannel.read(buffer) != -1) {
        // 切换成读数据模式
        buffer.flip();
        socketChannel.write(buffer);
        buffer.clear();
    }

    // 关闭通道
    inChannel.close();
    socketChannel.close();
}

/**
 * 服务端
 *  1:获取服务端Socket通道,并绑定ip
 *  2:获取客户端连接的通道
 *  3:创建文件通道,并将文件保存到本地
 *  4:关闭资源
 */
@Test
public void server() throws Exception {
    // 1:获取服务端Socket通道
    ServerSocketChannel ssChannel = ServerSocketChannel.open();
    // 2:服务端绑定端口号
    ssChannel.bind(new InetSocketAddress(10000));

    // 3:获取客户端连接的通道
    SocketChannel socketChannel = ssChannel.accept();

    // 4:接收客户端的数据,并保存到本地
    FileChannel fileChannel = FileChannel.open(
            Paths.get("2.png"),
            StandardOpenOption.WRITE,
            StandardOpenOption.CREATE);
            
    // 分配指定大小的缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    while (socketChannel.read(buffer) != -1) {
        buffer.flip();
        fileChannel.write(buffer);
        buffer.clear();
    }

    fileChannel.close();
    socketChannel.close();
    ssChannel.close();
}

3.2:阻塞式NIO网络通讯(服务端相应客户端内容)

/**
 * 客户端
 */
@Test
public void client() throws Exception {
    SocketChannel socketChannel = SocketChannel.open(
            new InetSocketAddress("127.0.0.1", 10000));
    FileChannel fileChannel = FileChannel.open(
            Paths.get("1.png"), StandardOpenOption.READ);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    while (fileChannel.read(buffer) != -1) {
        buffer.flip();
        socketChannel.write(buffer);
        buffer.clear();
    }

    // !告诉服务端,客户端已经发完数据,否则服务端会一直处于阻塞状态!
    socketChannel.shutdownOutput();

    // 接收服务端反馈
    while (socketChannel.read(buffer) != -1) {
        buffer.flip();
        System.out.println(
            new String(buffer.array(), 0, buffer.limit()));
        buffer.clear();
    }

    fileChannel.close();
    socketChannel.close();
}
/**
 * 服务端
 */
@Test
public void server() throws IOException {
    ServerSocketChannel serverSocketChannel =
        ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(10000));

    SocketChannel socketChannel = serverSocketChannel.accept();
    FileChannel fileChannel = FileChannel.open(
        Paths.get("3.png"), 
        StandardOpenOption.WRITE, 
        StandardOpenOption.CREATE);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    while (socketChannel.read(buffer) != -1) {
        buffer.flip();
        fileChannel.write(buffer);
        buffer.clear();
    }

    // 发送反馈给客户端
    buffer.put("服务端接收客户端数据成功".getBytes());
    buffer.flip();
    socketChannel.write(buffer);

    fileChannel.close();
    socketChannel.close();
    serverSocketChannel.close();
}

3.3:非阻塞式NIO网络通讯

/**
* 客户端
*/
@Test
public void client() throws Exception {
    // 1:获取Socket通道
    SocketChannel sChannel = SocketChannel.open(
            new InetSocketAddress("127.0.0.1", 10000));

    // 2:切换成非阻塞模式
    sChannel.configureBlocking(false);

    // 3:分配指定大小的缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    // 4:发送数据给服务端
    buffer.put((LocalDateTime.now().toString() 
        + "\n" + "Hello World").getBytes());
    buffer.flip();
    sChannel.write(buffer);
    buffer.clear();
    
    // 5:关闭通道
    sChannel.close();
}
/**
 * 服务端
 */
@Test
public void server() throws Exception {
    // 1:获取通道
    ServerSocketChannel serverSocketChannel = 
        ServerSocketChannel.open();
    // 2:切换到非阻塞模式
    serverSocketChannel.configureBlocking(false);
    // 3:绑定连接
    serverSocketChannel.bind(new InetSocketAddress(10000));
    // 4:获取选择器
    Selector selector = Selector.open();
    // 5:将通道注册到选择器上,并且指定"监听接收事件"
    serverSocketChannel.register(selector,
        SelectionKey.OP_ACCEPT);
    // 6:轮询式的获取选择器上已经"准备就绪"的事件
    while (selector.select() > 0) {
        // 7:获取当前selector中所有注册的"选择键(已就绪的监听事件)"
        Iterator<SelectionKey> iterator =
            selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
            // 8:获取准备"就绪"的事件
            SelectionKey selectionKey = iterator.next();
            // 9:判断具体是哪种事件类型
            if(selectionKey.isAcceptable()) {
                // 10:若接收事件"就绪",获取客户端连接
                SocketChannel sChannel =
                    serverSocketChannel.accept();
                // 11:将客户端通道切换成非阻塞模式
                sChannel.configureBlocking(false);
                // 12:将该 客户端通道 注册到Selector上,监听"读就绪"状态
                sChannel.register(selector, 
                    SelectionKey.OP_READ);
            } else if (selectionKey.isReadable()) {
                // 13:获取当前Selector上"读就绪"的通道
                SocketChannel socketChannel = 
                    (SocketChannel) selectionKey.channel();
                // 14:读取数据
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                while (socketChannel.read(buffer) != -1) {
                    buffer.flip();
                    System.out.println(
                        new String(buffer.array(), 0,
                            buffer.limit()));
                    buffer.clear();
                }
            }
            // 15:取消选择键 SelectionKey
            iterator.remove();
        }
    }
}
非阻塞服务端代码

3.4:非阻塞式NIO网络通讯(UDP)

/**
 * 发送端.
 */
@Test
public void send() throws Exception {
    DatagramChannel datagramChannel = DatagramChannel.open();
    datagramChannel.configureBlocking(false);

    ByteBuffer buffer = ByteBuffer.allocate(1024);
    Scanner sc = new Scanner(System.in);
    while (sc.hasNext()) {
        String message = sc.next();
        buffer.put(message.getBytes());

        buffer.flip();
        datagramChannel.send(buffer,
                new InetSocketAddress("127.0.0.1", 10001));
        buffer.clear();
    }

    sc.close();
    datagramChannel.close();
}
/**
 * 接收端
 */
@Test
public void receive() throws Exception {
    DatagramChannel datagramChannel = DatagramChannel.open();
    datagramChannel.configureBlocking(false);
    datagramChannel.bind(new InetSocketAddress(10001));
    
    Selector selector = Selector.open();
    datagramChannel.register(selector, SelectionKey.OP_READ);

    while (selector.select() > 0) {
        Set<SelectionKey> skSet = selector.selectedKeys();
        Iterator<SelectionKey> iterator = skSet.iterator();
        while (iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            if (sk.isReadable()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                datagramChannel.receive(buffer);
                buffer.flip();
                System.out.println(new String(
                        buffer.array(), 0, buffer.limit()));
                buffer.clear();
            }
            iterator.remove();
        }
    }
}

3.5:Pipe管道

@Test
public void testPipe() throws Exception {
    // 1:获取管道
    Pipe pipe = Pipe.open();

    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.put("通过单向管道发送数据".getBytes());
    buffer.flip();

    // 2:将缓冲区的数据接入管道
    Pipe.SinkChannel sinkChannel = pipe.sink();
    sinkChannel.write(buffer);

    // 3:读取缓冲区中的数据
    buffer.flip();
    Pipe.SourceChannel sourceChannel = pipe.source();
    sourceChannel.read(buffer);
    System.out.println(
        new String(buffer.array(), 0, buffer.limit()));
}
上一篇下一篇

猜你喜欢

热点阅读