PooledByteBuf对象、内存复用

2019-06-26  本文已影响0人  横渡

PoolThreadCache: PooledByteBufAllocator 实例维护了一个线程变量。
多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存Chunk。
Pool Chunk里面维护了内存引用,内存复用的做法就是把buf的memory指向Chunck的memory。

我们看下面这段代码

 @Test
    public void poolTest() {
        System.out.println("测试buf回收====");
        ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
        // tiny
        ByteBuf buf1 = allocator.directBuffer(495); // 分配的内存最大长度为496
        System.out.printf("buf1: 0x%X%n", buf1.memoryAddress());
        buf1.release(); // 此时会被回收到tiny 的512b格子中
}

跟踪内存分配的部分
ByteBuf buf1 = allocator.directBuffer(495);
调用 AbstractByteBufAllocator 的directBuffer方法
AbstractByteBufAllocator # directBuffer(int initialCapacity)
因为java平台有unsafe机制,会使用池化的PooledByteBufAllocator:
PooledByteBufAllocator # newDirectBuffer(int initialCapacity, int maxCapacity)

我们来看PooledByteBufAllocator类newDirectBuffer的代码:

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // PoolThreadCache 跟线程绑定的分配内存的缓存,使用jemaclloc的方式分配内存
        PoolThreadCache cache = threadCache.get();
       // PoolArena 真正的内存分配管理器
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            // poolArena 分配内存
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

来看 PoolArena中分配内存的方法:

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        // 首先创建ByteBuf
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
       // 给ByteBuf分配内存
        allocate(cache, buf, reqCapacity);
        return buf;
    }

PoolArena 内部类 DirectArena 的newByteBuf方法:

 @Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
  if (HAS_UNSAFE) { // 是否支持unsafe机制
    return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
  } else {
    return PooledDirectByteBuf.newInstance(maxCapacity);
  }
}

因为支持unsafe机制,走第一个分支PooledUnsafeDirectByteBuf

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        // 尝试从RECYCLER中复用ByteBuf,复用不到再创建ByteBuf对象
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        // 给新分配出来的ByteBuf做一些清理工作,如设置引用计数为1,readerIndex、writeIndex分别设置为0
        buf.reuse(maxCapacity);
        return buf;
    }

先尝试从 RECYCLER 中复用ByteBuf,RECYCLER 是一个基于threadlocal(线程封闭)机制的轻量级别对象池。
我们来看类Recycler中的get()方法:

    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        // 基于线程的Stack
        Stack<T> stack = threadLocal.get(); 
        // DefaultHandle 是对象的句柄,里面的value是真正要复用的对象
        DefaultHandle<T> handle = stack.pop();  // 尝试复用
        if (handle == null) { // 复用不到就创建
            handle = stack.newHandle();
            // 因为我们要创建PooledUnsafeDirectByteBuf,这里value的类型就是 PooledUnsafeDirectByteBuf
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

我们来看PooledByteBuf中的reuse方法:

 final void reuse(int maxCapacity) {
        maxCapacity(maxCapacity);
        // 引用计数设置为1,会调用父类 AbstractReferenceCountedByteBuf 中的方法
        setRefCnt(1);
        // 调用 AbstractByteBuf中的方法设置readerIndex,writerIndex
        setIndex0(0, 0);
        discardMarks();
    }

至此ByteBuf对象已经创建出来了,接下来看如何为ByteBuf分配内存。我们回到PoolArena 的allocate方法:

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        // 为ByteBuf分配内存
        allocate(cache, buf, reqCapacity);
        return buf;
    }

PoolArena # allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)

    // 给buf分配内存空间
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity); // 将需要的容量转换为16的倍数,不足16的就为16
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize tiny和small的
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512 小于512字节,就从tiny缓存区域中分配
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }

PooledThreadCache中有三块缓存区域:tiny,small,normal。首先会根据申请内存的容量去判断分配哪块区域的内存。
如果在tiny区域分配,会调用PoolThreadCache的allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) 方法:

    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }

PoolThreadCache#cacheForTiny(PoolArena<?> area, int normCapacity)

    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }

tinySubPageDirectCaches 是一个数组,每个元素都是一个tiny类型的内存块,PoolArena.tinyIdx(normCapacity) 计算出需要使用哪个内存块,然后使用对应的idx取出内存块。

上一篇 下一篇

猜你喜欢

热点阅读