ByteBuf
2019-08-01 本文已影响0人
Pillar_Zhong
体系结构
1563803296721.png从图可以看出来,ByteBuf分类主要是三个维度:
- Unpooled和Pooled
- Unsafe和非Unsafe
- Heap和Direct
访问索引
1563801077089.png从图示中可以容易理解ByteBuf的读写区域
内存分配
策略
1563805578302.png 1563805646415.png 1563805753773.png可以看到在接口方法里面已经区分了是direct还是heap,在继承体系上区分了是Pooled还是Unpooled
而在具体的逻辑中,再根据系统底层是否支持Unsafe来决定是否要返回Unsafe的实现。
UnpooledByteBufAllocator
heap内存的分配
1563806190062.pngprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
// 根据是否支持Unsafe与否来返回具体的UnpooledHeapByteBuf
return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
: new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
初始化
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 直接去调用父类UnpooledHeapByteBuf的构造函数
// 说明底层的内存分配是一样的,只不过数据操作的方式不一样,会用Unsafe的方式,更快,更直接。
super(alloc, initialCapacity, maxCapacity);
}
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 直接新建byte数组,长度为initialCapacity,readindex和writeindex为0
this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}
private UnpooledHeapByteBuf(
ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
super(maxCapacity);
...
this.alloc = alloc;
// 保存上面的byte数组作为ByteBuf的存储
setArray(initialArray);
// read和write index归零
setIndex(readerIndex, writerIndex);
}
getByte
Unsafe
// Unsafe的方式操作数据
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
static byte getByte(byte[] data, int index) {
// 可以看到,这里直接通过byte数组的内存偏移量加上数组的index来获取数据
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
非Unsafe
// 非Unsafe的方式操作数据
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}
// 通过数组下标的方式来操作数组
static byte getByte(byte[] memory, int index) {
return memory[index];
}
direct内存分配
1563807731297.pngprotected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 根据底层对Unsafe的支持与否来决定最终的UnpooledDirectByteBuf
ByteBuf buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
初始化
static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(
ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
非Unsafe
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
...
this.alloc = alloc;
// NIO的方法,这里直接去堆外分配内存
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
// 而这里对老的buffer进行回收,TODO
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
// 将前面创建的堆外内存保存下来
this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
Unsafe
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
...
this.alloc = alloc;
// 直接堆外内存分配
setByteBuffer(allocateDirect(initialCapacity), false);
}
// 跟非Unsafe类似
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
}
this.buffer = buffer;
// Unsafe不一样的地方在于,需要知道buffer的内存偏移量,而这个偏移量在allocateDirect会生成
memoryAddress = PlatformDependent.directBufferAddress(buffer);
tmpNioBuf = null;
capacity = buffer.remaining();
}
static long directBufferAddress(ByteBuffer buffer) {
// 可以看到这个偏移量会去找ADDRESS_FIELD_OFFSET
// final Field field = Buffer.class.getDeclaredField("address");
// 而堆外内存的偏移量早在allocateDirect的时候就已经自动赋给buffer的address了,直接反射去拿就好
// 这个address只代表堆外内存的偏移量,
return getLong(buffer, ADDRESS_FIELD_OFFSET);
}
getByte
Unsafe
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
long addr(int index) {
// 根据上面保存下来的偏移量来去拿到index的地址
return memoryAddress + index;
}
非Unsafe
protected byte _getByte(int index) {
// 直接通过api来对buffer进行读取
return buffer.get(index);
}
PooledByteBufAllocator
direct内存分配
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 拿到当前线程绑定的PoolThreadCache
PoolThreadCache cache = threadCache.get();
// 进而拿到PoolThreadCache的堆外内存
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
// 在当前线程绑定的堆外内存区域请求一块空间
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}