PoolArena

2019-08-01  本文已影响0人  Pillar_Zhong

关键属性

// tiny数组的大小, 32
static final int numTinySubpagePools = 512 >>> 4;
// 所属的PooledByteBufAllocator
final PooledByteBufAllocator parent;
// chunk二叉树的最大高度, 默认11
private final int maxOrder;
// pagesiz, 默认1 << 13 = 8192
final int pageSize;
// pageShifts=log(pageSize),默认13
final int pageShifts;
// 默认16M
final int chunkSize;
// ~(pageSize - 1) = 11111111111111111110000000000000
final int subpageOverflowMask;
// pageShifts - 9 = 4
final int numSmallSubpagePools;
// tiny数组
private final PoolSubpage<T>[] tinySubpagePools;
// small数组
private final PoolSubpage<T>[] smallSubpagePools;

构造方法

image image
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    ...
    // 初始化tinySubpagePools
    // 创建长度为32的PoolSubpage数组
    tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
    for (int i = 0; i < tinySubpagePools.length; i ++) {
        // 给每个元素都初始化一个head
        tinySubpagePools[i] = newSubpagePoolHead(pageSize);
    }

    // 初始化numSmallSubpagePools
    numSmallSubpagePools = pageShifts - 9;
    // 创建长度为4的PoolSubpage数组
    smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
    for (int i = 0; i < smallSubpagePools.length; i ++) {
        // 给每个元素都初始化一个head
        smallSubpagePools[i] = newSubpagePoolHead(pageSize);
    }

    ...chunklist...
}
private PoolSubpage<T>[] newSubpagePoolArray(int size) {
    return new PoolSubpage[size];
}
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
    PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
    head.prev = head;
    head.next = head;
    return head;
}
  • tinySubpagePools:用于分配小于512字节的内存,默认长度为32,因为内存分配最小为16,每次增加16,直到512,区间[16,512)一共有32个不同值
  • smallSubpagePools:用于分配大于等于512字节的内存,默认长度为4, 每次容量翻倍

定位

既然tinySubpagePools或smallSubpagePools设计成不同elemsize组成的,那么怎么根据我请求的内存大小来去定制具体的head呢?

findSubpagePoolHead

PoolSubpage<T> findSubpagePoolHead(int elemSize) {
    int tableIdx;
    PoolSubpage<T>[] table;
    // 如果申请的空间是属于tiny的部分
    if (isTiny(elemSize)) { // < 512
        // 根据elemsize除去16可以很容易拿到tinySubpagePools对应大小的head
        tableIdx = elemSize >>> 4;
        table = tinySubpagePools;
    } else {
        // 否则去smallSubpagePools里面申请
        tableIdx = 0;
        // 这里很巧妙的来计算small的对应elemsize的head
        // 512-1023之间的数字>>>10的结果为0
        // 1024-2047之间的数字>>>10的结果为1
        // 2048-4095之间的数字>>>10的结果为10
        // 4096-8195之间的数字>>>10的结果为100
        // 将以上的结果右移一位直到等于0,所用的次数正好是数组的下标
        elemSize >>>= 10;
        while (elemSize != 0) {
            elemSize >>>= 1;
            tableIdx ++;
        }
        table = smallSubpagePools;
    }

    return table[tableIdx];
}

分配

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // 对申请的内存大小做标准化
    final int normCapacity = normalizeCapacity(reqCapacity);
    // 如果小于pagesize,8k
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        // 如果小于512,属于tiny的范围
        if (tiny) { // < 512
            // 去本地缓存申请tiny空间,如果能成功,直接返回
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // 拿到tinySubpagePools中该容量所属的位置
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        // 否则属于small的范围
        } else {
            // 去本地缓存申请small空间,如果能成功,直接返回
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // 拿到smallSubpagePools中该容量所属的位置
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        // 拿到对应的head
        final PoolSubpage<T> head = table[tableIdx];

        // head同步处理,
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            // head的next有PoolSubpage存在,说明之前已经分配过内存了,看看能不能复用这段内存
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                // 申请空间,拿到内存地址handle
                long handle = s.allocate();
                // 申请成功
                assert handle >= 0;
                // 将这段申请的内存空间与ByteBuf绑定
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        // 到这里,说明没有缓存可用,那么去内存池去获取
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    // 如果超过page但还没有到chunksize,那么直接新建chunk
    if (normCapacity <= chunkSize) {
        // 去本地缓存申请normal空间
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        // 直接去内存池去获取
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        // 直接去申请Unpooled内存空间
        allocateHuge(buf, reqCapacity);
    }
}

allocateNormal

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    // 在poolchunklist中看有没有chunk能进行分配,当然了,刚开始,这些里面都是没有的。
    // 只有在实际生成chunk后才会加到这些list里面。
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        ++allocationsNormal;
        return;
    }
    
    // 新增chunk
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    // 在chunk里申请内存空间
    long handle = c.allocate(normCapacity);
    ++allocationsNormal;
    // 如果申请成功
    assert handle > 0;
    // 与Bytebuf绑定
    c.initBuf(buf, handle, reqCapacity);
    // 将该chunk加到qInit的list。
    qInit.add(c);
}

标准化

int normalizeCapacity(int reqCapacity) {
    if (reqCapacity < 0) {
        throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
    }
    // 如果超过chunksize,直接返回
    if (reqCapacity >= chunkSize) {
        return reqCapacity;
    }
    
    // 因为整个chunk只有tiny是16B递增的。其他都是翻倍递增,也就是以2为幂的数
    // 只有当超过tiny,也就是>=512, 才需要计算
    if (!isTiny(reqCapacity)) { // >= 512
        // Doubled

        // 这里来计算离给定reqCapacity最近的下一个2的幂次方
        // 比如给定8888,会返回16K
        int normalizedCapacity = reqCapacity;
        normalizedCapacity --;
        normalizedCapacity |= normalizedCapacity >>>  1;
        normalizedCapacity |= normalizedCapacity >>>  2;
        normalizedCapacity |= normalizedCapacity >>>  4;
        normalizedCapacity |= normalizedCapacity >>>  8;
        normalizedCapacity |= normalizedCapacity >>> 16;
        normalizedCapacity ++;

        if (normalizedCapacity < 0) {
            normalizedCapacity >>>= 1;
        }

        return normalizedCapacity;
    }

    // Quantum-spaced
    // 到这里说明是tiny的范围,如果已经是16的倍数了,那么满足条件,直接返回。
    if ((reqCapacity & 15) == 0) {
        return reqCapacity;
    }
    // 否则标准化成16的倍数
    return (reqCapacity & ~15) + 16;
}

重分配

void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
    if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = buf.length;
    if (oldCapacity == newCapacity) {
        return;
    }

    PoolChunk<T> oldChunk = buf.chunk;
    long oldHandle = buf.handle;
    T oldMemory = buf.memory;
    int oldOffset = buf.offset;
    int oldMaxLength = buf.maxLength;
    int readerIndex = buf.readerIndex();
    int writerIndex = buf.writerIndex();

    allocate(parent.threadCache(), buf, newCapacity);
    // 如果新的容量比之前的大,那么将之前内存的内容复制到新的内存空间里面
    if (newCapacity > oldCapacity) {
        memoryCopy(
                oldMemory, oldOffset,
                buf.memory, buf.offset, oldCapacity);
    // 如果是缩容的操作,且能放下部分read的数据,那么将read的数据复制过去
    // 当新容量不足时,允许截取
    } else if (newCapacity < oldCapacity) {
        if (readerIndex < newCapacity) {
            if (writerIndex > newCapacity) {
                writerIndex = newCapacity;
            }
            memoryCopy(
                    oldMemory, oldOffset + readerIndex,
                    buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
        } else {
            readerIndex = writerIndex = newCapacity;
        }
    }

    buf.setIndex(readerIndex, writerIndex);
    
    // 释放老的内存空间
    if (freeOldMemory) {
        free(oldChunk, oldHandle, oldMaxLength, buf.cache);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读