PooledByteBuf对象、内存复用
PoolThreadCache: PooledByteBufAllocator 实例维护了一个线程变量。
多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存Chunk。
Pool Chunk里面维护了内存引用,内存复用的做法就是把buf的memory指向Chunck的memory。
我们看下面这段代码
@Test
public void poolTest() {
System.out.println("测试buf回收====");
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
// tiny
ByteBuf buf1 = allocator.directBuffer(495); // 分配的内存最大长度为496
System.out.printf("buf1: 0x%X%n", buf1.memoryAddress());
buf1.release(); // 此时会被回收到tiny 的512b格子中
}
跟踪内存分配的部分
ByteBuf buf1 = allocator.directBuffer(495);
调用 AbstractByteBufAllocator 的directBuffer方法
AbstractByteBufAllocator # directBuffer(int initialCapacity)
因为java平台有unsafe机制,会使用池化的PooledByteBufAllocator:
PooledByteBufAllocator # newDirectBuffer(int initialCapacity, int maxCapacity)
我们来看PooledByteBufAllocator类newDirectBuffer的代码:
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// PoolThreadCache 跟线程绑定的分配内存的缓存,使用jemaclloc的方式分配内存
PoolThreadCache cache = threadCache.get();
// PoolArena 真正的内存分配管理器
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
// poolArena 分配内存
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
来看 PoolArena中分配内存的方法:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// 首先创建ByteBuf
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// 给ByteBuf分配内存
allocate(cache, buf, reqCapacity);
return buf;
}
PoolArena 内部类 DirectArena 的newByteBuf方法:
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) { // 是否支持unsafe机制
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
因为支持unsafe机制,走第一个分支PooledUnsafeDirectByteBuf
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
// 尝试从RECYCLER中复用ByteBuf,复用不到再创建ByteBuf对象
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
// 给新分配出来的ByteBuf做一些清理工作,如设置引用计数为1,readerIndex、writeIndex分别设置为0
buf.reuse(maxCapacity);
return buf;
}
先尝试从 RECYCLER 中复用ByteBuf,RECYCLER 是一个基于threadlocal(线程封闭)机制的轻量级别对象池。
我们来看类Recycler中的get()方法:
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
// 基于线程的Stack
Stack<T> stack = threadLocal.get();
// DefaultHandle 是对象的句柄,里面的value是真正要复用的对象
DefaultHandle<T> handle = stack.pop(); // 尝试复用
if (handle == null) { // 复用不到就创建
handle = stack.newHandle();
// 因为我们要创建PooledUnsafeDirectByteBuf,这里value的类型就是 PooledUnsafeDirectByteBuf
handle.value = newObject(handle);
}
return (T) handle.value;
}
我们来看PooledByteBuf中的reuse方法:
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
// 引用计数设置为1,会调用父类 AbstractReferenceCountedByteBuf 中的方法
setRefCnt(1);
// 调用 AbstractByteBuf中的方法设置readerIndex,writerIndex
setIndex0(0, 0);
discardMarks();
}
至此ByteBuf对象已经创建出来了,接下来看如何为ByteBuf分配内存。我们回到PoolArena 的allocate方法:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// 为ByteBuf分配内存
allocate(cache, buf, reqCapacity);
return buf;
}
PoolArena # allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)
// 给buf分配内存空间
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity); // 将需要的容量转换为16的倍数,不足16的就为16
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize tiny和small的
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512 小于512字节,就从tiny缓存区域中分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
PooledThreadCache中有三块缓存区域:tiny,small,normal。首先会根据申请内存的容量去判断分配哪块区域的内存。
如果在tiny区域分配,会调用PoolThreadCache的allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) 方法:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
PoolThreadCache#cacheForTiny(PoolArena<?> area, int normCapacity)
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
tinySubPageDirectCaches 是一个数组,每个元素都是一个tiny类型的内存块,PoolArena.tinyIdx(normCapacity) 计算出需要使用哪个内存块,然后使用对应的idx取出内存块。