ByteBuf

2019-08-01  本文已影响0人  Pillar_Zhong

体系结构

1563803296721.png

从图可以看出来,ByteBuf分类主要是三个维度:

  • Unpooled和Pooled
  • Unsafe和非Unsafe
  • Heap和Direct

访问索引

从图示中可以容易理解ByteBuf的读写区域

1563801077089.png

内存分配

策略

1563805578302.png 1563805646415.png 1563805753773.png

可以看到在接口方法里面已经区分了是direct还是heap,在继承体系上区分了是Pooled还是Unpooled

而在具体的逻辑中,再根据系统底层是否支持Unsafe来决定是否要返回Unsafe的实现。

UnpooledByteBufAllocator

heap内存的分配

1563806190062.png
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // 根据是否支持Unsafe与否来返回具体的UnpooledHeapByteBuf
    return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
            : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

初始化

UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    // 直接去调用父类UnpooledHeapByteBuf的构造函数
    // 说明底层的内存分配是一样的,只不过数据操作的方式不一样,会用Unsafe的方式,更快,更直接。
    super(alloc, initialCapacity, maxCapacity);
}

protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    // 直接新建byte数组,长度为initialCapacity,readindex和writeindex为0
    this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}

private UnpooledHeapByteBuf(
        ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {

    super(maxCapacity);

    ...

    this.alloc = alloc;
    // 保存上面的byte数组作为ByteBuf的存储
    setArray(initialArray);
    // read和write index归零
    setIndex(readerIndex, writerIndex);
}

getByte

Unsafe
// Unsafe的方式操作数据
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(array, index);
}

static byte getByte(byte[] data, int index) {
    // 可以看到,这里直接通过byte数组的内存偏移量加上数组的index来获取数据
    return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
非Unsafe
// 非Unsafe的方式操作数据
protected byte _getByte(int index) {
    return HeapByteBufUtil.getByte(array, index);
}

// 通过数组下标的方式来操作数组
static byte getByte(byte[] memory, int index) {
    return memory[index];
}

direct内存分配

1563807731297.png
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // 根据底层对Unsafe的支持与否来决定最终的UnpooledDirectByteBuf
    ByteBuf buf = PlatformDependent.hasUnsafe() ?
            UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
            new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);

    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

初始化

static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(
        ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    if (PlatformDependent.useDirectBufferNoCleaner()) {
        return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
    }
    return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
非Unsafe
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    ...

    this.alloc = alloc;
    // NIO的方法,这里直接去堆外分配内存
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    // 而这里对老的buffer进行回收,TODO
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }
    // 将前面创建的堆外内存保存下来
    this.buffer = buffer;
    tmpNioBuf = null;
    capacity = buffer.remaining();
}
Unsafe
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    ...

    this.alloc = alloc;
    // 直接堆外内存分配
    setByteBuffer(allocateDirect(initialCapacity), false);
}

// 跟非Unsafe类似
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    if (tryFree) {
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                freeDirect(oldBuffer);
            }
        }
    }
    this.buffer = buffer;
    // Unsafe不一样的地方在于,需要知道buffer的内存偏移量,而这个偏移量在allocateDirect会生成
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}

static long directBufferAddress(ByteBuffer buffer) {
    // 可以看到这个偏移量会去找ADDRESS_FIELD_OFFSET
    // final Field field = Buffer.class.getDeclaredField("address");
    // 而堆外内存的偏移量早在allocateDirect的时候就已经自动赋给buffer的address了,直接反射去拿就好
    // 这个address只代表堆外内存的偏移量,
    return getLong(buffer, ADDRESS_FIELD_OFFSET);
}

getByte

Unsafe
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(addr(index));
}

long addr(int index) {
    // 根据上面保存下来的偏移量来去拿到index的地址
    return memoryAddress + index;
}
非Unsafe
protected byte _getByte(int index) {
    // 直接通过api来对buffer进行读取
    return buffer.get(index);
}

PooledByteBufAllocator

direct内存分配

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // 拿到当前线程绑定的PoolThreadCache
    PoolThreadCache cache = threadCache.get();
    // 进而拿到PoolThreadCache的堆外内存
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        // 在当前线程绑定的堆外内存区域请求一块空间
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }

    return toLeakAwareBuffer(buf);
}
上一篇下一篇

猜你喜欢

热点阅读