PoolThreadCache

2019-08-01  本文已影响0人  Pillar_Zhong

Netty-PoolThreadCache

概要

PoolThreadCache顾名思义,就是跟线程绑定的cache。

PoolThreadCahche是Netty内存管理中能够实现高效内存申请和释放的一个重要原因,Netty会为每一个线程都维护一个PoolThreadCache对象,当进行内存申请时,首先会尝试从PoolThreadCache中申请,如果无法从中申请到,则会尝试从Netty的公共内存池中申请.

image

关键属性

// 从Alloctor选中的heaparena
final PoolArena<byte[]> heapArena;
// 从Alloctor选中的directarena
final PoolArena<ByteBuffer> directArena;

// tiny/small/normal的caches
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
private final int freeSweepAllocationThreshold;

private int allocations;

private final Thread thread = Thread.currentThread();
private final Runnable freeTask = new Runnable() {
    @Override
    public void run() {
        free0();
    }
};

初始化

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
   
    // TODO
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    // directArena处理
    if (directArena != null) {
        // tinySubPageHeapCaches数组缓存的是[16B, 32B, … , 496B]大小的内存块, 
        // 每个元素对应的缓存queue个数不能超过512个
        tinySubPageDirectCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        // smallSubPageHeapCaches数组长度为4(如上图所示), 
        // 依次缓存[512K, 1024k, 2048k, 4096k]大小的缓存, 
        // 每个的元素对应的缓存queue个数不能超过256个                
        smallSubPageDirectCaches = createSubPageCaches(
                smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
        
        numShiftsNormalDirect = log2(directArena.pageSize);
        // normalDirectCaches依次缓存[8k, 16k, 32k]大小的缓存
        // 每个的元素对应的缓存queue个数不能超过64个
        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);
        
        // 如果上面初始化完成,那么numThreadCaches加一,代表directArena有新的线程来关联
        directArena.numThreadCaches.getAndIncrement();
    } else {
        // No directArea is configured so just null out all caches
        tinySubPageDirectCaches = null;
        smallSubPageDirectCaches = null;
        normalDirectCaches = null;
        numShiftsNormalDirect = -1;
    }
    // 同direct
    if (heapArena != null) {
        // Create the caches for the heap allocations
        tinySubPageHeapCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        smallSubPageHeapCaches = createSubPageCaches(
                smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

        numShiftsNormalHeap = log2(heapArena.pageSize);
        normalHeapCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, heapArena);

        heapArena.numThreadCaches.getAndIncrement();
    } else {
        // No heapArea is configured so just null out all caches
        tinySubPageHeapCaches = null;
        smallSubPageHeapCaches = null;
        normalHeapCaches = null;
        numShiftsNormalHeap = -1;
    }

    
    ThreadDeathWatcher.watch(thread, freeTask);
}

createNormalCaches

private static <T> MemoryRegionCache<T>[] createNormalCaches(
        int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
    if (cacheSize > 0) {
        // 一个chunk按道理最大能撑到16M,但是如果只是作为线程的本地缓存,16M*64显然不必要,
        //也极大浪费,而这里normal限定为8k,16k,32k就可以了,不至于浪费
        int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
        // 当然了需要3个位置来放置这些长度
        int arraySize = Math.max(1, log2(max / area.pageSize) + 1);

        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
        for (int i = 0; i < cache.length; i++) {
            // NormalMemoryRegionCache代表两个意思,每个不同位置代表不同的normalsize
            // 到时候申请空间,代表是去chunk申请,而不是subpage
            cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
        }
        return cache;
    } else {
        return null;
    }
}

createSubPageCaches

// tiny 32*512 0-512
// small 4*256 512-8192
// 逻辑同normal
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
        int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0) {
        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
        for (int i = 0; i < cache.length; i++) {
            // TODO: maybe use cacheSize / cache.length
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
        }
        return cache;
    } else {
        return null;
    }
}

MemoryRegionCache

概述

1564047989450.png 1564048025752.png

从跟踪queue的add来看,可以得到一个结论。PoolThreadCache并不是自己闷着头将不同规格大小的cache初始化完,对外提供内存。而是去收集上层中不用的内存空间,保存在这里,以便下次复用。如果你不这么做,那么这些内存会被回收掉,也是一种浪费。在高并发的场景,内存的开销会很大。能复用一点再好不过了。

关键属性

// 队列能保存的最大元素个数
private final int size;
// 队列保存了可复用的内存空间
private final Queue<Entry<T>> queue;
// tiny or small or normal
private final SizeClass sizeClass;
// 堆外分配过多少次了
private int allocations;

分配

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    // 如果当前队列没有可用的Entry,那么直接返回false
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    // 如果有的话,直接将buf与该entry进行绑定,这样也达到内存复用的目的
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();
    
    // 如果成功复用,那么分配次数加一
    ++ allocations;
    return true;
}

清理

public final void trim() {
    int free = size - allocations;
    allocations = 0;

    // We not even allocated all the number that are
    if (free > 0) {
        free(free);
    }
}

定位

这里的目的是根据arena和请求的空间大小来决定我要去拿到哪种规格, 多大的内存空间。

而这些在PoolThreadCache初始化的时候就已经规划好,下面就是具体定位的逻辑。

定位成功的话,拿到对应的MemoryRegionCache,而它里面queue队列中就是你需要的内存。假如可以复用的话。

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    // 拿到normCapacity对应的idex
    int idx = PoolArena.tinyIdx(normCapacity);
    // 去拿到对应的tiny的MemoryRegionCache
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.smallIdx(normCapacity);
    if (area.isDirect()) {
        return cache(smallSubPageDirectCaches, idx);
    }
    // 去拿到对应的small的MemoryRegionCache
    return cache(smallSubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
    if (area.isDirect()) {
        int idx = log2(normCapacity >> numShiftsNormalDirect);
        return cache(normalDirectCaches, idx);
    }
    int idx = log2(normCapacity >> numShiftsNormalHeap);
    // 去拿到对应的normal的MemoryRegionCache
    return cache(normalHeapCaches, idx);
}

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    // 可以看到,假如你传入的idx不合适的话,说明你申请的空间大小不符合cache的条件
    // 那么返回null,代表cache中没有你需要的空间
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    return cache[idx];
}

分配

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}

boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
    if (cache == null) {
        // no cache found so just return false here
        return false;
    }
    // 去cache里面看有没有可重用的空间
    boolean allocated = cache.allocate(buf, reqCapacity);
    // 每对外分配了freeSweepAllocationThreshold次数后,做trim
    if (++ allocations >= freeSweepAllocationThreshold) {
        allocations = 0;
        // 清理空间
        trim();
    }
    return allocated;
}

清理

从上面分配可知,当cache复用了一定次数的后,需要对cache里面的空间做一次清理。Netty认为,隔了这么久的时间里面,如果有Entry还没有人认领,那么这些空间可能使用率会指数级递减。与其放着占着内存,不如全部释放掉。

void trim() {
    // 每种规格的cache都需要清理
    trim(tinySubPageDirectCaches);
    trim(smallSubPageDirectCaches);
    trim(normalDirectCaches);
    trim(tinySubPageHeapCaches);
    trim(smallSubPageHeapCaches);
    trim(normalHeapCaches);
}

private static void trim(MemoryRegionCache<?>[] caches) {
    if (caches == null) {
        return;
    }
    for (MemoryRegionCache<?> c: caches) {
        trim(c);
    }
}

private static void trim(MemoryRegionCache<?> cache) {
    if (cache == null) {
        return;
    }
    // 调用MemoryRegionCache的清理
    cache.trim();
}

// MemoryRegionCache
public final void trim() {
    // 计算还能存多少个Entry,也就是free
    int free = size - allocations;
    allocations = 0;

    // We not even allocated all the number that are
    if (free > 0) {
        free(free);
    }
}

private int free(int max) {
    int numFreed = 0;
    // 极端情况下,上一步可以得到free个内存空间等待释放,也就是queue全满的情况
    for (; numFreed < max; numFreed++) {
        // 从队列中取出一个Entry
        Entry<T> entry = queue.poll();
        if (entry != null) {
            freeEntry(entry);
        } else {
            // all cleared
            return numFreed;
        }
    }
    return numFreed;
}

private  void freeEntry(Entry entry) {
    PoolChunk chunk = entry.chunk;
    long handle = entry.handle;

    // entry回收
    entry.recycle();
    
    // 底层chunk去释放
    chunk.arena.freeChunk(chunk, handle, sizeClass);
}

上一篇 下一篇

猜你喜欢

热点阅读