netty alloc 初始化过程

2020-03-20  本文已影响0人  田才

内存分配是 netty 源码学习中的难点,也是 netty 高性能的关键之一。可以说这是 netty 的精髓。以下记录本人阅读源码的过程,因为能力有限,请读者抱着怀疑的态度观看,有不准确的地方还望指正。本文基于 netty 4.1.46 ,以 PooledByteBufAllocator 、Direct 、Unsafe 为讲解主线。

世间万物皆有始,下边让我们从一个 test 开始

    public static void main(String[] args) {
        int _1b = 1;
        int _1k = 1024 * _1b;
        PooledByteBufAllocator allocator = new PooledByteBufAllocator(true);
        //申请 1b 内存, 此处 netty 会分配一个 16b 的连续的 byte,因为第一次分配, netty 向操作系统一次性申请 16M 内存
        ByteBuf buffer = allocator.buffer(_1b);
        //这里写不下 netty 会自动扩容
        buffer.writeBytes("hello 田32143243242432432才".getBytes());
        System.out.println(buffer.toString(Charset.forName("utf-8")));
        //回收内存
        buffer.release();
        //再次申请 1b 内存,这里会复用上次申请到的 16b 内存
        buffer = allocator.buffer(_1b);
        //回收内存
        buffer.release();
        //分配一页内存,分配策略和分配 1b 内存完全不一样
        buffer = allocator.buffer(8*_1k);
    }

问题:

带着以上几个问题,正式开始学习

1、alloc 的 uml 图

image.png

上图中相关类的用途:

有了初步认识后,开始源码阅读。在源码阅读过程中可能还会出现更多的问题。能提出问题就是进步。

2、PooledByteBufAllocator 的初始化

(1)、获取一些可配置的参数,如果没有配置那么用默认值,这里展示一些重要的参数,篇幅原因其他部分省略了。

static {
      //默认 8k 的页
      int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
      DEFAULT_PAGE_SIZE = defaultPageSize;

      //分配规则的 完全二叉树 的层级
      int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
      DEFAULT_MAX_ORDER = defaultMaxOrder;
      //Chunk 的大小 默认为 16M = 8<<11
      final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;

      final Runtime runtime = Runtime.getRuntime();

      final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
      
      //最大堆内内存分配工具Arena 数量 
      DEFAULT_NUM_HEAP_ARENA = Math.max(0,
              SystemPropertyUtil.getInt(
                      "io.netty.allocator.numHeapArenas",
                      (int) Math.min(
                              defaultMinNumArena,
                              runtime.maxMemory() / defaultChunkSize / 2 / 3)));
      //最大堆外内存分配工具Arena 数量
      DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
              SystemPropertyUtil.getInt(
                      "io.netty.allocator.numDirectArenas",
                      (int) Math.min(
                              defaultMinNumArena,
                              PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

      //不同规格内存类型的缓冲数量
      //0 - 512b
      DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
      //512b - 8k
      DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
      //8k - 16m
      DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
}

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
      //设置是否是 directBuffer 或 byte[] ,类型的内存分配
      super(preferDirect);
      //非常重要的 线程本地变量,内存分配都基于此引用
      threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
      this.tinyCacheSize = tinyCacheSize;
      this.smallCacheSize = smallCacheSize;
      this.normalCacheSize = normalCacheSize;
      //pageSize 是每页的大小,maxOrder 是数的高度,因为树的叶子节点为 page
      //所以个 chunkSize = pageSize * (1<<maxOrder)
      // 默认 chunkSize 16M  maxOrder 11层  pageSize 8k
      chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

      checkPositiveOrZero(nHeapArena, "nHeapArena");
      checkPositiveOrZero(nDirectArena, "nDirectArena");

      checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
      if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
          throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
      }

      if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
          throw new IllegalArgumentException("directMemoryCacheAlignment: "
                  + directMemoryCacheAlignment + " (expected: power of two)");
      }
      //计算 1 位移多少位数等于 pageSize
      int pageShifts = validateAndCalculatePageShifts(pageSize);

      //堆内内存分配工具创建
      if (nHeapArena > 0) {
          heapArenas = newArenaArray(nHeapArena);
          List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
          for (int i = 0; i < heapArenas.length; i ++) {
              PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                      pageSize, maxOrder, pageShifts, chunkSize,
                      directMemoryCacheAlignment);
              heapArenas[i] = arena;
              metrics.add(arena);
          }
          heapArenaMetrics = Collections.unmodifiableList(metrics);
      } else {
          heapArenas = null;
          heapArenaMetrics = Collections.emptyList();
      }

      //堆外内分配工具的创建
      if (nDirectArena > 0) {
          // nDirectArena 默认数量为 2 * cpu
          // 因为在分配 EventLoop 的数量为 2 * cpu 所以此处就是希望,给每个线程分配一个 DirectArena,不用加锁
          directArenas = newArenaArray(nDirectArena);
          List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
          for (int i = 0; i < directArenas.length; i ++) {
              //创建内存区域 Arena
              PoolArena.DirectArena arena = new PoolArena.DirectArena(
                      this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
              directArenas[i] = arena;
              metrics.add(arena);
          }
          directArenaMetrics = Collections.unmodifiableList(metrics);
      } else {
          directArenas = null;
          directArenaMetrics = Collections.emptyList();
      }
      metric = new PooledByteBufAllocatorMetric(this);
  }

看初始化的代码可以提出几个问题:
1、为啥 chunk 的大小默认为 16M
所有应用向操作系统OS内存申请是以 chunk (16m)为单位的。后续所有的操作都是在chunk中操作的。例如 要分配1m的内存,需要向操作系统申请一个chunk (16m),然后在chunk (16m)中取1m的内存。
2、为啥分配工具的数量为 2 倍逻辑cpu数量
因为在分配 EventLoop 的数量默认为 2 * cpu 所以此处就是希望,给每个线程分配一个 Arena,减少锁的竞争。
注意此处创建了 new PoolThreadLocalCache(useCacheForAllThreads); 分配内存时要从 PoolThreadLocalCache 对象中拿到一个线程本地副本变量。

然后看一下 Arena 的初始化方法,初始化了一些数据结构,为内存分配,和使用率汇总做准备。

    //内存规格
    enum SizeClass {
        Tiny,
        Small,
        Normal
    }
   protected PoolArena(PooledByteBufAllocator parent, int pageSize,
          int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
        this.parent = parent;
        //默认 8k = 8192
        this.pageSize = pageSize;
        //默认 11 因为 8*1024*(1<<11) = 16M = chunkSize
        this.maxOrder = maxOrder;
        //因为 pageSize 是2的整数倍数,所以 pageShifts 是1位移的位数,移动完成即可得到 pageSize
        this.pageShifts = pageShifts;
        //默认 16M = 16777216
        this.chunkSize = chunkSize;
        directMemoryCacheAlignment = cacheAlignment;
        directMemoryCacheAlignmentMask = cacheAlignment - 1;
        subpageOverflowMask = ~(pageSize - 1);
        //数组长度默认 32
        tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
        for (int i = 0; i < tinySubpagePools.length; i ++) {
            //和下文的 MemoryRegionCache 算法差不多, PoolSubpage[1] = 16b  PoolSubpage[2] = 32b  .... PoolSubpage[31] = 512B
            //在 Chunk 中的 allocateSubpage 创建 子page 时填充
            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
        }

        numSmallSubpagePools = pageShifts - 9;
        //数组长度默认 4
        smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            //PoolSubpage[1] = 512  PoolSubpage[2] = 1k  .... PoolSubpage[3] = 4k
            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
        }
        //创建按照Chnuk使用率分组的集合类
        q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
        q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
        q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
        q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
        q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
        qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);

        //通过双向链表方式进行连接
        q100.prevList(q075);
        q075.prevList(q050);
        q050.prevList(q025);
        q025.prevList(q000);
        q000.prevList(null);
        qInit.prevList(qInit);

        List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
        metrics.add(qInit);
        metrics.add(q000);
        metrics.add(q025);
        metrics.add(q050);
        metrics.add(q075);
        metrics.add(q100);
        chunkListMetrics = Collections.unmodifiableList(metrics);
    }

注意:

smallSubpagePools 和 tinySubpagePools 用于,按照规格存储 page 拆分后的内存碎片的,也称为"子page"。
PoolChunkList : 用于 Chnuk 使用率分组的集合类。便于统计,和避免不必要的空间检测。如果调用了PooledByteBufAllocator.buffer (1) ,之后结构如图

image.png

3、PooledByteBufAllocator.buffer 获取一段连续的内存

@Override
    public ByteBuf buffer(int initialCapacity) {
        if (directByDefault) {
            return directBuffer(initialCapacity);
        }
        return heapBuffer(initialCapacity);
    }

directByDefault 变量就是初始化时候用户传入的参数,用来区分是否创建java.nio.ByteBuffer 对象。这边以 directBuffer() 方法为例

@Override
    public ByteBuf directBuffer(int initialCapacity) {
        return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
    }

DEFAULT_MAX_CAPACITY 是netty ByteBuff 最大可扩容的上界,默认为
Integer.MAX_VALUE。

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        //拿到线程局部缓冲 PoolThreadCache
        PoolThreadCache cache = threadCache.get();
        //拿到堆外预分配内存
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

这里用到了初始化的时候创建的 PoolThreadLocalCache 对象,如果是第一次调用 threadCache.get(); 那么会初始化 PoolThreadCache ,否则直接拿到第一次创建的对象。

    final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
        //默认 useCacheForAllThreads 为 true
        private final boolean useCacheForAllThreads;

        @Override
        protected synchronized PoolThreadCache initialValue() {
            //netty 的 内存分配工具池中的 区域都是从 heapArenas 或 directArenas 中拿到的
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

            final Thread current = Thread.currentThread();
            if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
                final PoolThreadCache cache = new PoolThreadCache(
                        heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);

                if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
                    final EventExecutor executor = ThreadExecutorMap.currentExecutor();
                    if (executor != null) {
                        executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
                                DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                    }
                }
                return cache;
            }
            // No caching so just use 0 as sizes.
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
        }

其中 leastUsedArena 方法是从directArenas 数组中拿到一个最少被使用过的 directArena。
然后看一下初始化 PoolThreadCache,

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        //开辟内存工具,netty 内存池的实现
        this.heapArena = heapArena;
        this.directArena = directArena;
        if (directArena != null) {
            //创建重复利用内存的 缓冲容器,如果 ByteBuf.release() 方法那么会填充此缓冲区
            //创建 MemoryRegionCache 数组,数组大小和 PoolArena.tinySubpagePools 数量一致
            //tinySubPageHeapCaches[1] =16B  tinySubPageHeapCaches[2] = 32B ... 496B
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            //smallSubPageHeapCaches[1] = 512B  smallSubPageHeapCaches[2] = 1K ... 4K
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            //normalHeapCaches[1] = 8K  smallSubPageHeapCaches[2] = 16K smallSubPageHeapCaches[3] = 24K
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

结构如图:


image.png

PoolThreadLocalCache 构造方法主要是创建了 MemoryRegionCache 他的作用是,重复利用内存的缓冲容器,如果 ByteBuf.release() 方法那么会填充此缓冲区。

   private abstract static class MemoryRegionCache<T> {
        // tiny  N*16B
        // small 512B 1K 2K 4K
        // normal 8k 16k 32k
        // 如果 32k 以上的节点是不进行缓存的
        private final int size;
        //存放ByteBuf ,Queue 里边的所有的元素大小固定,都等于 size
        private final Queue<Entry<T>> queue;
        //内存规格  tiny  small normal
        private final SizeClass sizeClass;
        private int allocations;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            //规格化,查找一个大于size并且是2指数 ,这里配置的已经是规格化了,只是为了保险
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            //创建持有这种规格的缓存队列
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }

        static final class Entry<T> {
            final Handle<Entry<?>> recyclerHandle;
            //分配单位
            PoolChunk<T> chunk;
            ByteBuffer nioBuffer;
            //指向一个连续区域的内存 , 通过 handle 可以定位 PoolChunk 中的一块连续的内存
            long handle = -1;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = recyclerHandle;
            }

            void recycle() {
                //表示 Entry 不指向任何一块内存
                chunk = null;
                nioBuffer = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }
        }

至此初始化话过程已经完成

上一篇 下一篇

猜你喜欢

热点阅读