Netty内存分配源码分析
1 Netty内存分配与释放实战
public static void main(String[] args) {
//1 分配一个ByteBuf
PooledByteBufAllocator allocator = new PooledByteBufAllocator(false);
ByteBuf byteBuf = allocator.heapBuffer(15);
ByteBuf byteBuf1 = allocator.heapBuffer(9000);
//2 释放分配的内存
byteBuf.release();
byteBuf.release();
}
2 核心类及成员变量
2.1 PoolArena
Netty的PoolArena是由多个Chunk组成的大块内存区域,而每个Chunk则由一个或者多个Page组成,因此,对内存的组织和管理也就主要集中在如何管理和组织Chunk和Page了。
先说结论:PoolArena通过6个PoolChunkList来管理PoolChunk,而每个PoolChunk由N个PoolSubpage构成,即将PoolChunk的里面底层实现 T memory分成N段,每段就是一个PoolSubpage。当用户申请一个Buf时,使用Arena所拥有的chunk所管辖的page分配内存,内存分配的落地点为 T memory上。
abstract class PoolArena<T> implements PoolArenaMetric {
enum SizeClass {
Tiny,
Small,
Normal
}
// 该参数指定了tinySubpagePools数组的长度,由于tinySubpagePools每一个元素的内存块差值为16,
// 因而数组长度是512/16,也即这里的512 >>> 4 = 32 [16,32,64...512]
static final int numTinySubpagePools = 512 >>> 4;
//表示该PoolArena的allocator
final PooledByteBufAllocator parent;
//表示PoolChunk中由Page节点构成的二叉树的最大高度,默认11
private final int maxOrder;
//page的大小,默认8K
final int pageSize;
// 指定了叶节点大小8KB是2的多少次幂,默认为13,该字段的主要作用是,在计算目标内存属于二叉树的
// 第几层的时候,可以借助于其内存大小相对于pageShifts的差值,从而快速计算其所在层数
final int pageShifts;
//默认16MB
final int chunkSize;
// 由于PoolSubpage的大小为8KB=8196,因而该字段的值为
// -8192=>=> 1111 1111 1111 1111 1110 0000 0000 0000
// 这样在判断目标内存是否小于8KB时,只需要将目标内存与该数字进行与操作,只要操作结果等于0,
// 就说明目标内存是小于8KB的,这样就可以判断其是应该首先在tinySubpagePools或smallSubpagePools
// 中进行内存申请
final int subpageOverflowMask;
// 该参数指定了smallSubpagePools数组的长度,默认为4
final int numSmallSubpagePools;
//tinySubpagePools用来分配小于512 byte的Page
private final PoolSubpage<T>[] tinySubpagePools;
//smallSubpagePools用来分配大于等于512 byte且小于pageSize内存的Page
private final PoolSubpage<T>[] smallSubpagePools;
//用来存储用来分配给大于等于pageSize大小内存的PoolChunk
//存储内存利用率50-100%的chunk
private final PoolChunkList<T> q050;
//存储内存利用率25-75%的chunk
private final PoolChunkList<T> q025;
//存储内存利用率1-50%的chunk
private final PoolChunkList<T> q000;
//存储内存利用率0-25%的chunk
private final PoolChunkList<T> qInit;
//存储内存利用率75-100%的chunk
private final PoolChunkList<T> q075;
//存储内存利用率100%的chunk
private final PoolChunkList<T> q100;
//堆内存(heap buffer)
static final class HeapArena extends PoolArena<byte[]> {
}
//堆外直接内存(direct buffer)
static final class DirectArena extends PoolArena<ByteBuffer> {
}
}
2.2 PoolChunk
Chunk主要用来组织和管理多个Page的内存分配和释放。在Netty中,Chunk通过一颗完全二叉树来管理Page。
final class PoolChunk<T> {// PoolChunk会涉及到具体的内存,泛型T表示byte[](堆内存)、或java.nio.ByteBuffer(堆外内存)
final PoolArena<T> arena;//表示该PoolChunk所属的PoolArena。
final T memory;// 具体用来表示内存;byte[]或java.nio.ByteBuffer。
final boolean unpooled;// 是否是可重用的,unpooled=false表示可重用
private final byte[] memoryMap;//默认12层的完全二叉树的节点数是4096
private final byte[] depthMap;
private final PoolSubpage<T>[] subpages;//表示该PoolChunk所包含的PoolSubpage。也就是PoolChunk连续的可用内存。默认数组长度是2048
private final int subpageOverflowMask;//同PoolArena中的一样
private final int pageSize;//每个PoolSubpage的大小,默认为8192个字节(8K)
private final int pageShifts;//同PoolArena中的一样
private final int maxOrder;//同PoolArena中的一样
private final int chunkSize;//同PoolArena中的一样
private final int log2ChunkSize;
private final int maxSubpageAllocs;//完全二叉树最后一层的节点数量 1 << maxOrder (默认2048)
private final byte unusable;
private int freeBytes; //当前PoolChunk空闲的内存。
PoolChunkList<T> parent;//一个PoolChunk分配后,会根据使用率挂在PoolArena的一个PoolChunkList中
// PoolChunk本身设计为一个链表结构
PoolChunk<T> prev;
PoolChunk<T> next;
}
注意:
poolChunk默认由2048个page组成,一个page默认大小为8k,图中节点的值为在数
组memoryMap的下标。
1、如果需要分配大小8k的内存,则只需要在第11层,找到第一个可用节点即可。
2、如果需要分配大小16k的内存,则只需要在第10层,找到第一个可用节点即可。
3、如果节点1024存在一个已经被分配的子节点2048,则该节点不能被分配,如需要分配大小16k的内存,这个时候节点2048已被分配,节点2049未被分配,就不能直接分配节点1024,因为该节点目前只剩下8k内存。
2.3 PoolSubpage
PoolSubpage用于分配小于pageSize的内存。PoolSubpage这个类的块大小是由第一次申请的内存大小来决定的,且每个块大小相同。PoolSubpage这个类是通过位图法来对管理内存的分配。
final class PoolSubpage<T> {
final PoolChunk<T> chunk;//用来表示该Page属于哪个Chunk
private final int memoryMapIdx;//用来表示该Page在Chunk.memoryMap中的索引
private final int runOffset;// 当前Page在chunk.memoryMap的偏移量
private final int pageSize;//Page的大小,默认为8192
/*
long 类型的数组bitmap用来表示Page中存储区域的使用状态,
数组中每个long的每一位表示一个块存储区域的占用情况:0表示未占用,1表示占用。
例如:对于一个4K的Page来说如果这个Page用来分配1K的存储区块,
那么long数组中就只有一个long类型的元素且这个数值的低4危用来指示4个存储区域的占用情况。
*/
private final long[] bitmap;
//PoolSubpage本身设计为一个链表结构
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy;
int elemSize;//块的大小
private int maxNumElems;//page按elemSize大小分得的块个数
private int bitmapLength;//bitmap数组实际会用到的长度,等于pageSize/elemSize/64
// 下一个可用的位置
private int nextAvail;
// 可用的段数量
private int numAvail;
}
2.4 PoolThreadCache
Netty为了高效分配内存,采用了PoolThreadCache
在ByteBuf
使用完回收时将使用完的ByteBuf
记录到PoolThreadCache
中,下次在需要分配同大小(同范围)的ByteBuf
可直接进行分配,准确的说PoolThreadCache
缓存的并不是ByteBuf
,而是待释放ByteBuf
的chunk、自己的起始位置、最大可访问长度等信息。
final class PoolThreadCache{
//PoolThreadCache
//因为PoolThreadCache主要持有那些需要被释放的ByteBuf,但是不会
//进行ByteBuf的分配工作,所以这里的heapArena和directArena主要用来
//进行相关缓存的计数。
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
//下面则分别为堆内存、直接内存对应的tiny、small和normal的缓存
//PoolThreadCache采用三个数组保存了每种尺寸的ByteBuf,
//因为每种类型的尺寸都是返回值,比如tiny是size<512,所以
//每种类型的尺寸由根据大小计算下标组成了其对应的数组
//数据中的每个元素为MemoryRegionCache类型,MemoryRegionCache
//其实是一个queue,比如Netty中最小的ByteBuf为16,则tiny缓存数组
//第一个元素MemoryRegionCache队列就保存所有被回收的大小为16的
//ByteBuf信息
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
}
2.5 MemoryRegionCache
MemoryRegionCache
是同种大小ByteBuf
(注意这里不是真正的ByteBuf
,只是回收的ByteBuf
的相关信息)的队列,根据为tiny/small或者normal尺寸,有SubPageMemoryRegionCache
和NormalMemoryRegionCache
两种实现,其中NormalMemoryRegionCache
对应nomal的ByteBuf
,SubPageMemoryRegionCache
则对应tiny和small。
private abstract static class MemoryRegionCach}e<T> {
//MemoryRegionCache
//该大小的ByteBuf队列容量
private final int size;
//队列
private final Queue<Entry<T>> queue;
/*枚举,标识该队列的尺寸类型enum SizeClass { Tiny, Small, Normal}*/
private final SizeClass sizeClass;
}
3 内存分配流程
/***********************PooledByteBufAllocator************************/
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
//获取PoolThreadCache ,
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
//进行分配
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
//非池化分配
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
/******************************PoolArena**************************/
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
//新建一个ByteBuf对象,这里使用的对象池技术
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
//进行分配
allocate(cache, buf, reqCapacity);
return buf;
}
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 将需要申请的容量格式为 2^N
final int normCapacity = normalizeCapacity(reqCapacity);
// 判断目标容量是否小于8KB,小于8KB则使用tiny或small的方式申请内存
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
// 判断目标容量是否小于512字节,小于512字节的为tiny类型的
if (tiny) { // < 512
// 将分配区域转移到 tinySubpagePools 中
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
//能从缓存中获取就直接
return;
}
// 如果无法从当前线程缓存中申请到内存,则尝试从tinySubpagePools中申请,这里tinyIdx()方法
// 就是计算目标内存是在tinySubpagePools数组中的第几号元素中的
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
// 如果目标内存在512byte~8KB之间,则尝试从smallSubpagePools中申请内存。这里首先从
// 当前线程的缓存中申请small级别的内存,如果申请到了,则直接返回
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// 获取目标元素的头结点
final PoolSubpage<T> head = table[tableIdx];
// 这里需要注意的是,由于对head进行了加锁,而在同步代码块中判断了s != head,
// 也就是说PoolSubpage链表中是存在未使用的PoolSubpage的,因为如果该节点已经用完了,
// 其是会被移除当前链表的。也就是说只要s != head,那么这里的allocate()方法
// 就一定能够申请到所需要的内存块
synchronized (head) {
// s != head就证明当前PoolSubpage链表中存在可用的PoolSubpage,并且一定能够申请到内存,
// 因为已经耗尽的PoolSubpage是会从链表中移除的
final PoolSubpage<T> s = head.next;
// 如果此时 subpage 已经被分配过内存了执行下文,如果只是初始化过,则跳过该分支
if (s != head) {
// 从PoolSubpage中申请内存
assert s.doNotDestroy && s.elemSize == normCapacity;
// 通过申请的内存对ByteBuf进行初始化
long handle = s.allocate();
assert handle >= 0;
// 初始化 PoolByteBuf 说明其位置被分配到该区域,但此时尚未分配内存
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
// 对tiny类型的申请数进行更新
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
// 走到这里说明目标内存是大于8KB的,那么就判断目标内存是否大于16M,如果大于16M,
// 则不使用内存池对其进行管理,如果小于16M,则到PoolChunkList中进行内存申请
if (normCapacity <= chunkSize) {
// 小于16M,首先到当前线程的缓存中申请,如果申请到了则直接返回,如果没有申请到,
// 则到PoolChunkList中进行申请
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// 对于大于16M的内存,Netty不会对其进行维护,而是直接申请,然后返回给用户使用
allocateHuge(buf, reqCapacity);
}
}
//先在chunklist里面分配
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
//创建一个PoolChunk对象
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
//内存分配
boolean success = c.allocate(buf, reqCapacity, normCapacity);
assert success;
qInit.add(c);
}
/******************************PoolChunk********************************/
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;
//这里在完全二叉树中通过伙伴算法查找待分配的
if ((normCapacity & subpageOverflowMask) != 0) {
handle = allocateRun(normCapacity);//分配大于pageSize的内存
} else {
handle = allocateSubpage(normCapacity);//分配小于pageSize的内存
}
if (handle < 0) {
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
initBuf(buf, nioBuffer, handle, reqCapacity);//初始化PooledByteBuf.
return true;
}
4 内存释放流程
/***************************PoolArena***********************************/
public void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity,
PoolThreadCache cache) {
// 如果是非池化的,则直接销毁目标内存块,并且更新相关的数据
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
// 如果是池化的,首先判断其是哪种类型的,即tiny,small或者normal,
// 然后将其交由当前线程的缓存进行处理,如果添加成功,则直接返回
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, nioBuffer, handle,
normCapacity, sizeClass)) {
return;
}
// 如果当前线程的缓存已满,则将目标内存块返还给公共内存块进行处理
freeChunk(chunk, handle, sizeClass, nioBuffer);
}
}
/***********************PoolThreadCache******************************/
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
//根据回收的ByteBuf大小,从指定数组中获取对应队列下标处的
//MemoryRegionCache
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
//如果为空,表示没有启用缓存,直接返回
if (cache == null) {
return false;
}
//如果启用缓存,则加入缓存中
return cache.add(chunk, handle);
}
//根据ByteBuf大小,从指定数组中获取队列,下标计算上面已经介绍过
//这里不再展开介绍
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
/***********************MemoryRegionCache******************************/
//将元素加入到MemoryRegionCache队列中
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = newEntry(chunk, handle);
//这里offer考虑的队列的尺寸,可见第二节的介绍
boolean queued = queue.offer(entry);
//超出队列最大尺寸,则回收entry
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
entry.recycle();
}
return queued;
}