Java NIO——Scatter/Gather

2021-01-29  本文已影响0人  小波同学

前言

java NIO开始支持scatter/gather,scatter/gather用于描述从通道 Channel 中读取或者写入到Channel的操作。

分散(scatter):从Channel中读取是指在读操作时将读取的数据写入多个buffer中。因此,Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中。

聚集(gather):写入Channel是指在写操作时将多个buffer的数据写入同一个Channel,因此,Channel 将多个Buffer中的数据“聚集(gather)”后发送到Channel。

scatter / gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

Scattering Reads

Scattering Reads是指数据从一个channel读取到多个buffer中。如下图描述:

代码示例如下:

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
 
ByteBuffer[] bufferArray = {header, body};
 
channel.read(bufferArray);

注意buffer首先被插入到数组,然后再将数组作为channel.read() 的输入参数。read()方法按照buffer在数组中的顺序将从channel中读取的数据写入到buffer,当一个buffer被写满后,channel紧接着向另一个buffer中写。

Scattering Reads在移动下一个buffer前,必须填满当前的buffer,这也意味着它不适用于动态消息(消息大小不固定)。换句话说,如果存在消息头和消息体,消息头必须完成填充(例如 128byte),Scattering Reads才能正常工作。

Gathering Writes

Gathering Writes是指数据从多个buffer写入到同一个channel。如下图描述:

代码示例如下:

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
 
//write data into buffers
 
ByteBuffer[] bufferArray = {header, body};
 
channel.write(bufferArray);

buffers数组是write()方法的入参,write()方法会按照buffer在数组中的顺序,将数据写入到channel,注意只有position和limit之间的数据才会被写入。因此,如果一个buffer的容量为128byte,但是仅仅包含58byte的数据,那么这58byte的数据将被写入到channel中。因此与Scattering Reads相反,Gathering Writes能较好的处理动态消息。

完整代码示例如下:

/**
 *
 * Scattering:将数据写入到buffer时,可以采用buffer数组,依次写入  [分散]
 * Gathering: 从buffer读取数据时,可以采用buffer数组,依次读
 */
public class ScatteringAndGatheringTest {

    public static void main(String[] args) throws IOException {
        //使用 ServerSocketChannel 和 SocketChannel 网络
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(8000);

        //绑定端口到socket ,并启动
        serverSocketChannel.socket().bind(inetSocketAddress);

        //创建buffer数组
        ByteBuffer[] byteBuffers = new ByteBuffer[2];
        byteBuffers[0] = ByteBuffer.allocate(5);
        byteBuffers[1] = ByteBuffer.allocate(3);

        //等客户端连接(telnet)
        SocketChannel socketChannel = serverSocketChannel.accept();
        int messageLength = 8;   //假定从客户端接收8个字节

        //循环的读取
        while(true){
            int byteRead = 0;
            while (byteRead < messageLength ) {
                long read = socketChannel.read(byteBuffers);
                byteRead += read; //累计读取的字节数
                System.out.println("byteRead=" + byteRead);
                //看看当前的这个buffer的position 和 limit
                Arrays.asList(byteBuffers).stream().map(buffer -> "postion=" + buffer.position() + ", limit=" + buffer.limit()).forEach(System.out::println);

                //将所有的buffer进行flip
                Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip());

                //将数据读出显示到客户端
                long byteWirte = 0;
                while (byteWirte < messageLength) {
                    long write = socketChannel.write(byteBuffers);
                    byteWirte += write;
                }

                //将所有的buffer 进行clear
                Arrays.asList(byteBuffers).forEach(buffer-> {
                    buffer.clear();
                });

                System.out.println("byteRead:=" + byteRead + " byteWrite=" + byteWirte + ", messagelength" + messageLength);
            }
        }
    }
}

参考:
https://ifeve.com/java-nio-scattergather/

上一篇下一篇

猜你喜欢

热点阅读