Netty

Netty基础-NIO(一)

2022-06-04  本文已影响0人  石头耳东

零、本文纲要

一、NIO三大组件

  1. Channel
  2. Buffer
  3. Selector

二、Buffer

  1. 基础依赖
  2. ByteBuffer使用
  3. ByteBuffer结构
  4. ByteBuffer常见方法

三、Buffer使用模拟

  1. 情景模拟
  2. 模拟还原数据

一、NIO三大组件

NIO,non-blocking io 非阻塞 IO

Channel / Buffer / Selector

1. Channel

双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel;
与stream对比,stream是单向的,要么输入要么输出。

常见的Channel:
FileChannel / DatagramChannel / SocketChannel / ServerSocketChannel

2. Buffer

用来缓冲读写数据。

常见的Buffer:
ByteBuffer(MappedByteBuffer/DirectByteBuffer/HeapByteBuffer) /
ShortBuffer / IntBuffer / LongBuffer / FloatBuffer / DoubleBuffer / CharBuffer

3. Selector

① 多线程处理多个Socket连接

单个Thread对应单个Socket

内存占用高 / 线程上下文切换成本高 / 仅适合【连接数少】的场景

② 线程池处理多个Socket连接

单个Thread可以处理多个Socket

阻塞模式下线程只能处理一个Socket / 仅适合【短连接】的场景

③ selector配合线程处理多个Socket

selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。
适合连接数特别多,但流量低的场景(low traffic)。

调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理。

二、Buffer

0. 基础依赖

netty-all           4.1.39.Final
lombok              1.16.18
gson                2.8.5
guava               19.0
logback-classic     1.2.3
protobuf-java       3.11.3

1. ByteBuffer使用

a、向 buffer 写入数据,例如调用 channel.read(buffer)
b、调用 flip() 切换至读模式
c、从 buffer 读取数据,例如调用 buffer.get()
d、调用 clear() 或 compact() 切换至写模式
e、重复 1~4 步骤

try (RandomAccessFile file = new RandomAccessFile("src/main/resources/data.txt", "rw")) {
    FileChannel channel = file.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate(16);
    do {
        //1. 向 buffer 写入
        int len = channel.read(buffer);
        log.debug("读到的字节数:{}", len);
        if (len == -1) {
            break;
        }
        //2. 切换 buffer 读模式
        buffer.flip();
        while (buffer.hasRemaining()) {
            log.debug("{}", (char) buffer.get());
        }
        //3. 切换 buffer 写模式
        buffer.clear();
    } while (true);
} catch (IOException e) {
    log.info(e.getMessage());
}

2. ByteBuffer结构

// Creates a new buffer with the given mark, position, limit, capacity,
// backing array, and array offset
ByteBuffer(int mark, int pos, int lim, int cap,   // package-private
             byte[] hb, int offset)
{
    super(mark, pos, lim, cap);
    this.hb = hb;
    this.offset = offset;
}
mark            标记位
position        当前位
limit           界限位
capacity        容量
backing array   支撑数组
array offset    数组偏移

3. ByteBuffer常见方法

① allocate方法

用来给ByteBuffer分配空间

public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}

HeapByteBuffer(int cap, int lim) {...} //此时容量对应limit写上线
allocate方法.png

② channel#read方法 / buffer#put方法

向 buffer 写入数据

FileChannelImpl#read → IOUtil#readIntoNativeBuffer

public final ByteBuffer put(byte[] src) {
    return put(src, 0, src.length);
}

public ByteBuffer put(byte[] src, int offset, int length) {
    checkBounds(offset, length, src.length);
    if (length > remaining())
        throw new BufferOverflowException();
    int end = offset + length;
    for (int i = offset; i < end; i++)
        this.put(src[i]);
    return this;
}
channel的read方法.png

③ filp方法

切换至【读模式】,重置position、limit,可从buffer中读取数据

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}
filp方法.png

注意:
a、filp方法将 写limit定位到读limit,position重置为0,进而读取内容。
b、另外此时mark也会被清除。

④ hasRemaining方法

判断是否仍有剩余数据

public final boolean hasRemaining() {
    return position < limit;
}

⑤ buffer#get方法 / channel#write

HeapByteBuffer#get → Buffer#nextGetIndex
FileChannel#write(ByteBuffer[] srcs)

get方法注意点:
a、会使 position 读指针向后走;
b、可以使用 rewind 方法,使 position 重置,而limit不变,用来重复度;
c、调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针。

get方法.png
public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

注意:rewind方法会重置mark标记。

对比filp与rewind:后者 rewind 没有改变 limit指针 所指向的读上限。

⑥ clear方法

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}
clear方法.png

注意:clear方法并没有清除内容,而是改变了指针的指向,提升了效率。

⑦ compact方法

HeapByteBuffer#compact

compact方法.png

注意:compact方法允许我们未读完,而且可以在未读的后一个位置重新开始写。

⑧ mark方法 & reset方法

public final Buffer mark() {
    mark = position;
    return this;
}

public final Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}
mark方法 & reset方法.png

注意:mark方法与reset方法允许我们在任意mark位置重新读,rewind方法是从头开始。

⑨ 字符串 与 buffer 互相转换

ByteBuffer buffer = StandardCharsets.UTF_8.encode("StrToBuffer");
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer);

三、Buffer使用模拟

1. 情景模拟

网络通信:
a、客户端发送多条数据给服务端,数据间使用"\n"分隔;
b、数据接收时为了提升效率,数据会被服务端重新组合。

模拟数据为:
a、Hello, NIO.\n
b、I`m Stone.\n
c、How are you?\n

此时,服务器将数据重组,出现ByteBuffer (黏包,半包),如下:
a、Hello, NIO.\nI`m Stone.\nHo 【24bytes】
c、w are you?\n 【11bytes】

2. 模拟还原数据

省略了buffer动态扩容与收缩的业务逻辑,实际使用时,框架内一般会有代码实现。

@Slf4j
public class BufferDemo01 {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        //1. 接收到第一组数据
        //1.1 模拟接收到第一组数据
        buffer.put("Hello, NIO.\nI`m Stone.\nHo".getBytes(StandardCharsets.UTF_8));
        //1.2 处理第一组数据
        split(buffer);
        //2. 接收到第二组数据
        //2.1 模拟接收到第二组数据
        buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
        //2.2 处理第二组数据
        split(buffer);
    }

    public static void split(ByteBuffer buffer) {
        //1. 切换至 读模式
        buffer.flip();
        //2. 记录当前 读上限
        int originLimit = buffer.limit();
        //3. 处理当前数据
        for (int i = 0; i < originLimit; i++) {
            //3.1 如果读取到的数据是规定的 分隔符"\n"
            if (buffer.get(i) == '\n') {
                log.debug("当前分隔符所在的位置:{},buffer.position():{}。", i, buffer.position());
                ByteBuffer message = ByteBuffer.allocate(i + 1 - buffer.position());
                buffer.limit(i + 1); //3.2 调整当前读上限为 message 容量
                message.put(buffer); //3.3 从 buffer 读,向 message 写
                //debugAll(message); //该方法是打印当前 message 的方法
                buffer.limit(originLimit); //3.4 调整当前读上限为原先读originLimit
            }
        }
        //4. 如果当前数据有剩余,则将当前数据拼接至下组数据
        buffer.compact();
    }
}
输出内容.png

四、结尾

以上即为Netty基础-NIO(一)的全部内容,感谢阅读。

上一篇下一篇

猜你喜欢

热点阅读