netty(十一)初识Netty - ByteBuf 创建与读写
本章节主要学习一下netty当中的ByteBuf,ByteBuf是对字节数据的封装。
接下来主要学习Bytebuf的使用以及其细节。
一、ByteBuf使用
1.1 创建ByteBuf
通常可以使用如下方式创建:
public static void main(String[] args) {
//创建一个容量为10的ByteBuf
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
System.out.println(buf);
}
查看结果:
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 10)
由结果我们看到初始化了一个池化的直接内存buf,也就是说如果我们使用默认类型创建,会返回一个直接内存。
ridx表示read index,即读取位置是0;
widx表示write index,即写入位置是0;
cap表示容量,是10;
1.1.1 直接内存 和 堆内存
- 使用如下方式创建一个堆内存:
//创建一个堆内存buf
ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
关于堆内存是通过jvm进行回收的,所以我们不需要过多的干预。
- 使用如下方式创建一个直接内存
//创建一个直接内存buf
ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接内存的特性:
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
关于Netty中直接内存的介绍,在后面涉及到再继续介绍,此处简单带过。
1.1.2 池化 和 非池化
无论在任何使用池的地方,例如数据库连接池,线程池等等。
池化最大的意义莫过于可以重用资源。
在Netty当中,引入池化机制,使得我们可以重用ByteBuf。
那么池化对我们使用ByteBuf提供了哪些优点?
- 使我们不必在每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵;即使是使用堆内存,也会增加 GC 压力。
- 可以重用池中 ByteBuf 实例,并且采用了与 jemalloc(http://jemalloc.net/) 类似的内存分配算法提升分配效率。
- 高并发时,池化功能更节约内存,减少内存溢出的可能
1.1.3 ByteBuf的组成
其源码如下:
public abstract class ByteBuf extends Object implements ReferenceCounted, Comparable<ByteBuf>
如上所示,是一个抽象类,继承自Object,实现了ReferenceCounted,Comparable<ByteBuf>。
ReferenceCounted是用于ByteBuf的回收工作,从其名称就能看出,是引用计数法,refCnt是引用数值;其提供了方法retain(),计数+1;release()方法,计数减1,当最终引用计数是0时,资源将被释放。此处不多做介绍。
Comparable主要用作比较。
就像普通的原始字节数组一样, ByteBuf使用从零开始的索引 。 这意味着第一个字节的索引始终为0 ,最后一个字节的索引始终为capacity - 1 。
ByteBuf提供了两个指针变量来支持顺序读写操作readerIndex用于读操作和writerIndex用于写操作。
下面通过图的形式展示缓冲区的变化过程:
ByteBuf.png如上所示的缓冲区划分方式,相比于之前学习的NIO中的ByteBuffer有一定的优势:
- 在ByteBuffer中,读写通过position这一个指针,需要通过flip()去切换;而在ByteBuf中,读写指针区分了,使用更加方便。
- ByteBuf引入了自动扩容的能力。
1.2 写入ByteBuf
关于ByteBuf的写入,提供了很多的方法,我直接将图粘在下方:
ByteBuf-write.png1.2.1 写入示例
public static void main(String[] args) {
//申请长度是10的buffer
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
//写入5个长度的字节数组
byte[] bytes = new byte[]{1, 2, 3, 4, 5};
byteBuf.writeBytes(bytes);
System.out.println(byteBuf);
//再次写入5个
byteBuf.writeBytes(bytes);
System.out.println(byteBuf);
}
结果:
PooledUnsafeDirectByteBuf(ridx: 0, widx: 5, cap: 10)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 10)
比较简单,关于其他方式,不演示了。
1.2.2 大端写入和小端写入
在前面的图中有一些方法是带有LE的,可以成为小端写入,比如
writeIntLE(int value)
其中的LE是 Little Endian 的缩写,可以理解为小字节序、低字节序。
与之相对应的就是 Big Endian,大字节序或高字节序。高字节序没有特别标注,因为在网络编程中通常使用的都是高字节序。如下面的代码就是高字节序:
writeInt(int value)
举例:写入两个数字888和888L
其二进制表示就是:
| 0000 0000 | 0000 0000 | 0000 0011 | 0111 1000 |
| 0000 0000 | 0000 0000 | 0000 0000 | 0000 0000 | 0000 0000 | 0000 0000 | 0000 0011 | 0111 1000 |
将其转换成字节数组就是
[0,0,3,120]
[0,0,0,0,0,0,3,120]
有如下测试代码:
public static void main(String[] args) {
int a = 888;
//大端写入
ByteBuf byteBuf1 = ByteBufAllocator.DEFAULT.buffer();
byteBuf1.writeInt(a);
byte[] bytes1 = new byte[4];
byteBuf1.readBytes(bytes1);
System.out.println(Arrays.toString(bytes1));
//小端写入
ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.buffer();
byteBuf2.writeIntLE(a);
byte[] bytes2 = new byte[4];
byteBuf2.readBytes(bytes2);
System.out.println(Arrays.toString(bytes2));
long b = 888L;
//大端写入
ByteBuf byteBuf3 = ByteBufAllocator.DEFAULT.buffer();
byteBuf3.writeLong(b);
byte[] bytes3 = new byte[8];
byteBuf3.readBytes(bytes3);
System.out.println(Arrays.toString(bytes3));
//小端写入
ByteBuf byteBuf4 = ByteBufAllocator.DEFAULT.buffer();
byteBuf4.writeLongLE(b);
byte[] bytes4 = new byte[8];
byteBuf4.readBytes(bytes4);
System.out.println(Arrays.toString(bytes4));
}
结果:
[0, 0, 3, 120]
[120, 3, 0, 0]
[0, 0, 0, 0, 0, 0, 3, 120]
[120, 3, 0, 0, 0, 0, 0, 0]
结论:int占4个字节,long占8个字节,大端写入是从左向右写;小端写入从右向左写。
网络传输通常使用大端传输(Big Endian)
1.2.3 set方式的写入
还有一些列以set命名的写入方式,以下面为例:
public abstract ByteBuf setBytes(int index, byte[] src)
这一类方法要求我们指定index,从此index开始写入数据, 此方法不会修改此缓冲区的readerIndex或writerIndex 。
示例代码如下:
public static void main(String[] args) {
//申请长度是10的buffer
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.heapBuffer(10);
//写入5个长度的字节数组
byte[] bytes = new byte[]{1, 2, 3, 4, 5};
//设置从index是5的位置开始写入
byteBuf.setBytes(5, bytes);
System.out.println(byteBuf);
//手动设置index写入位置到10
byteBuf.writerIndex(10);
System.out.println(byteBuf);
//长度10的字节数组进行读取
byte[] readBytes = new byte[10];
byteBuf.readBytes(readBytes);
System.out.println(Arrays.toString(readBytes));
}
结果:
PooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
PooledUnsafeHeapByteBuf(ridx: 0, widx: 10, cap: 10)
[0, 0, 0, 0, 0, 1, 2, 3, 4, 5]
1.2.4 扩容
此处我们继续使用在1.2.1小节中的代码,继续增加5个长度:
public static void main(String[] args) {
//申请长度是10的buffer
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
//写入5个长度的字节数组
byte[] bytes = new byte[]{1, 2, 3, 4, 5};
byteBuf.writeBytes(bytes);
System.out.println(byteBuf);
//再次写入5个
byteBuf.writeBytes(bytes);
System.out.println(byteBuf);
//再次写入5个
byteBuf.writeBytes(bytes);
System.out.println(byteBuf);
}
结果:
PooledUnsafeDirectByteBuf(ridx: 0, widx: 5, cap: 10)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 10)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 15, cap: 16)
如上所示发现最后一条的容量变成了16。可是我们初始给的只有10,此处就是自动进行了扩容操作。
AbstractByteBuf中的ensureWritable0方法就是扩容方法,其扩容代码如下所示:
final void ensureWritable0(int minWritableBytes) {
//获取当前写入下标
final int writerIndex = writerIndex();
//预计写入后的下标
final int targetCapacity = writerIndex + minWritableBytes;
//如果小于初始容量,不扩容
if (targetCapacity <= capacity()) {
ensureAccessible();
return;
}
//超过最大容量,跑出异常
if (checkBounds && targetCapacity > maxCapacity) {
ensureAccessible();
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// 将目标容量归一化为2的幂。
//此处用初始化ByteBuf时指定的默认大小,会根据初始赋值计算出2的幂
//会用该默认大小 减去 写入位置的下标,得到能写入的大小的最大值
final int fastWritable = maxFastWritableBytes();
//如果能写入的最大值 大于 需要写入的容量,则将能写入的值 加上 当前写入下标做为新的容量
//否则使用calculateNewCapacity去分配
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
: alloc().calculateNewCapacity(targetCapacity, maxCapacity);
// Adjust to the new capacity.
capacity(newCapacity);
}
maxFastWritableBytes内部会获取一个maxLength,这个值是在allocate缓冲区是计算出来的2的幂,且小于512的,用它减去当前写入index的位置。
public int maxFastWritableBytes() {
return Math.min(maxLength, maxCapacity()) - writerIndex;
}
下面看下初始化时如何计算出的这个默认值:
int normalizeCapacity(int reqCapacity) {
checkPositiveOrZero(reqCapacity, "reqCapacity");
// 如果此处大于chunkSize(16777217),则将这个值作为分配的值
if (reqCapacity >= chunkSize) {
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
}
// >= 512
if (!isTiny(reqCapacity)) {
// 翻倍(乘以2)
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
return normalizedCapacity;
}
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
// Quantum-spaced
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
return (reqCapacity & ~15) + 16;
}
在上面的代码中,有一个默认值chunkSize,如果分配的值大于chunkSize(16777217),则将这个值作为分配的值,而在后面扩容时ensureWritable0中的判断:
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable : this.alloc().calculateNewCapacity(targetCapacity, this.maxCapacity);
此时会走重新计算容量的方法:
his.alloc().calculateNewCapacity(targetCapacity, this.maxCapacity);
具体代码如下:
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
//如果需要的容量大于最大容量,则抛出异常
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
//定义一页4M的阈值
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
//如果需要的容量等于页阈值,就返回该阈值
if (minNewCapacity == threshold) {
return threshold;
}
// 如果需要容量大于 页阈值
if (minNewCapacity > threshold) {
//计算最小需要几个页阈值的大小,并赋予一个新的容量值
int newCapacity = minNewCapacity / threshold * threshold;
//如果新容量,比 最大容量 - 一个分页还要大,即剩余不到一个页,则赋予最大容量
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
//在赋予的新容量基础上 在加一个页容量
newCapacity += threshold;
}
return newCapacity;
}
// 如果没有超过页阈值,则从64开始,最大增加到4
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
//左移一位,即*2
newCapacity <<= 1;
}
// 取新赋予容量和最大值中的最小值
return Math.min(newCapacity, maxCapacity);
}
关于扩容的问题就看到这里了。
1.3 读取ByteBuf
1.3.1 代码演示
关于读取方法都是和写入相对应的,这里不列举了,直接上示例代码
public static void main(String[] args) {
//申请长度是10的buffer
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
//写入5个长度的字节数组
byte[] bytes = new byte[]{1, 2, 3, 4, 5};
byteBuf.writeBytes(bytes);
System.out.println(byteBuf);
//直接读取下一个字节
System.out.println(byteBuf.readByte());
System.out.println(byteBuf);
System.out.println(byteBuf.readByte());
System.out.println(byteBuf);
//读取自己数组
byteBuf.readBytes(new byte[3]);
System.out.println(byteBuf);
}
结果:
PooledUnsafeDirectByteBuf(ridx: 0, widx: 5, cap: 10)
1
PooledUnsafeDirectByteBuf(ridx: 1, widx: 5, cap: 10)
2
PooledUnsafeDirectByteBuf(ridx: 2, widx: 5, cap: 10)
PooledUnsafeDirectByteBuf(ridx: 5, widx: 5, cap: 10)
1.3.2 重复读取
1)使用get开头的方法,不会修改index。
2)使用mark:
public static void main(String[] args) {
//申请长度是10的buffer
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(10);
//写入5个长度的字节数组
byte[] bytes = new byte[]{1, 2, 3, 4, 5};
byteBuf.writeBytes(bytes);
//设置一个mark
byteBuf.markReaderIndex();
//读取自己数组
System.out.println(byteBuf.readByte());
System.out.println(byteBuf);
//重置到mark
byteBuf.resetReaderIndex();
//读取自己数组
System.out.println(byteBuf.readByte());
System.out.println(byteBuf);
}
结果:
1
PooledUnsafeDirectByteBuf(ridx: 1, widx: 5, cap: 10)
1
PooledUnsafeDirectByteBuf(ridx: 1, widx: 5, cap: 10)
限于篇幅原因,先写到这,后面继续更新ByteBuf的文章。有学到帮忙点个赞啦~~