Netty源码-PoolThreadCache
1 概述
为了优化内存分配和回收,Netty使用了内存池机制分配ByteBuf
,其实在分配ByteBuf
时,Netty不仅使用了内存池技术,还采用了对象池技术(本文暂且将PoolThreadCache
分配称为一种对象池技术)。我们知道Netty内存池是预先分配一大块内存,然后在这一大块预分配的内存中划分实际所需的各种大小的ByteBuf
,采用内存池分配出来的ByteBuf
都是在这一大块内存上进行数据存取,这些ByteBuf
实际上就是记录了自己在这一大块内存上的起始位置、划分给自己的大小(范围)。
如果没有采用PoolThreadCache
,则每次分配新的ByteBuf
都需要使用分配算法在预分配的一大块内存上进行分配,效率比较低,采用了PoolThreadCache
则可以在ByteBuf
使用完回收时将使用完的ByteBuf
记录到PoolThreadCache
中,下次在需要分配同大小(同范围)的ByteBuf
可直接进行分配,准确的说PoolThreadCache
缓存的并不是ByteBuf
,而是待释放ByteBuf
的chunk、自己的起始位置、最大可访问长度等信息,所以上面说PoolThreadCache
‘暂且’是一种对象池技术,下面在介绍PoolThreadCache
也不再具体区分其保存的是ByteBuf
的相关信息还是直接保存了ByteBuf
,为了叙述方便直接称其保存的为ByteBuf
。另外PoolThreadCache
采用了线程本地变量—FastThreadLocal
,能够进一步提高内存分配效率。
本文就重点介绍Netty是如何通过使用PoolThreadCache
提供内存池分配效率的。
2 相关类介绍
2.1 PoolThreadCache
我们首先看PoolThreadCache
的主要域,在介绍PoolThreadCache
域时需要先了解什么是tiny,small和normal,在Netty中,分类如下:
- tiny: size < 512
- small: size >=512 and size < pageSize
- normal: size >= pageSize and <= chunkSize
- huge: size > chunkSize
PoolThreadCache
只会缓存tiny,small,normal类型的ByteBuf
,不会缓存过大的huge的ByteBuf
,下面看PoolThreadCache
的主要域:
//PoolThreadCache
//因为PoolThreadCache主要持有那些需要被释放的ByteBuf,但是不会
//进行ByteBuf的分配工作,所以这里的heapArena和directArena主要用来
//进行相关缓存的计数,可以自行看下PoolThreadCache的构造函数
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
// Hold the caches for the different size classes, which are tiny, small and normal.
//下面则分别为堆内存、直接内存对应的tiny、small和normal的缓存
//PoolThreadCache采用三个数组保存了每种尺寸的ByteBuf,
//因为每种类型的尺寸都是返回值,比如tiny是size<512,所以
//每种类型的尺寸由根据大小计算下标组成了其对应的数组
//数据中的每个元素为MemoryRegionCache类型,MemoryRegionCache
//其实是一个queue,比如Netty中最小的ByteBuf为16,则tiny缓存数组
//第一个元素MemoryRegionCache队列就保存所有被回收的大小为16的
//ByteBuf信息
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
为了明白上面介绍的数组,我们在看一下从缓存中取对象和用完对象放入缓存操作时从数组中获取MemoryRegionCache的求下标的代码:
//下面函数输入参数normCapacity都是规范为2的指数方的数
//PoolArena
static int tinyIdx(int normCapacity) {
//size<512的是tiny,在这范围内的2的指数次方和对应下标如下:
/*
16: 1
32: 2
64: 4
128: 8
256: 16
*/
//tiny缓存数组大小默认为32
return normCapacity >>> 4;
}
static int smallIdx(int normCapacity) {
//512<=size<pageSize的为small,pageSize按照默认8192算的话
//在这范围内的2的指数次方和对应下标如下:
/* 512: 0
1024: 1
2048: 2
4096: 3
*/
//small缓存数组需要根据pageSize计算,如果pageSize=8192,则为4
int tableIdx = 0;
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
return tableIdx;
}
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
if (area.isDirect()) {
//pageSize<=size<=chunkSize的为small,
//pageSize按照默认8192算的话
//在这范围内的2的指数次方和对应下标如下:
/*
8192: 0
16384: 1
32768: 2
65536: 3
131072: 4
262144: 5
524288: 6
1048576: 7
2097152: 8
4194304: 9
8388608: 10
16777216: 11
*/
//normal缓存数组大小需要根据pageSize和order计算,
//如果pageSize=8192,则为12
int idx = log2(normCapacity >> numShiftsNormalDirect);
return cache(normalDirectCaches, idx);
}
int idx = log2(normCapacity >> numShiftsNormalHeap);
return cache(normalHeapCaches, idx);
}
上面列出了tiny、small、normal缓存数组下标的计算,下面以small缓存数组举例,pageSize=8192时,small数组下标和该下标上MemoryRegionCache
队列缓存的ByteBuf
大小关系如下:
- 512: 0
- 1024: 1
- 2048: 2
- 4096: 3
也就是MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches
下标0上放置的是ByteBuf
大小为512的queue,queue中ByteBuf
个数由PooledByteBufAllocator.DEFAULT_SMALL_CACHE_SIZE
控制,默认512个。回收ByteBuf
时,所有大小为512的ByteBuf
都会被放置到smallSubPageDirectCaches[0]
的队列中。
在文章Netty源码-PooledByteBufAllocator静态变量初始化中,笔者介绍了PooledByteBufAllocator
中静态变量的初始化,其中关于缓存的变量没有介绍,下面介绍这些变量中重要的部分:
//PooledByteBufAllocator
// cache sizes
//下面这些上面提到过,PoolThreadCache中每种缓存
//队列的大小
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
// 'Scalable memory allocation using jemalloc'
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
// the number of threshold of allocations when cached entries will be freed up if not frequently used
//控制如果一个队列中大于该数值的ByteBuf都分配出去了,就清空
//该队列
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);
//这个变量控制是否对所有类型的线程都启用PoolThreadCache,还是只对
//FastThreadLocalThread实例的线程启用PoolThreadCache
DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
"io.netty.allocator.useCacheForAllThreads", true);
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
"io.netty.allocator.directMemoryCacheAlignment", 0);
PoolThreadCache
其实是放在PooledByteBufAllocator
的PoolThreadCache中的(可参考笔者文章Netty源码-FastThreadLocal原理),其相关初始化如下:
//PooledByteBufAllocator
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final boolean useCacheForAllThreads;
PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads;
}
@Override
protected synchronized PoolThreadCache initialValue() {
//获取当前分配出去最小容量(也就是空闲容量最多的)head或
//direct Arena
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
//DEFAULT_USE_CACHE_FOR_ALL_THREADS控制的
//是对所有线程都启用PoolThreadCache,还是只对
//FastThreadLocalThread启用
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
//不启用PoolThreadCache则所有类型的cache都为0
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
//这个是缓存释放的关键
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}
PoolThreadCache
的构造函数主要创建了tiny/small/nomal数组,比较简单,不再介绍。
上面onRemoval
是清空缓存的关键,需要了解FastThreadLocalz
清理的原理,具体可参考笔者文章Netty源码-FastThreadLocal原理。
2.2 MemoryRegionCache
MemoryRegionCache
是同种大小ByteBuf
(注意这里不是真正的ByteBuf
,只是回收的ByteBuf
的相关信息)的队列,根据为tiny/small或者normal尺寸,有SubPageMemoryRegionCache
和NormalMemoryRegionCache
两种实现,其中NormalMemoryRegionCache
对应nomal的ByteBuf
,SubPageMemoryRegionCache
则对应tiny和small。
MemoryRegionCache
中的重要域如下:
//MemoryRegionCache
//该大小的ByteBuf队列容量
private final int size;
//队列
private final Queue<Entry<T>> queue;
/*
枚举,标识该队列的尺寸类型
enum SizeClass {
Tiny,
Small,
Normal
}
*/
private final SizeClass sizeClass;
MemoryRegionCache
中存放的实际元素为MemoryRegionCache.Entry
类型,
//MemoryRegionCache.Entry
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
//可见Entry只保存了正在回收ByteBuf在chunk中的起始位置
//和对应的chunk
PoolChunk<T> chunk;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
MemoryRegionCache.Entry
中的recyclerHandle
主要是为了实现对象池技术,具体可见笔者文章Netty源码-对象池Recycler。
3 没有缓存时的ByteBuf
分配
为了比较PoolThreadCache
是如何加快分配的,我们先看如果没有PoolThreadCache
是如何分配ByteBuf
的,这里我们简单看下PooledByteBufAllocator
没有缓存时是如何分配直接内存的:
//PooledByteBufAllocator
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
//没有启用缓存,只是PoolThreadCache队列尺寸为0,
//但是还是可以使用PoolThreadCache中的arena
//这里就是得到PoolThreadCache中的Arena
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
//进行分配
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
//PoolArena
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
//新建一个ByteBuf对象,这里使用的对象池技术
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
//进行分配
allocate(cache, buf, reqCapacity);
return buf;
}
//下面根据此次分配内存所属尺寸分类tiny/small/normal,先尝试从
//PoolThreadCache中分配,这里因为没有启用缓存,所以尝试会失败,
//尝试失败之后,会新建PoolChunk(如果需要的话),然后采用结合平衡
//二叉树实现的内存分配算法进行运算,得出该ByteBuf在chunk大内存
//中的起始位置,最后分配完毕
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
...
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
可见如果没有启用PoolThreadCache
,在分配ByteBuf
时,需要根据内存分配算法在PoolChunk
中分配相应大小的ByteBuf
。
4 PoolThreadCache
缓存加入、分配、清理
下面在介绍缓存回收和分配时,都默认pageSize=8192,chunkSize=pageSize>>11。
在上面第2节介绍了相关类之后,缓存回收和分配都比较简单了,下面我们首先看ByteBuf
是如何放入PoolThreadCache
中的。
4.1 如何将对象放入缓存
要了解将对象放入缓存,首先要看下PoolChunk
在得出分配的内存其实位置之后,是如何初始化ByteBuf
的:
//PoolChunk
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
//可见,在调用ByteBuf的init方法时
//使用arena.parent.threadCache()传入了
//PoolThreadCache,也就是ByteBuf持有
//PoolThreadCache对象实例
buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
}
我们知道在Netty中,ByteBuf
采用引用计数控制回收,在计数为0时,调用deallocate
方法,PooledByteBuf.deallocate
如下:
//PooledByteBuf
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
tmpNioBuf = null;
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
recycle();
}
}
//PoolArena
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
//非池化不用分配
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
//加入缓存中
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass);
}
}
//PoolThreadCache
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
//根据回收的ByteBuf大小,从指定数组中获取对应队列下标处的
//MemoryRegionCache
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
//如果为空,表示没有启用缓存,直接返回
if (cache == null) {
return false;
}
//如果启用缓存,则加入缓存中
return cache.add(chunk, handle);
}
//根据ByteBuf大小,从指定数组中获取队列,下标计算上面已经介绍过
//这里不再展开介绍
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
//MemoryRegionCache
//将元素加入到MemoryRegionCache队列中
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = newEntry(chunk, handle);
//这里offer考虑的队列的尺寸,可见第二节的介绍
boolean queued = queue.offer(entry);
//超出队列最大尺寸,则回收entry
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
entry.recycle();
}
return queued;
}
4.2 从缓存中分配
从缓存中分配具体要看下上面已经列出的PoolArena.allocate
方法:
//PoolArena
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
上面的代码很长,但是逻辑并不复杂,我们具体看下如何从缓存中分配tiny尺寸的ByteBuf
,即上面cache.allocateTiny
方法的调用:
//PoolThreadCache
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
//cacheForTiny根据所需ByteBuf大小从tiny缓存数据获取队列
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
//根据所需ByteBuf大小从tiny缓存数据获取队列,这里数据结构和下标
//的运算在第2节介绍过
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
//从队列中取出元素进行ByteBuf初始化
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
//统计allocations,如果已经分配出去的ByteBuf大于
//freeSweepAllocationThreshold,即DEFAULT_CACHE_TRIM_INTERVAL
//配置的数值,则PoolThreadCache维护了allocations 记录了
//整个PoolThreadCache分配的总个数,每个MemoryRegionCache
//也有一个allocations记录了自己分配的个数,如果达到了清理
//条件,则清理队列中size-allocations个ByteBuf
//因为队列已经分配出去了allocations个元素,剩下的元素个数
//就是size-allocations,所以这里就是清空队列
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
//MemoryRegionCache
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
//从队列中取出一个ByteBuf信息,对待分配的ByteBuf进行初始化
//并将对应的entry放入对象池中
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
entry.recycle();
// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}
4.3 PoolThreaCache
清理
上面已经介绍过了PoolThreadCache
是放在PooledByteBufAllocator.FastThreadLocal
中的,并且定义了onRemoval
方法,该方法会在FTL主动或被动清理中被调用(关于FTL主动和被动清理,可参考笔者文章Netty源码-FastThreadLocal原理),在onRemoval
方法中调用了threadCache.free()
方法:
//PoolThreadCache
void free() {
//清空所有的缓存数据,具体的不再展开
int numFreed = free(tinySubPageDirectCaches) +
free(smallSubPageDirectCaches) +
free(normalDirectCaches) +
free(tinySubPageHeapCaches) +
free(smallSubPageHeapCaches) +
free(normalHeapCaches);
if (numFreed > 0 && logger.isDebugEnabled()) {
logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, Thread.currentThread().getName());
}
//一些统计信息的变更
if (directArena != null) {
directArena.numThreadCaches.getAndDecrement();
}
if (heapArena != null) {
heapArena.numThreadCaches.getAndDecrement();
}
}