Netty程序员Java 杂谈

自顶向下深入分析Netty(十)--PoolArena

2017-07-24  本文已影响468人  Hypercube

上一节讲述了jemalloc的思想,本节将分析Netty的实现细节。在Netty实现中,相关的类都加上了前缀Pool,比如PoolArenaPoolChunk等,本节分析PoolArena的源码实现细节。

首先看类签名:

    abstract class PoolArena<T> implements PoolArenaMetric

该类是一个抽象类,这是因为ByteBuf分为Heap和Direct,所以PoolArena同样分为两类:Heap和Direct。该类实现的接口PoolArenaMetric是一些信息的测度统计,忽略这些信息不再分析。
其中的关键成员变量如下:

    private final int maxOrder; // chunk相关满二叉树的高度
    final int pageSize; // 单个page的大小
    final int pageShifts; // 用于辅助计算
    final int chunkSize; // chunk的大小
    final int subpageOverflowMask; // 用于判断请求是否为Small/Tiny
    final int numSmallSubpagePools; // small请求的双向链表头个数
    final int directMemoryCacheAlignment; // 对齐基准
    final int directMemoryCacheAlignmentMask; // 用于对齐内存
    private final PoolSubpage<T>[] tinySubpagePools; // Subpage双向链表
    private final PoolSubpage<T>[] smallSubpagePools; // Subpage双向链表
    
    final PooledByteBufAllocator parent;

对于前述分析的如QINIT、Q0等chunk状态,Netty使用PoolChunkList作为容器存放相同状态的Chunk块,相关变量如下:

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

构造方法如下:

    protected PoolArena(PooledByteBufAllocator parent, int pageSize,
          int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
        this.parent = parent;
        this.pageSize = pageSize;
        this.maxOrder = maxOrder;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        directMemoryCacheAlignment = cacheAlignment;
        directMemoryCacheAlignmentMask = cacheAlignment - 1;
        subpageOverflowMask = ~(pageSize - 1);
        tinySubpagePools = new PoolSubpage[numTinySubpagePools];
        for (int i = 0; i < tinySubpagePools.length; i ++) {
            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
        }

        numSmallSubpagePools = pageShifts - 9;
        smallSubpagePools = new PoolSubpage[numSmallSubpagePools];
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
        }
        
        initPoolChunkList();
    }
    
    private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
        PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
        head.prev = head;
        head.next = head;
        return head;
    }

其中initPoolChunkList()如下:

    q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
    q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
    q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
    q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
    q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
    qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);

    q100.prevList(q075);
    q075.prevList(q050);
    q050.prevList(q025);
    q025.prevList(q000);
    q000.prevList(null);
    qInit.prevList(qInit);

这段代码实现如下图所示的双向链表:


状态转移

Netty使用一个枚举来表示每次请求大小的类别:

    enum SizeClass {
        Tiny,
        Small,
        Normal
        // 除此之外的请求为Huge
    }

根据请求分配大小判断所属分类的代码如下,体会其中的位运算:

    // capacity < pageSize
    boolean isTinyOrSmall(int normCapacity) {
        // subpageOverflowMask = ~(pageSize - 1)
        return (normCapacity & subpageOverflowMask) == 0;
    }

    // normCapacity < 512
    static boolean isTiny(int normCapacity) {
        return (normCapacity & 0xFFFFFE00) == 0;
    }
    
    // capacity <= chunkSize
    boolean isNormal(int normCapacity){
        return normCapacity <= chunkSize;
    }

对容量进行规范化的代码如下:

    int normalizeCapacity(int reqCapacity) {
        // Huge 直接返回(直接内存需要对齐)
        if (reqCapacity >= chunkSize) {
            return directMemoryCacheAlignment == 0 ? reqCapacity : 
                              alignCapacity(reqCapacity);
        }

        // Small和Normal 规范化到大于2的n次方的最小值
        if (!isTiny(reqCapacity)) { // >= 512
            int normalizedCapacity = reqCapacity;
            normalizedCapacity --;
            normalizedCapacity |= normalizedCapacity >>>  1;
            normalizedCapacity |= normalizedCapacity >>>  2;
            normalizedCapacity |= normalizedCapacity >>>  4;
            normalizedCapacity |= normalizedCapacity >>>  8;
            normalizedCapacity |= normalizedCapacity >>> 16;
            normalizedCapacity ++;

            if (normalizedCapacity < 0) {
                normalizedCapacity >>>= 1;
            }
            return normalizedCapacity;
        }
        
        // Tiny且直接内存需要对齐
        if (directMemoryCacheAlignment > 0) {
            return alignCapacity(reqCapacity);
        }

        // Tiny且已经是16B的倍数
        if ((reqCapacity & 15) == 0) {
            return reqCapacity;
        }
        
        // Tiny不是16B的倍数则规范化到16B的倍数
        return (reqCapacity & ~15) + 16;
    }

规范化的结果可查看请求分类图,实现中使用了大量位运算,请仔细体会。另外,直接内存对齐后的请求容量为基准的倍数,比如基准为64B,则分配的内存都需要为64B的整数倍,也就是常说的按64字节对齐,实现代码如下(依然使用位运算):

    int alignCapacity(int reqCapacity) {
        // directMemoryCacheAlignmentMask = cacheAlignment - 1;
        int delta = reqCapacity & directMemoryCacheAlignmentMask;
        return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;
    }

对于Small和Tiny的请求,随着请求的分配,PoolArena可能会形成如下的双向循环链表:

Small请求双向链表
其中的每个节点都是PoolSubpage,在jemalloc的介绍中,说明Subpage会以第一次请求分配的大小为基准划分,之后也只能进行这个基准大小的内存分配。在PoolArena中继续对PoolSubpage进行分组,将相同基准的PoolSubpage连接成为双向循环链表,便于管理和内存分配。需要注意的是链表头结点head是一个特殊的PoolSubpage,不进行实际的内存分配任务。得到链表head节点的代码如下:
    PoolSubpage<T> findSubpagePoolHead(int elemSize) {
        int tableIdx;
        PoolSubpage<T>[] table;
        if (isTiny(elemSize)) { // < 512 Tiny
            tableIdx = elemSize >>> 4;
            table = tinySubpagePools;
        } else {    // Small
            tableIdx = 0;
            elemSize >>>= 10;   // 512=0, 1KB=1, 2KB=2, 4KB=3
            while (elemSize != 0) {
                elemSize >>>= 1;
                tableIdx ++;
            }
            table = smallSubpagePools;
        }

        return table[tableIdx];
    }

明白了这些,继续分析重要的内存分配方法allocate():

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, 
                                              final int reqCapacity) {
        // 规范化请求容量
        final int normCapacity = normalizeCapacity(reqCapacity);
        // capacity < pageSize, Tiny/Small请求
        if (isTinyOrSmall(normCapacity)) { 
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512 Tiny请求
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    return; // 尝试从ThreadCache进行分配
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else { // Small请求
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    return; // 尝试从ThreadCache进行分配
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            // 分组的Subpage双向链表的头结点
            final PoolSubpage<T> head = table[tableIdx];

            synchronized (head) {   // 锁定防止其他操作修改head结点
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate(); // 进行分配
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    return;
                }
            }
            
            synchronized (this) {
                // 双向循环链表还没初始化,使用normal分配
                allocateNormal(buf, reqCapacity, normCapacity);
            }
            return;
        }
        
        // Normal请求
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                return; // 尝试从ThreadCache进行分配
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }
        } else {
            // Huge请求直接分配
            allocateHuge(buf, reqCapacity);
        }
    }

对于Normal和Huge的分配细节如下:

    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        if (q050.allocate(buf, reqCapacity, normCapacity) || 
            q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || 
            qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            return;
        }

        // 无Chunk或已存Chunk不能满足分配,新增一个Chunk
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        long handle = c.allocate(normCapacity);
        assert handle > 0;
        c.initBuf(buf, handle, reqCapacity);
        qInit.add(c);   // Chunk初始状态为QINIT
    }
    
    private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
        PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
        buf.initUnpooled(chunk, reqCapacity);
    }

总结一下内存分配过程:

  1. 对于Tiny/Small、Normal大小的请求,优先从线程缓存中分配。
  2. 没有从缓存中得到分配的Tiny/Small请求,会从以第一次请求大小为基准进行分组的Subpage双向链表中进行分配;如果双向链表还没初始化,则会使用Normal请求分配Chunk块中的一个Page,Page以请求大小为基准进行切分并分配第一块内存,然后加入到双向链表中。
  3. 没有从缓存中得到分配的Normal请求,则会使用伙伴算法分配满足要求的连续Page块。
  4. 对于Huge请求,则直接使用Unpooled直接分配。

内存分配过程分析完毕,接着分析内存释放:

    void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
        if (chunk.unpooled) {   // Huge
            int size = chunk.chunkSize();
            destroyChunk(chunk);    // 模板方法,子类实现具体释放过程
        } else {    // Normal, Small/Tiny
            SizeClass sizeClass = sizeClass(normCapacity);
            if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
                return;  // 可以缓存则不释放
            }
            // 否则释放
            freeChunk(chunk, handle, sizeClass);
        }
    }
    
    void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
        final boolean destroyChunk;
        synchronized (this) {
            // parent为所属的chunkList,destroyChunk为true表示Chunk内存使用装填
            // 从QINIT->Q0->...->Q0,最后释放
            destroyChunk = !chunk.parent.free(chunk, handle);
        }
        if (destroyChunk) {
            destroyChunk(chunk); // 模板方法,子类实现具体释放过程
        }
    }

需要注意的是finalize(),该方法是Object中的方法,在对象被GC回收时调用,可知在该方法中需要清理资源,本类中主要清理内存,代码如下:

    protected final void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            destroyPoolSubPages(smallSubpagePools); 
            destroyPoolSubPages(tinySubpagePools);
            destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100);
        }
    }

    private static void destroyPoolSubPages(PoolSubpage<?>[] pages) {
        for (PoolSubpage<?> page : pages) {
            page.destroy();
        }
    }

    private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
        for (PoolChunkList<T> chunkList: chunkLists) {
            chunkList.destroy(this);
        }
    }

此外,当PooledByteBuf容量扩增时,内存需要重新分配,代码如下:

    void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
        int oldCapacity = buf.length;
        if (oldCapacity == newCapacity) {
            return;
        }

        PoolChunk<T> oldChunk = buf.chunk;
        long oldHandle = buf.handle;
        T oldMemory = buf.memory;
        int oldOffset = buf.offset;
        int oldMaxLength = buf.maxLength;
        int readerIndex = buf.readerIndex();
        int writerIndex = buf.writerIndex();
        
        // 分配新内存
        allocate(parent.threadCache(), buf, newCapacity);
        // 将老数据copy到新内存
        if (newCapacity > oldCapacity) {
            memoryCopy(oldMemory, oldOffset,
                        buf.memory, buf.offset, oldCapacity);
        } else if (newCapacity < oldCapacity) {
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex = newCapacity;
                }
                memoryCopy(oldMemory, oldOffset + readerIndex,
                            buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
            } else {
                readerIndex = writerIndex = newCapacity;
            }
        }
        
        // 重新设置读写索引
        buf.setIndex(readerIndex, writerIndex);

        // 如有必要,释放老的内存
        if (freeOldMemory) {
            free(oldChunk, oldHandle, oldMaxLength, buf.cache);
        }
    }

最后,由于该类是一个抽象类,其中的抽象方法如下:

    // 新建一个Chunk,Tiny/Small,Normal请求请求分配时调用
    protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
    // 新建一个Chunk,Huge请求分配时调用
    protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
    // 
    protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
    // 复制内存,当ByteBuf扩充容量时调用
    protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
    // 销毁Chunk,释放内存时调用
    protected abstract void destroyChunk(PoolChunk<T> chunk);
    // 判断子类实现Heap还是Direct
    protectted abstract boolean isDirect();

该类的两个子类分别是HeapArenaDirectArena,根据底层不同而实现不同的抽象方法。方法简单易懂,不再列出代码。

相关链接:

  1. JEMalloc分配算法
  2. PoolChunk
  3. PoolChunkList
  4. PoolSubpage
  5. PooThreadCache
上一篇下一篇

猜你喜欢

热点阅读