【Netty】ByteBuf 和零拷贝机制

2019-07-07  本文已影响0人  佐蓝Gogoing

JDK 的 ByteBuffer 有很多问题:

  • 无法动态扩容
    长度固定,不能动态扩展和收缩,当数据大于 ByteBuffer 容量时,会发生索引越界异常
  • API 使用复杂
    读写的时候需要手动调用 flip() 和 rewind() 等方法,使用时需要非常谨慎的使用这些 api,否则很容易出现错误

Netty 有自己的 ByteBuf ,做了如下增强:

  • API 操作便捷性
  • 动态扩容
  • 多种 ByteBuf 实现
  • 高效的零拷贝机制

1. ByteBuf 操作

ByteBuf 有三个重要属性

capacity 容量
readerIndex 读取位置
writerIndex 写入位置

ByteBuf readerIndex 和 writerIndex 分为三块区域,readerIndex 左边的已经读取过了可以丢弃,readerIndex 到 writerIndex 就是可以继续读

操作举例:

  1. 创建一个 ByteBuf,大小为10
ByteBuf buf = Unpooled.buffer(10);
System.out.println("ByteBuf > " + "rind: " + buf.readerIndex() + ", widx: " + buf.writerIndex() + ", cap: " + buf.capacity());
System.out.println("ByteBuf 内容为 > " + Arrays.toString(buf.array()) + "\n");
ByteBuf > ridx: 0, widx: 0, cap: 10
ByteBuf 内容为 > [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  1. 写入一段内容 {1, 2, 3, 4, 5}
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("ByteBuf > " + "ridx: " + buf.readerIndex() + ", widx: " + buf.writerIndex() + ", cap: " + buf.capacity());
System.out.println("ByteBuf中的内容为 > " + Arrays.toString(buf.array()) + "\n");
ByteBuf > ridx: 0, widx: 5, cap: 10
ByteBuf中的内容为 > [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
  1. 读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("读取的bytes为>" + Arrays.toString(new byte[]{b1, b2}));
System.out.println("ByteBuf > " + "ridx: " + buf.readerIndex() + ", widx: " + buf.writerIndex() + ", cap: " + buf.capacity());
System.out.println("ByteBuf中的内容为 > " + Arrays.toString(buf.array()) + "\n");
读取的bytes为>[1, 2]
ByteBuf > =>ridx: 2, widx: 5, cap: 10
ByteBuf中的内容为 > [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
  1. 将读取的内容丢弃
buf.discardReadBytes();
        System.out.println("ByteBuf > >" + "ridx: " + buf.readerIndex() + ", widx: " + buf.writerIndex() + ", cap: " + buf.capacity());
        System.out.println("ByteBuf中的内容为 > " + Arrays.toString(buf.array()) + "\n");
ByteBuf > >ridx: 0, widx: 3, cap: 10
ByteBuf中的内容为 > [3, 4, 5, 4, 5, 0, 0, 0, 0, 0]

丢弃操作会把未读取部分整个往前挪,前面 3,4,5 是未读取的,继续写入的话会把后面的4,5覆盖

  1. 清空
buf.clear();
System.out.println("ByteBuf > >" + "ridx: " + buf.readerIndex() + ", widx: " + buf.writerIndex() + ", cap: " + buf.capacity());
System.out.println("ByteBuf中的内容为 > " + Arrays.toString(buf.array()) + "\n");
ByteBuf > >ridx: 0, widx: 0, cap: 10
ByteBuf中的内容为 > [3, 4, 5, 4, 5, 0, 0, 0, 0, 0]

清空时清空指针,并不会清空数据

  1. 再次写入一段内容,比第一段内容少
byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
System.out.println("写入的bytes为>" + Arrays.toString(bytes2));
System.out.println("ByteBuf > " + "ridx: " + buf.readerIndex() + ", widx: " + buf.writerIndex() + ", cap: " + buf.capacity());
System.out.println("ByteBuf中的内容为 > " + Arrays.toString(buf.array()) + "\n");
写入的bytes为>[1, 2, 3]
ByteBuf > ridx: 0, widx: 3, cap: 10
ByteBuf中的内容为 > [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]

这时写入会从第一个开始覆盖

  1. 将 ByteBuf 清零
buf.setZero(0, buf.capacity());
System.out.println("ByteBuf > " + "ridx: " + buf.readerIndex() + ", widx: " + buf.writerIndex() + ", cap: " + buf.capacity());
System.out.println("ByteBuf中的内容为 > " + Arrays.toString(buf.array()) + "\n");
ByteBuf > ridx: 0, widx: 3, cap: 10
ByteBuf中的内容为 > [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

会用 0 填充,但是指针都不变

  1. 写入一段超过容量的内容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
buf.writeBytes(bytes3);
System.out.println("写入的bytes为>" + Arrays.toString(bytes3));
System.out.println("ByteBuf > " + "ridx: " + buf.readerIndex() + ", widx: " + buf.writerIndex() + ", cap: " + buf.capacity());
System.out.println("ByteBuf中的内容为 > " + Arrays.toString(buf.array()) + "\n");
写入的bytes为>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
ByteBuf > ridx: 0, widx: 14, cap: 64
ByteBuf中的内容为 > [0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

前面 widx 为3,所以是从第三位开始写的,写入超出容量的内容时,ByteBuf 会自动扩容。

2. 动态扩容

capacity 默认值: 256,最大值:Integer.MAX_VALUE(2GB)。

write 方法调用时,通过 AbstractByteBuf.ensureWritable() 进行检查,如果剩余容量不够,通过 AbstractByteBufAllocator.calculateNewCapacity(capacity 最小要求, capacity 最大值) 计算容量。

计算 capacity 大小时有两套方案,
如果 capacity 没超过 4 兆,capacity 每次都会增加一倍,直到大小满足最小要求。
如果 capacity 超过 4 兆,新容量 = 容量最小要求 / 4兆 * 4兆 + 4兆

3. 选择合适的 ByteBuf 实现

ByteBuf 可以根据三个维度,划分为8中实现


在实际使用中都是通过 ByteBufAllocator 分配器进行申请,同时分配器具有内存管理的功能。

堆内和堆外的区别:

  1. 堆内的 ByteBuf 本质是一个数组,对数组的操作进行了封装,堆外的 ByteBuf 本质是 NIO 的 ByteBuffer
  2. 堆外没有实现 array() 方法,其他方法和堆内是一样的

访问方式的区别:

  1. unsafe 用到了 Unsafe 工具类,Unsafe 是 Java 保留的一个底层工具包,safe 则没有用到 Unsafe 工具类
  2. unsafe 意味着不安全的操作,但是更底层的操作会带来性能提升和特殊功能,Netty 中会尽力使用 unsafe。
  3. Java 语言很重要的特性是“一次编写导出运行”,所以它针对底层的内存或其他操作,做了很多封装。而 unsafe 提供了一系列操作底层的方法,可能会导致不兼容或者不可知的异常。

以上的区别再使用时是基本没有感知的,Netty 已经帮我们封装好了,但是 pool 和 unpool 是要我们注意的。

pool 和 unpool 的区别:

  1. unpool 每次申请缓冲区时会新建一个,并不会复用,使用 Unpooled 工具类可以创建 unpool 的缓冲区
  2. Netty 没有给出很便捷的 pool 类型的缓冲区的创建方法。使用 ChannelConfig.getAllocator() 时,获取到的分配器是默认支持内存复用的

// TODO 此处需要加上 PoolByteBuf 实现

4. 零拷贝机制

Netty 的零拷贝机制,是一种应用层的实现。

  1. 一般的数组合并,会创建一个大的数组,然后将需要合并的数组放进去。
    Netty 的 CompositeButyBuf 将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了各个 ByteBuf 之间的拷贝。


CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
  1. wrappedBuffer 方法将 byte[] 数组包装成 ByteBuf 对象


    image.png
 ByteBuf byteBuf = Unpooled.wrappedBuffer(new Byte[]{1, 2, 3, 4, 5});
  1. slice 方法将一个 ByteBuf 对象切分成多个 ByteBuf 对象
 ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
newBuffer.unwrap();
上一篇下一篇

猜你喜欢

热点阅读