Netty Source Code Analysis in Depth

Netty Source Code - PoolThreadCache

2019-05-31  persisting_

1 Overview

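To optimize memory allocation and reclamation, Netty allocates ByteBufs from a memory pool. In fact, besides pooled memory, Netty also applies an object-pool technique when allocating ByteBufs (in this article we loosely call allocation through PoolThreadCache an object-pool technique). A Netty memory pool preallocates one large block of memory and carves the ByteBufs of the various sizes actually requested out of that block. Every pooled ByteBuf reads and writes its data inside that large block; such a ByteBuf really just records its starting position in the block and the size (range) assigned to it.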

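Without PoolThreadCache, every new ByteBuf would have to be carved out of the preallocated block by running the allocation algorithm, which is relatively slow. With PoolThreadCache, a ByteBuf that has been used and released is recorded in the PoolThreadCache, and the next request for a ByteBuf of the same size (same size range) can be served from there directly. Strictly speaking, PoolThreadCache does not cache the ByteBuf itself but information about the memory being released: the chunk it belongs to, its starting position, its maximum accessible length and so on. That is why PoolThreadCache was called an object pool only "loosely" above. For convenience, the rest of this article does not distinguish between caching this information and caching the ByteBuf itself, and simply says that PoolThreadCache caches ByteBufs. In addition, PoolThreadCache is stored in a thread-local variable, a FastThreadLocal, which improves allocation efficiency further.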

This article focuses on how Netty uses PoolThreadCache to make pooled allocation more efficient.

2 Related Classes

2.1 PoolThreadCache

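Let's start with the main fields of PoolThreadCache. To understand them we first need to know what tiny, small and normal mean. Netty classifies sizes as follows (pageSize defaults to 8192 and chunkSize to 16 MiB):

tiny: size < 512 (normalized to a multiple of 16)
small: 512 <= size < pageSize
normal: pageSize <= size <= chunkSize
huge: size > chunkSize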

PoolThreadCache caches only tiny, small and normal ByteBufs; it never caches huge ones. Here are its main fields:

//PoolThreadCache
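//PoolThreadCache mainly holds ByteBufs that are waiting to be
//released and does not run the allocation algorithm itself, so
//heapArena and directArena here are mainly used for cache-related
//bookkeeping; see the PoolThreadCache constructor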
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

// Hold the caches for the different size classes, which are tiny, small and normal.
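//Below are the tiny, small and normal caches for heap and direct
//memory respectively. For each memory type PoolThreadCache keeps
//three arrays, one per size class. Each class covers a range of
//sizes (tiny, for example, is size < 512), so within a class an
//array index is computed from the size. Every array element is a
//MemoryRegionCache, which is essentially a queue. For example, the
//smallest ByteBuf in Netty is 16 bytes, so the MemoryRegionCache at
//index 1 of the tiny array (tinyIdx(16) == 1; index 0 is unused)
//holds all recycled 16-byte ByteBufs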
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
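To make the tiny/small/normal split concrete, here is a minimal sketch that normalizes a request size and assigns it a size class. It assumes the default pageSize = 8192 and chunkSize = 16 MiB; `normalize` is a simplified stand-in for PoolArena.normalizeCapacity (which also handles memory alignment), and the class and method names are hypothetical.

```java
final class SizeClasses {
    enum SizeClass { TINY, SMALL, NORMAL, HUGE }

    // Assumed defaults: pageSize = 8192, chunkSize = pageSize << 11 = 16 MiB.
    static final int PAGE_SIZE = 8192;
    static final int CHUNK_SIZE = PAGE_SIZE << 11;

    // Simplified normalization: tiny requests round up to a multiple of 16,
    // small and normal requests round up to the next power of two,
    // huge requests are left unchanged.
    static int normalize(int reqCapacity) {
        if (reqCapacity >= CHUNK_SIZE) {
            return reqCapacity;
        }
        if (reqCapacity < 512) {
            return (reqCapacity + 15) & ~15;
        }
        return Integer.highestOneBit(reqCapacity - 1) << 1;
    }

    // Classify an already-normalized capacity.
    static SizeClass classify(int normCapacity) {
        if (normCapacity < 512) {
            return SizeClass.TINY;
        }
        if (normCapacity < PAGE_SIZE) {
            return SizeClass.SMALL;
        }
        if (normCapacity <= CHUNK_SIZE) {
            return SizeClass.NORMAL;
        }
        return SizeClass.HUGE;
    }

    public static void main(String[] args) {
        for (int req : new int[] {17, 100, 600, 5000, 20_000_000}) {
            int norm = normalize(req);
            System.out.println(req + " -> " + norm + " (" + classify(norm) + ")");
        }
    }
}
```

Note that a 500-byte request normalizes to 512 and therefore lands in small rather than tiny, and a 5000-byte request becomes 8192, which is already normal.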

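To understand these arrays, let's look at the code that computes the index of the MemoryRegionCache in an array, which is used both when taking an object from the cache and when putting a used one back: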

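//normCapacity passed to the functions below is already normalized:
//a multiple of 16 for tiny sizes, a power of two for small and normal sizes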

//PoolArena
static int tinyIdx(int normCapacity) {
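    //size < 512 is tiny. Tiny sizes are multiples of 16 and every
    //multiple of 16 gets its own index; some examples:
    /*
    16: 1
    32: 2
    64: 4
    128: 8
    256: 16
    496: 31
    */
    //the tiny cache array has 32 elements (512 >>> 4); index 0 is never used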
    return normCapacity >>> 4;
}

static int smallIdx(int normCapacity) {
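    //512 <= size < pageSize is small. With the default pageSize of 8192,
    //the powers of two in this range and their indices are:
    /* 512: 0
    1024: 1
    2048: 2
    4096: 3
    */
    //the small cache array size depends on pageSize (pageShifts - 9);
    //for pageSize = 8192 it is 4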
    int tableIdx = 0;
    int i = normCapacity >>> 10;
    while (i != 0) {
        i >>>= 1;
        tableIdx ++;
    }
    return tableIdx;
}

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
    if (area.isDirect()) {
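        //pageSize <= size <= chunkSize is normal.
        //With the default pageSize of 8192 (and chunkSize = 16 MiB),
        //the powers of two in this range and their indices are:
        /*
        8192: 0
        16384: 1
        32768: 2
        65536: 3
        131072: 4
        262144: 5
        524288: 6
        1048576: 7
        2097152: 8
        4194304: 9
        8388608: 10
        16777216: 11
        */
        //The normal cache array size depends on pageSize, chunkSize and
        //maxCachedBufferCapacity:
        //log2(min(chunkSize, maxCachedBufferCapacity) / pageSize) + 1.
        //Without the cap it would be 12, but the default 32 KiB cap
        //limits it to 3 (8192, 16384, 32768); cache() returns null for
        //any larger index, so bigger buffers are not cached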
        int idx = log2(normCapacity >> numShiftsNormalDirect);
        return cache(normalDirectCaches, idx);
    }
    int idx = log2(normCapacity >> numShiftsNormalHeap);
    return cache(normalHeapCaches, idx);
}
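The three index formulas can be checked in isolation. This sketch copies tinyIdx and smallIdx as listed above and expresses the normal index with Integer.numberOfLeadingZeros; the class name and the fixed shift of 13 (log2 of an assumed pageSize of 8192) are assumptions of the sketch.

```java
final class CacheIndexDemo {
    // Assumption: pageSize = 8192, so the normal shift is log2(8192) = 13.
    static final int NUM_SHIFTS_NORMAL = 13;

    // Same formula as PoolArena.tinyIdx (normCapacity is a multiple of 16 below 512).
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
    }

    // Same loop as PoolArena.smallIdx: 512 -> 0, 1024 -> 1, 2048 -> 2, 4096 -> 3.
    static int smallIdx(int normCapacity) {
        int tableIdx = 0;
        int i = normCapacity >>> 10;
        while (i != 0) {
            i >>>= 1;
            tableIdx++;
        }
        return tableIdx;
    }

    // log2(normCapacity >> numShiftsNormal), as in cacheForNormal (powers of two only).
    static int normalIdx(int normCapacity) {
        return 31 - Integer.numberOfLeadingZeros(normCapacity >> NUM_SHIFTS_NORMAL);
    }

    public static void main(String[] args) {
        System.out.println("tinyIdx(48)      = " + tinyIdx(48));
        System.out.println("smallIdx(2048)   = " + smallIdx(2048));
        System.out.println("normalIdx(16384) = " + normalIdx(16384));
    }
}
```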

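The above shows how indices into the tiny, small and normal arrays are computed. Taking the small array as an example, with pageSize = 8192 each index and the size of the ByteBufs cached in the MemoryRegionCache queue at that index are:

index 0: 512
index 1: 1024
index 2: 2048
index 3: 4096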

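In the article Netty Source Code - PooledByteBufAllocator Static Variable Initialization, I covered how PooledByteBufAllocator's static variables are initialized but skipped the cache-related ones. The important ones are: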

//PooledByteBufAllocator
// cache sizes
//Capacity of each MemoryRegionCache queue in PoolThreadCache,
//per size class
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);

// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
// 'Scalable memory allocation using jemalloc'
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);

// the number of threshold of allocations when cached entries will be freed up if not frequently used
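//After this many allocation attempts through a thread's PoolThreadCache,
//trim() runs: each queue frees up to (queue size - allocations served
//since the last trim) entries, so rarely used cached entries are released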
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
        "io.netty.allocator.cacheTrimInterval", 8192);
//Controls whether PoolThreadCache is enabled for all threads or
//only for threads that are FastThreadLocalThread instances
DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
        "io.netty.allocator.useCacheForAllThreads", true);

DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
        "io.netty.allocator.directMemoryCacheAlignment", 0);
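From these defaults one can work out how many MemoryRegionCache slots each array gets; each slot's queue then holds up to tinyCacheSize, smallCacheSize or normalCacheSize entries. The sketch below follows the sizing rules as I understand them from Netty 4.1's PoolThreadCache (32 tiny slots, pageShifts - 9 small slots, and log2(min(chunkSize, maxCachedBufferCapacity) / pageSize) + 1 normal slots); the helper names are hypothetical.

```java
final class CacheArraySizes {
    // floor(log2(val)) for val > 0; returns -1 for 0.
    static int log2(int val) {
        return 31 - Integer.numberOfLeadingZeros(val);
    }

    // One slot per multiple of 16 below 512 (index 0 is never used).
    static int tinyArraySize() {
        return 512 >>> 4;
    }

    // One slot per power of two from 512 up to pageSize / 2.
    static int smallArraySize(int pageSize) {
        return log2(pageSize) - 9;
    }

    // One slot per power of two from pageSize up to min(chunkSize, maxCachedBufferCapacity).
    static int normalArraySize(int pageSize, int chunkSize, int maxCachedBufferCapacity) {
        int max = Math.min(chunkSize, maxCachedBufferCapacity);
        return Math.max(1, log2(max / pageSize) + 1);
    }

    public static void main(String[] args) {
        int pageSize = 8192;
        int chunkSize = pageSize << 11;
        System.out.println("tiny slots:   " + tinyArraySize());
        System.out.println("small slots:  " + smallArraySize(pageSize));
        System.out.println("normal slots: " + normalArraySize(pageSize, chunkSize, 32 * 1024));
    }
}
```

With the 32 KiB default for maxCachedBufferCapacity the normal array has 3 slots; only when the cap is at least chunkSize does it reach 12.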

PoolThreadCache is actually stored in PooledByteBufAllocator's PoolThreadLocalCache (see my article Netty Source Code - How FastThreadLocal Works). The relevant initialization is:

//PooledByteBufAllocator
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    private final boolean useCacheForAllThreads;

    PoolThreadLocalCache(boolean useCacheForAllThreads) {
        this.useCacheForAllThreads = useCacheForAllThreads;
    }

    @Override
    protected synchronized PoolThreadCache initialValue() {
        //Pick the heap and direct arenas that are currently bound to
        //the fewest thread caches (see leastUsedArena below)
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

        Thread current = Thread.currentThread();
        //DEFAULT_USE_CACHE_FOR_ALL_THREADS controls whether
        //PoolThreadCache is enabled for all threads or only
        //for FastThreadLocalThread instances
        if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
            return new PoolThreadCache(
                    heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                    DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
        }
        //PoolThreadCache disabled: every cache size is 0
        // No caching so just use 0 as sizes.
        return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    }

    //This is the key to releasing the cache
    @Override
    protected void onRemoval(PoolThreadCache threadCache) {
        threadCache.free();
    }

    private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
        if (arenas == null || arenas.length == 0) {
            return null;
        }

        PoolArena<T> minArena = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            PoolArena<T> arena = arenas[i];
            if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                minArena = arena;
            }
        }

        return minArena;
    }
}
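The arena binding in initialValue can be imitated with the JDK's own ThreadLocal. In this sketch java.lang.ThreadLocal stands in for FastThreadLocal, and Arena is a hypothetical class that only carries a numThreadCaches counter; the selection rule is the same as leastUsedArena above.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ArenaBindingDemo {
    // Hypothetical stand-in for PoolArena: only the numThreadCaches counter is modeled.
    static final class Arena {
        final String name;
        final AtomicInteger numThreadCaches = new AtomicInteger();

        Arena(String name) {
            this.name = name;
        }
    }

    static final Arena[] ARENAS = { new Arena("arena-0"), new Arena("arena-1") };

    // Same rule as leastUsedArena: pick the arena bound to the fewest thread caches.
    static Arena leastUsedArena(Arena[] arenas) {
        Arena min = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            if (arenas[i].numThreadCaches.get() < min.numThreadCaches.get()) {
                min = arenas[i];
            }
        }
        return min;
    }

    // java.lang.ThreadLocal stands in for FastThreadLocal; the initial value is the bound arena.
    static final ThreadLocal<Arena> BOUND = ThreadLocal.withInitial(() -> {
        synchronized (ArenaBindingDemo.class) {      // initialValue is synchronized in Netty too
            Arena arena = leastUsedArena(ARENAS);
            arena.numThreadCaches.incrementAndGet(); // in Netty the PoolThreadCache constructor does this
            return arena;
        }
    });

    // Start threads one after another; each binds itself to an arena on first access.
    static void runThreads(int count) {
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(() -> System.out.println(
                    Thread.currentThread().getName() + " -> " + BOUND.get().name));
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        runThreads(4);
    }
}
```

Running four threads one after another binds two threads to each arena. Unlike Netty, which decrements numThreadCaches in PoolThreadCache.free(), this sketch never unbinds a thread.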

The PoolThreadCache constructor mainly creates the tiny/small/normal arrays; it is straightforward and is not covered here.

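onRemoval above is the key to clearing the cache. To understand it you need to know how FastThreadLocal cleanup works; see my article Netty Source Code - How FastThreadLocal Works.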

2.2 MemoryRegionCache

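MemoryRegionCache is a queue of ByteBufs of one size (again, not real ByteBufs but information about recycled ones). Depending on whether the size is tiny/small or normal, there are two implementations, SubPageMemoryRegionCache and NormalMemoryRegionCache: NormalMemoryRegionCache handles normal ByteBufs, and SubPageMemoryRegionCache handles tiny and small ones.
The important fields of MemoryRegionCache are: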

//MemoryRegionCache
//capacity of the queue for ByteBufs of this size
private final int size;
//the queue
private final Queue<Entry<T>> queue;
/*
enum identifying the size class of this queue
enum SizeClass {
    Tiny,
    Small,
    Normal
}
*/
private final SizeClass sizeClass;

The elements actually stored in MemoryRegionCache are of type MemoryRegionCache.Entry:

//MemoryRegionCache.Entry
static final class Entry<T> {
    final Handle<Entry<?>> recyclerHandle;
    //Entry only stores the chunk of the ByteBuf being recycled and
    //the handle that locates it within that chunk
    PoolChunk<T> chunk;
    long handle = -1;

    Entry(Handle<Entry<?>> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    void recycle() {
        chunk = null;
        handle = -1;
        recyclerHandle.recycle(this);
    }
}
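A MemoryRegionCache boils down to a bounded queue of (chunk, handle) entries whose Entry objects are themselves pooled. The sketch below uses ArrayBlockingQueue as the bounded queue and a plain ArrayDeque as a stand-in for the Recycler; both substitutions, and the class name MiniRegionCache, are assumptions for illustration, not Netty's actual types.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

final class MiniRegionCache<T> {
    // Like MemoryRegionCache.Entry: only the chunk and the handle are kept.
    static final class Entry<C> {
        C chunk;
        long handle = -1;
    }

    private final Queue<Entry<T>> queue;
    // Stand-in for Netty's Recycler: a simple free list of Entry objects.
    private final ArrayDeque<Entry<T>> entryPool = new ArrayDeque<>();

    MiniRegionCache(int size) {
        this.queue = new ArrayBlockingQueue<>(size); // bounded: offer fails once `size` entries are queued
    }

    boolean add(T chunk, long handle) {
        Entry<T> entry = entryPool.poll();
        if (entry == null) {
            entry = new Entry<>();
        }
        entry.chunk = chunk;
        entry.handle = handle;
        boolean queued = queue.offer(entry);
        if (!queued) {
            recycle(entry); // queue full: return the Entry object to the pool right away
        }
        return queued;
    }

    // Returns the cached handle, or -1 when nothing of this size is cached.
    long allocate() {
        Entry<T> entry = queue.poll();
        if (entry == null) {
            return -1;
        }
        long handle = entry.handle;
        recycle(entry);
        return handle;
    }

    private void recycle(Entry<T> entry) {
        entry.chunk = null;
        entry.handle = -1;
        entryPool.push(entry);
    }

    int pooledEntries() {
        return entryPool.size();
    }

    public static void main(String[] args) {
        MiniRegionCache<String> cache = new MiniRegionCache<>(2);
        System.out.println(cache.add("chunk-A", 10L)); // queued
        System.out.println(cache.add("chunk-A", 11L)); // queued
        System.out.println(cache.add("chunk-B", 12L)); // queue full, rejected
        System.out.println(cache.allocate());          // handle 10
    }
}
```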

The recyclerHandle in MemoryRegionCache.Entry exists so that Entry objects themselves can be pooled; see my article Netty Source Code - The Recycler Object Pool.

3 Allocating a ByteBuf Without the Cache

To see how PoolThreadCache speeds up allocation, let's first look at how a ByteBuf is allocated without it. Here is a brief look at how PooledByteBufAllocator allocates direct memory when there is no cache:

//PooledByteBufAllocator
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    //With caching disabled, the PoolThreadCache queue sizes are just 0,
    //but the arena held by the PoolThreadCache is still used.
    //Here we fetch that arena from the PoolThreadCache
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        //allocate
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

//PoolArena
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    //Create a new ByteBuf object; this uses object pooling (Recycler)
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    //allocate
    allocate(cache, buf, reqCapacity);
    return buf;
}

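//Depending on the size class of this request (tiny/small/normal), first
//try to allocate from the PoolThreadCache. Since caching is disabled here,
//that attempt fails; a new PoolChunk is then created if needed, and the
//binary-tree-based allocation algorithm computes where this ByteBuf starts
//within the chunk's memory, which completes the allocation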
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        ...
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }

        incTinySmallAllocation(tiny);
        return;
    }
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

So without PoolThreadCache, every ByteBuf allocation has to run the memory allocation algorithm in a PoolChunk to carve out a region of the required size.
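The effect of a disabled cache can be shown with a small counting model. CacheFirstDemo is hypothetical: a long handle stands in for a PooledByteBuf, and a counter stands in for the chunk allocation algorithm. It keeps the order PoolArena uses: try the thread cache first, fall back to the chunk otherwise (the free side is covered in section 4).

```java
import java.util.ArrayDeque;

final class CacheFirstDemo {
    private final int cacheSize;
    private final ArrayDeque<Long> cache; // null when caching is disabled (cacheSize == 0)
    private long nextHandle;              // hypothetical handle source for the chunk allocator
    int chunkAllocations;                 // how often the slow path (chunk allocation) ran
    int cacheHits;                        // how often the thread cache served the request

    CacheFirstDemo(int cacheSize) {
        this.cacheSize = cacheSize;
        if (cacheSize > 0) {
            this.cache = new ArrayDeque<Long>();
        } else {
            this.cache = null;
        }
    }

    // Same order as PoolArena.allocate: thread cache first, chunk allocation as fallback.
    long allocate() {
        if (cache != null) {
            Long cached = cache.poll();
            if (cached != null) {
                cacheHits++;
                return cached;
            }
        }
        chunkAllocations++;
        return nextHandle++;
    }

    // Same order as PoolArena.free: keep the handle if the cache has room; otherwise it
    // would go back to the chunk (freeChunk), which this sketch models by dropping it.
    void free(long handle) {
        if (cache != null && cache.size() < cacheSize) {
            cache.offer(handle);
        }
    }

    public static void main(String[] args) {
        for (int size : new int[] {0, 4}) {
            CacheFirstDemo demo = new CacheFirstDemo(size);
            for (int i = 0; i < 10; i++) {
                demo.free(demo.allocate()); // allocate a buffer, use it, release it
            }
            System.out.println("cacheSize=" + size + ": chunk allocations=" + demo.chunkAllocations
                    + ", cache hits=" + demo.cacheHits);
        }
    }
}
```

With cacheSize = 0 all ten allocations take the slow path; with a cache only the first one does.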

4 Adding to, Allocating From, and Clearing PoolThreadCache

In the following discussion of recycling and allocation we assume the defaults pageSize = 8192 and chunkSize = pageSize << 11 (16 MiB).

With the classes from section 2 introduced, recycling and allocation through the cache are fairly simple. First let's see how a ByteBuf is put into PoolThreadCache.

4.1 Putting an Object into the Cache

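To see how objects get into the cache, we first need to look at how PoolChunk initializes a ByteBuf once it has computed the starting position of the allocated memory: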

//PoolChunk
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    if (bitmapIdx == 0) {
        byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        //Note that when ByteBuf.init is called,
        //arena.parent.threadCache() passes in the
        //PoolThreadCache of the allocating thread, so the
        //ByteBuf holds a reference to that PoolThreadCache
        buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
                    arena.parent.threadCache());
    } else {
        initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
    }
}
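The handle passed to initBuf packs two indices into one long: the low 32 bits are the memoryMap node index and the high 32 bits carry the subpage bitmap index, which is why bitmapIdx == 0 selects the page-level branch. The sketch below decodes a handle and computes a run's offset and length, assuming pageSize = 8192 and maxOrder = 11 (so log2(chunkSize) = 24) and that node depths still equal floor(log2(id)), as in the initial depth map. The subpageHandle helper is hypothetical; to my understanding it mirrors how Netty 4.1 encodes subpage handles, setting the 0x4000000000000000L bit so bitmapIdx is never 0 for a subpage allocation.

```java
final class HandleDemo {
    // Assumed defaults: chunkSize = 8192 << 11 = 16 MiB, so log2(chunkSize) = 24.
    static final int LOG2_CHUNK_SIZE = 24;

    static int memoryMapIdx(long handle) {
        return (int) handle;                      // low 32 bits
    }

    static int bitmapIdx(long handle) {
        return (int) (handle >>> Integer.SIZE);   // high 32 bits
    }

    // Depth of node id in the complete binary tree (root id = 1 at depth 0).
    static int depth(int id) {
        return 31 - Integer.numberOfLeadingZeros(id);
    }

    // Bytes covered by node id: the whole chunk at the root, one page at depth 11.
    static int runLength(int id) {
        return 1 << (LOG2_CHUNK_SIZE - depth(id));
    }

    // Offset of node id inside the chunk: its position within its level times runLength.
    static int runOffset(int id) {
        int shift = id ^ (1 << depth(id));
        return shift * runLength(id);
    }

    // Hypothetical helper: build a subpage handle with the marker bit set.
    static long subpageHandle(int memoryMapIdx, int bitmapIdx) {
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
    }

    public static void main(String[] args) {
        long page = 2049L; // second leaf: the page starting at 8192
        System.out.println("offset=" + runOffset(memoryMapIdx(page)) + ", length=" + runLength(memoryMapIdx(page)));
        long sub = subpageHandle(2048, 5);
        System.out.println("bitmapIdx=" + (bitmapIdx(sub) & 0x3FFFFFFF));
    }
}
```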

In Netty, ByteBuf release is controlled by reference counting; when the count drops to 0, deallocate is called. PooledByteBuf.deallocate looks like this:

//PooledByteBuf
@Override
protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        tmpNioBuf = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        chunk = null;
        recycle();
    }
}

//PoolArena
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
    //Unpooled (huge) chunks are never cached: destroy the chunk directly
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        SizeClass sizeClass = sizeClass(normCapacity);
        //try to add it to the cache
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, sizeClass);
    }
}

//PoolThreadCache
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    //Based on the size of the ByteBuf being recycled, get the
    //MemoryRegionCache at the matching index of the right array
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
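    //null means caching is disabled or this size has no cache slot
    //(e.g. normal sizes above maxCachedBufferCapacity); return false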
    if (cache == null) {
        return false;
    }
    //caching is enabled, so add it to the cache
    return cache.add(chunk, handle);
}

//Get the queue from the right array based on the ByteBuf size; the
//index computation was covered above and is not repeated here
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
    switch (sizeClass) {
    case Normal:
        return cacheForNormal(area, normCapacity);
    case Small:
        return cacheForSmall(area, normCapacity);
    case Tiny:
        return cacheForTiny(area, normCapacity);
    default:
        throw new Error();
    }
}

//MemoryRegionCache
//Add an element to the MemoryRegionCache queue
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, long handle) {
    Entry<T> entry = newEntry(chunk, handle);
    //the queue is bounded by the size field (see section 2.2), so offer fails when it is full
    boolean queued = queue.offer(entry);
    //the queue is full, so recycle the entry
    if (!queued) {
        // If it was not possible to cache the chunk, immediately recycle the entry
        entry.recycle();
    }

    return queued;
}
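The lookups above all go through PoolThreadCache.cache(area, normCapacity, sizeClass). The array-level cache(array, idx) helper it ends up in is not listed here; as far as I know from Netty 4.1, it returns null for a null array or an out-of-range index, which is why add returns false for sizes without a slot and the memory goes back to the chunk. In this sketch strings are hypothetical stand-ins for MemoryRegionCache instances, and the default array sizes (32 tiny, 4 small, 3 normal) with pageSize = 8192 are assumed.

```java
final class CacheLookupDemo {
    enum SizeClass { Tiny, Small, Normal }

    // Array-level helper: a missing array or an out-of-range index means "not cacheable".
    static <T> T cache(T[] caches, int idx) {
        if (caches == null || idx > caches.length - 1) {
            return null;
        }
        return caches[idx];
    }

    static int log2(int val) {
        return 31 - Integer.numberOfLeadingZeros(val);
    }

    // Strings stand in for MemoryRegionCache instances (default array sizes assumed).
    static final String[] TINY = new String[32];
    static final String[] SMALL = { "512B", "1KiB", "2KiB", "4KiB" };
    static final String[] NORMAL = { "8KiB", "16KiB", "32KiB" };

    static {
        for (int i = 1; i < TINY.length; i++) {
            TINY[i] = (i * 16) + "B"; // index 0 stays null: there are no 0-byte buffers
        }
    }

    // Same dispatch shape as PoolThreadCache.cache(area, normCapacity, sizeClass).
    static String lookup(int normCapacity, SizeClass sizeClass) {
        switch (sizeClass) {
            case Normal:
                return cache(NORMAL, log2(normCapacity >> 13)); // pageSize = 8192 assumed
            case Small:
                return cache(SMALL, log2(normCapacity >>> 9));  // equals smallIdx for powers of two
            case Tiny:
                return cache(TINY, normCapacity >>> 4);
            default:
                throw new Error();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookup(48, SizeClass.Tiny));      // 48B
        System.out.println(lookup(32768, SizeClass.Normal)); // 32KiB
        System.out.println(lookup(65536, SizeClass.Normal)); // null: not cached, add(...) returns false
    }
}
```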

4.2 Allocating From the Cache

For allocation from the cache, look again at the PoolArena.allocate method listed earlier, this time in full:

//PoolArena
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        /**
            * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
            * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
            */
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                incTinySmallAllocation(tiny);
                return;
            }
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }

        incTinySmallAllocation(tiny);
        return;
    }
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

The code above is long, but the logic is simple. Let's look at how a tiny ByteBuf is allocated from the cache, i.e. the cache.allocateTiny call above:

//PoolThreadCache
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    //cacheForTiny picks the queue from the tiny cache array based on the requested size
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

//Pick the queue from the tiny cache array based on the requested size;
//the data structure and index computation were covered in section 2
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

//Take an element from the queue and use it to initialize the ByteBuf
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
    if (cache == null) {
        // no cache found so just return false here
        return false;
    }
    boolean allocated = cache.allocate(buf, reqCapacity);
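    //Count allocation attempts. PoolThreadCache keeps an allocations
    //counter for the whole thread cache, and each MemoryRegionCache has
    //its own allocations counter for the entries it handed out. Once the
    //thread-wide count reaches freeSweepAllocationThreshold (configured
    //by DEFAULT_CACHE_TRIM_INTERVAL), trim() runs: every queue frees up
    //to size - allocations of its entries and resets its counter. Each
    //queue is left with at most as many entries as it handed out since
    //the last trim, so rarely used queues are drained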
    if (++ allocations >= freeSweepAllocationThreshold) {
        allocations = 0;
        trim();
    }
    return allocated;
}

//MemoryRegionCache
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    //Take one ByteBuf record from the queue, use it to initialize the
    //ByteBuf being allocated, and return the Entry to the object pool
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();

    // allocations is not thread-safe which is fine as this is only called from the same thread all time.
    ++ allocations;
    return true;
}
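The counters and trim() described in the comments above can be modeled in a few lines. RegionCache and ThreadCache below are hypothetical stand-ins; trim() follows the rule stated above (free up to size - allocations entries, then reset the counter), and the threshold is a constructor parameter instead of DEFAULT_CACHE_TRIM_INTERVAL.

```java
import java.util.ArrayDeque;

final class TrimDemo {
    // Hypothetical stand-in for MemoryRegionCache.
    static final class RegionCache {
        final int size;                                   // queue capacity
        final ArrayDeque<Long> queue = new ArrayDeque<>();
        int allocations;                                  // successful polls since the last trim

        RegionCache(int size) {
            this.size = size;
        }

        boolean add(long handle) {
            if (queue.size() >= size) {
                return false;                             // full: the caller frees to the chunk instead
            }
            queue.offer(handle);
            return true;
        }

        Long allocate() {
            Long handle = queue.poll();
            if (handle != null) {
                allocations++;
            }
            return handle;
        }

        // Free up to (size - allocations) entries, then reset the counter.
        int trim() {
            int free = size - allocations;
            allocations = 0;
            int freed = 0;
            while (freed < free && queue.poll() != null) {
                freed++;                                  // Netty would give the memory back to its chunk here
            }
            return freed;
        }
    }

    // Hypothetical stand-in for the thread-wide counter in PoolThreadCache.allocate.
    static final class ThreadCache {
        final RegionCache cache;
        final int freeSweepAllocationThreshold;
        int allocations;                                  // every attempt counts, hit or miss

        ThreadCache(RegionCache cache, int freeSweepAllocationThreshold) {
            this.cache = cache;
            this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        }

        Long allocate() {
            Long handle = cache.allocate();
            if (++allocations >= freeSweepAllocationThreshold) {
                allocations = 0;
                cache.trim();
            }
            return handle;
        }
    }

    public static void main(String[] args) {
        RegionCache hot = new RegionCache(4);
        for (long h = 0; h < 4; h++) {
            hot.add(h);
        }
        for (int i = 0; i < 3; i++) {
            hot.add(hot.allocate());                      // use an entry, then return it
        }
        System.out.println("freed on first trim: " + hot.trim() + ", left: " + hot.queue.size());
        System.out.println("freed on idle trim:  " + hot.trim() + ", left: " + hot.queue.size());
    }
}
```

A queue that handed out three entries since the last trim loses only one of its four entries; an idle queue is emptied on the next trim.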

4.3 Clearing PoolThreadCache

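As described above, PoolThreadCache lives in PooledByteBufAllocator's FastThreadLocal (PoolThreadLocalCache), which overrides onRemoval. That method is called when the FastThreadLocal (FTL) is cleaned up, actively or passively (for FTL's active and passive cleanup, see my article Netty Source Code - How FastThreadLocal Works), and it calls threadCache.free():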

//PoolThreadCache
void free() {
    //Release all cached data; the details are not covered here
    int numFreed = free(tinySubPageDirectCaches) +
            free(smallSubPageDirectCaches) +
            free(normalDirectCaches) +
            free(tinySubPageHeapCaches) +
            free(smallSubPageHeapCaches) +
            free(normalHeapCaches);

    if (numFreed > 0 && logger.isDebugEnabled()) {
        logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, Thread.currentThread().getName());
    }
    //Update statistics
    if (directArena != null) {
        directArena.numThreadCaches.getAndDecrement();
    }

    if (heapArena != null) {
        heapArena.numThreadCaches.getAndDecrement();
    }
}
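free() drains every cache array and then unbinds the thread from its arenas. A hypothetical sketch of that bookkeeping, with queues of long handles standing in for the MemoryRegionCache arrays and an AtomicInteger standing in for numThreadCaches:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadCacheFreeDemo {
    // Drain one cache array; a null array (caching disabled for that class) frees nothing.
    static int free(List<? extends Queue<?>> caches) {
        if (caches == null) {
            return 0;
        }
        int numFreed = 0;
        for (Queue<?> queue : caches) {
            while (queue.poll() != null) {
                numFreed++; // Netty would return the entry's memory to its chunk here
            }
        }
        return numFreed;
    }

    // Drain every array, then unbind this thread from its arena (numThreadCaches - 1).
    @SafeVarargs
    static int freeAll(AtomicInteger arenaNumThreadCaches, List<? extends Queue<?>>... cacheArrays) {
        int numFreed = 0;
        for (List<? extends Queue<?>> caches : cacheArrays) {
            numFreed += free(caches);
        }
        arenaNumThreadCaches.getAndDecrement();
        return numFreed;
    }

    public static void main(String[] args) {
        AtomicInteger numThreadCaches = new AtomicInteger(1);
        Queue<Long> tiny16 = new ArrayDeque<>(Arrays.asList(1L, 2L));
        Queue<Long> small512 = new ArrayDeque<>(Arrays.asList(3L));
        int freed = freeAll(numThreadCaches, Arrays.asList(tiny16), Arrays.asList(small512), null);
        System.out.println("freed " + freed + ", numThreadCaches = " + numThreadCaches.get());
    }
}
```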