netty ByteBuf
本文基于 netty 4.1.46
一、作用
ByteBuf 的作用:在用 jdk nio 写程序主要使用的是java.nio.ByteBuffer,从功能角度,ByteBuffer完全可以满足需要,但是有一定的局限性。
例如:
(1)、长度固定、容量不能动态扩展和收缩
(2)、ByteBuffer 只有一个 position 读写的时候需要手工调用 flip() 和 rewind() 方法重置标记,否则导致处理失败
为了弥补这些不足,Netty 提供了类似 java.nio.ByteBuffer 的实现 ByteBuf ,带着这几个问题,看一下 netty 是怎么解决的。
二、ByteBuf 结构

注意:最后一部分maxCapacity 默认为Integer.MAX_VALUE,netty 注释中没有,个人觉得加上更清晰一些
(1)、read \ write \ set 方法
read 和 write 都会移动指针 ,set 方法不会
(2)、mark 和 reset 方法
mark 可以理解为保存点,即保存读之前的 readerIndex ,或写之前的 writerIndex , reset 方法为mark 方法的保存点。
此处 netty 两个指针分别保存读写标志位,所以避免了jdk.nio.ByteBuffer,写之后读或读后写,都要反复重置标记位问题。
1、ByteBuf 的继承关系

ByteBuf 继承体系中最重要的是 AbstractByteBuf 类,所有骨架方法大部分都在此类中实现。
netty ByteBuf 的继承体系可分为三类。
-
1、Pooled 和 Unpooled
Pooled 的内存分配是在预先分配好的内存中取一段连续的内存封装成ByteBuf。
Unpooled 内存分配的时候直接调用系统 api .向操作系统分配内存。
如何区分:依赖子类的不同实现 -
2、Unsafe 和 非Unsafe 分类
Unsafe:直接拿到 ByteBuf 在 jvm 内的内存地址,通过jdk unsafe 类的直接操作。
非Unsafe:不会依赖 jdk unsafe 类。
如何区分:netty 自动判别的,jdk底层是否可以拿到 Unsafe 对象 -
3、Heap 和 Direct
Heap 堆内存:是指 byte[] 对象
Direct 堆外内存 : 是指 java.nio.ByteBuffer 对象,netty 在上边封装了一层成为ByteBuf
如何区分:依赖子类构造函数入参 directByDefault 属性区分。
三 、实现扩容
1、AbstractByteBuf 类 writeBytes(byte[] src, int srcIndex, int length)
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
//如果写入的 length 大于数组容量则扩容
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
//写索引累计
writerIndex += length;
return this;
}
2、AbstractByteBuf 类 ensureWritable0
步骤:
- 1、不需要扩容直接返回
- 2、 大于ByteBuf的最大扩容阀值抛出异常 maxCapacity 默认为 Integer.MAX_VALUE
- 3、ByteBuf 剩余空间是否大于需要写入的空间
- 4、计算需要分配的内存空间
- 5、重新申请内存
final void ensureWritable0(int minWritableBytes) {
final int writerIndex = writerIndex();
final int targetCapacity = writerIndex + minWritableBytes;
//不需要扩容直接返回
if (targetCapacity <= capacity()) {
//检查 ByteBuf 是否被释放了,refCnt ==0
ensureAccessible();
return;
}
//大于ByteBuf的最大扩容阀值抛出异常 maxCapacity 默认为 Integer.MAX_VALUE
if (checkBounds && targetCapacity > maxCapacity) {
ensureAccessible();
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
//计算 ByteBuf 剩余可写容量,Math.min(maxLength, maxCapacity()) - writerIndex;
// Normalize the target capacity to the power of 2.
final int fastWritable = maxFastWritableBytes();
//ByteBuf 剩余空间是否大于需要写入的空间,否则计算需要分配的内存空间
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
: alloc().calculateNewCapacity(targetCapacity, maxCapacity);
// Adjust to the new capacity.
capacity(newCapacity);
}
3、calculateNewCapacity() 计算需要分配的内存空间
因为内存比较小的时候采用倍增是可以接受的,但是内存大的情况下 ,比较浪费空间。所以内存比较大的情况采用递增,节约内存。
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
//4M 为扩容策略的阀值
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
//如果需求容量 大于4M 那么以 4M 步长递增
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
//如果需求容量 小于4M 那么以 64 基数被增
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
4、增加容量方法 capacity(int newCapacity)
以 AbstractByteBuf 的子类 PooledByteBuf 的 capacity(int newCapacity) 为例
length 是指用户实际要求分配的 byte[] 的长度
maxLength 是指 ByteBuf 在 Chunk 的 memory 中实际占用的长度
@Override
public final ByteBuf capacity(int newCapacity) {
//length 是指用户实际要求分配的 byte[] 的长度
if (newCapacity == length) {
ensureAccessible();
return this;
}
//检查扩容边界
checkNewCapacity(newCapacity);
if (!chunk.unpooled) {
// If the request capacity does not require reallocation, just update the length of the memory.
if (newCapacity > length) {
//扩容的容量 newCapacity 小于当前 Buf 的 maxLength,那么修改 length 即可,
//maxLength 是指 ByteBuf 在 Chunk 的 memory 中实际占用的长度
if (newCapacity <= maxLength) {
length = newCapacity;
return this;
}
} else if (newCapacity > maxLength >>> 1 &&
(maxLength > 512 || newCapacity > maxLength - 16)) {
//如果 newCapacity 大于 maxLength 的一半,那么更新 length
// here newCapacity < length
length = newCapacity;
trimIndicesToCapacity(newCapacity);
return this;
}
}
//申请内存
// Reallocation required.
chunk.arena.reallocate(this, newCapacity, true);
return this;
}
5、申请内存
关于内存的分配 allocate 和释放 free 下节讲解
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();
int oldCapacity = buf.length;
if (oldCapacity == newCapacity) {
return;
}
//将久 ByteBuf 的信息保存
PoolChunk<T> oldChunk = buf.chunk;
ByteBuffer oldNioBuffer = buf.tmpNioBuf;
long oldHandle = buf.handle;
T oldMemory = buf.memory;
int oldOffset = buf.offset;
int oldMaxLength = buf.maxLength;
//重新划分一块内存,并且调用 buf.init() 方法。
// This does not touch buf's reader/writer indices
allocate(parent.threadCache(), buf, newCapacity);
int bytesToCopy;
if (newCapacity > oldCapacity) {
bytesToCopy = oldCapacity;
} else {
buf.trimIndicesToCapacity(newCapacity);
bytesToCopy = newCapacity;
}
//将旧的内存中的数据,copy 到新申请到内存中
memoryCopy(oldMemory, oldOffset, buf, bytesToCopy);
//释放久的内存占用
if (freeOldMemory) {
free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
}
}
6、byte 复制
@Override
protected void memoryCopy(ByteBuffer src, int srcOffset, PooledByteBuf<ByteBuffer> dstBuf, int length) {
if (length == 0) {
return;
}
//jdk api 中可以获取到 Unsafe 对象
if (HAS_UNSAFE) {
PlatformDependent.copyMemory(
PlatformDependent.directBufferAddress(src) + srcOffset,
PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length);
} else {
//否者用 java nio ByteBuffer 进行copy
// We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
src = src.duplicate();
ByteBuffer dst = dstBuf.internalNioBuffer();
src.position(srcOffset).limit(srcOffset + length);
dst.position(dstBuf.offset);
dst.put(src);
}
}
}