netty源码分析(六) - ByteBuf - 2Unpool

2020-10-14  本文已影响0人  进击的蚂蚁zzzliu

UnpooledByteBufAllocator

先来分析下DEFAULT,及构造方法

UnpooledByteBufAllocator:
//通过static final创建一个非Unpooled内存池
public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());

PlatformDependent:
public static boolean directBufferPreferred() {
    return DIRECT_BUFFER_PREFERRED;
}
// We should always prefer direct buffers by default if we can use a Cleaner to release direct buffers.
//如果我们可以使用Cleaner释放直接缓冲区,则默认情况下,我们总是应该首选直接缓冲区
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
//android不支持Direct方式
if (!isAndroid()) {
    if (javaVersion() >= 9) {
        CLEANER = CleanerJava9.isSupported() ? new CleanerJava9() : NOOP;
    } else {
        CLEANER = CleanerJava6.isSupported() ? new CleanerJava6() : NOOP;
    }
} else {
    CLEANER = NOOP;
}

CleanerJava6:
//以jdk8为例
static boolean isSupported() {
    //CLEANER_FIELD_OFFSET:cleaner字段偏移量;CLEANER_FIELD:是否包含cleaner字段
    return CLEANER_FIELD_OFFSET != -1 || CLEANER_FIELD != null;
}
//默认是jdk的DirectByteBuffer,包含cleaner字段
final ByteBuffer direct = ByteBuffer.allocateDirect(1);

1. newHeapBuffer

protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
            new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

1.1. UnpooledHeapByteBuf

先来看成员变量:

//分配器
private final ByteBufAllocator alloc;
//内存分配都基于该字节数组
byte[] array;
//NIO的ByteBuffer
private ByteBuffer tmpNioBuf;

再看构造方法(方便查看进行了手动内联):

public UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    //校验maxCapacity是否合法
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    this.maxCapacity = maxCapacity;
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }
    this.alloc = checkNotNull(alloc, "alloc");
    //创建初始数组
    array = new byte[initialCapacity];
    tmpNioBuf = null;
    //readIndex/writeIndex初始为0
    setIndex(0, 0);
}

很简单就是在堆内创建一个数组作为字节容器,设置readIndex/writeIndex为0


再看下重写的_setxxx/_getxxx,以int为例

protected void _setByte(int index, int value) {
    HeapByteBufUtil.setByte(array, index, value);
}

static void setByte(byte[] memory, int index, int value) {
    memory[index] = (byte) value;
}
protected byte _getByte(int index) {
    return HeapByteBufUtil.getByte(array, index);
}

static byte getByte(byte[] memory, int index) {
    return memory[index];
}

同样很简单,就是数组操作


再看下设置容量方法

public ByteBuf capacity(int newCapacity) {
    int oldCapacity = array.length;
    byte[] oldArray = array;
    if (newCapacity > oldCapacity) {    // 容量扩增
        byte[] newArray = allocateArray(newCapacity); // 申请数组
        // 将老数组的字节复制到新数组
        System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
        setArray(newArray);
        freeArray(oldArray);
    } else if (newCapacity < oldCapacity) { // 容量缩减
        byte[] newArray = allocateArray(newCapacity);
        int readerIndex = readerIndex();
        // 容量缩减导致读写索引改变
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            // 只拷贝读索引之后的数据,读索引之前0填充
            System.arraycopy(oldArray, readerIndex, 
                             newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
        freeArray(oldArray);
    }
    // 容量相等时不做处理
    return this;
}

还是很简单就是创建新数组,然后copy,然后free原数组

1.2. UnpooledUnsafeHeapByteBuf

继承自UnpooledHeapByteBuf,成员变量和构造方法都一致,同样基于数组,不同之处在于重写了UnpooledHeapByteBuf中对数组操作的相关方法,改为使用UnsafeByteBufUtil调用Unsafe进行操作

看下常用方法

protected byte[] allocateArray(int initialCapacity) {
    return PlatformDependent.allocateUninitializedArray(initialCapacity);
}

protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(array, index);
}

protected void _setByte(int index, int value) {
    UnsafeByteBufUtil.setByte(array, index, value);
}

2. newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        //noCleaner默认为true:即没有自动垃圾清理,需要手动回收
        buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        //当前平台不支持unsafe会分配非unsafe
        buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

2.1. UnpooledDirectByteBuf

先来看成员变量:

//分配器
private final ByteBufAllocator alloc;
//底层NIO直接ByteBuffer
ByteBuffer buffer; // accessed by UnpooledUnsafeNoCleanerDirectByteBuf.reallocateDirect()
//用于IO操作的ByteBuffer,实现实质是buffer.duplicate()即与buffer共享底层数据结构
private ByteBuffer tmpNioBuf;
//ByteBuf容量
private int capacity;
//释放标记,是否需要释放buffer的底层内存
private boolean doNotFree;

再看构造方法(方便查看进行了手动内联):

public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    this.maxCapacity = maxCapacity;
    ObjectUtil.checkNotNull(alloc, "alloc");
    checkPositiveOrZero(initialCapacity, "initialCapacity");
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }
    this.alloc = alloc;
    //分配NIO的DirectByteBuffer
    setByteBuffer(new DirectByteBuffer(capacity), false);
}

void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    //默认false,不尝试回收原buffer,原buffer什么时候不为null???扩容时为true
    if (tryFree) {
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                freeDirect(oldBuffer);
            }
        }
    }
    this.buffer = buffer;
    tmpNioBuf = null;
    //初始值为initialCapacity
    capacity = buffer.remaining();
}

protected void freeDirect(ByteBuffer buffer) {
    PlatformDependent.freeDirectBuffer(buffer);
}

protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

再看下重写的_setxxx/_getxxx,以int为例

protected int _getInt(int index) {
    return buffer.getInt(index);
}

protected void _setInt(int index, int value) {
    buffer.putInt(index, value);
}

so easy

再看下设置容量方法

public ByteBuf capacity(int newCapacity) {
    checkNewCapacity(newCapacity);
    int oldCapacity = capacity;
    if (newCapacity == oldCapacity) {
        return this;
    }
    int bytesToCopy;
    if (newCapacity > oldCapacity) {
        bytesToCopy = oldCapacity;
    } else {
        trimIndicesToCapacity(newCapacity);
        bytesToCopy = newCapacity;
    }
    ByteBuffer oldBuffer = buffer;
    ByteBuffer newBuffer = allocateDirect(newCapacity);
    oldBuffer.position(0).limit(bytesToCopy);
    newBuffer.position(0).limit(bytesToCopy);
    newBuffer.put(oldBuffer).clear();
    setByteBuffer(newBuffer, true);
    return this;
}

就是创建新的NIO ByteBuffer把原ByteBuffer中字节copy过来,然后需要注意下setByteBuffer(newBuffer, true);,tryFree为true会释放原ByteBuf

2.2. UnpooledUnsafeDirectByteBuf

继承自UnpooledDirectByteBuf,增加了一个成员变量(memoryAddress:内存首地址),同样基于NIO的DirectByteBuffer,不同之处在于重写了UnpooledDirectByteBuf中对字节操作的相关方法,改为使用UnsafeByteBufUtil调用Unsafe进行操作

memoryAddress为分配的ByteBuffer首地址

//内存首地址
long memoryAddress;

final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    super.setByteBuffer(buffer, tryFree);
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
}

常用方法

protected void _setByte(int index, int value) {
    UnsafeByteBufUtil.setByte(addr(index), value);
}

protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(addr(index));
}

final long addr(int index) {
    return memoryAddress + index;
}

至此UnpooledByteBufAllocator分析完毕,PooledByteBufAllocator留到下一节吧

上一篇 下一篇

猜你喜欢

热点阅读