netty源码分析(六) - ByteBuf - 2Unpool
2020-10-14 本文已影响0人
进击的蚂蚁zzzliu
UnpooledByteBufAllocator
先来分析下DEFAULT,及构造方法
UnpooledByteBufAllocator:
// Shared non-pooled (Unpooled) allocator instance, created once through a static final field.
// Its constructor argument is whatever PlatformDependent.directBufferPreferred() reports.
public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
PlatformDependent:
// Returns the DIRECT_BUFFER_PREFERRED flag unchanged.
public static boolean directBufferPreferred() {
return DIRECT_BUFFER_PREFERRED;
}
// Select the cleaner first: DIRECT_BUFFER_PREFERRED (computed last) reads CLEANER,
// so CLEANER has to be assigned before that expression is evaluated.
// On Android no cleaner is used (CLEANER = NOOP), so direct buffers are not preferred there.
if (!isAndroid()) {
    if (javaVersion() >= 9) {
        CLEANER = CleanerJava9.isSupported() ? new CleanerJava9() : NOOP;
    } else {
        CLEANER = CleanerJava6.isSupported() ? new CleanerJava6() : NOOP;
    }
} else {
    CLEANER = NOOP;
}
// We should always prefer direct buffers by default if we can use a Cleaner to release direct buffers.
// The preference is off when no cleaner was selected (CLEANER == NOOP) or when the
// "io.netty.noPreferDirect" setting (looked up through SystemPropertyUtil, fallback false) is true.
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
CleanerJava6:
// Using JDK 8 as the example.
// Unsupported only when BOTH lookups failed: there is no cleaner field offset
// (CLEANER_FIELD_OFFSET == -1) and no cleaner field (CLEANER_FIELD == null).
static boolean isSupported() {
    return !(CLEANER_FIELD_OFFSET == -1 && CLEANER_FIELD == null);
}
// Allocates a 1-byte direct buffer. The article notes that by default this is the JDK's
// DirectByteBuffer, which carries a `cleaner` field — presumably the field that the
// CLEANER_FIELD / CLEANER_FIELD_OFFSET lookup probes; confirm against CleanerJava6.
final ByteBuffer direct = ByteBuffer.allocateDirect(1);
- 总结一下:在非android平台上,只要存在可用的Cleaner且未设置io.netty.noPreferDirect=true,PlatformDependent.directBufferPreferred()默认为true,即preferDirect为true
- 最终
// Direct buffers become the default only when they are preferred AND PlatformDependent.hasUnsafe() is true.
directByDefault = preferDirect && PlatformDependent.hasUnsafe();
:也为true(前提是平台支持Unsafe),即默认会创建Direct类型的ByteBuf
1. newHeapBuffer
// Creates a heap buffer owned by this allocator: the InstrumentedUnpooledUnsafeHeapByteBuf
// variant when PlatformDependent.hasUnsafe() is true, the InstrumentedUnpooledHeapByteBuf
// variant otherwise.
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    return new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
- InstrumentedUnpooledUnsafeHeapByteBuf:UnpooledUnsafeHeapByteBuf子类
- InstrumentedUnpooledHeapByteBuf:UnpooledHeapByteBuf子类
1.1. UnpooledHeapByteBuf
先来看成员变量:
// Allocator associated with this buffer
private final ByteBufAllocator alloc;
// Backing byte array; all of the buffer's storage lives in it
byte[] array;
// NIO ByteBuffer companion of the array (initialised to null by the constructor)
private ByteBuffer tmpNioBuf;
再看构造方法(方便查看进行了手动内联):
// Constructor of UnpooledHeapByteBuf (superclass logic manually inlined for readability).
// It assigns the alloc / array / tmpNioBuf fields of UnpooledHeapByteBuf, so it carries that
// class's name rather than the UnpooledUnsafeHeapByteBuf subclass's.
// Throws IllegalArgumentException when initialCapacity > maxCapacity.
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    // Validate maxCapacity before storing it
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    this.maxCapacity = maxCapacity;
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }
    this.alloc = checkNotNull(alloc, "alloc");
    // The heap array created here is the byte container of the buffer
    array = new byte[initialCapacity];
    tmpNioBuf = null;
    // readerIndex / writerIndex both start at 0
    setIndex(0, 0);
}
很简单,就是在堆内创建一个字节数组作为字节容器,并将readerIndex/writerIndex设置为0
再看下重写的_setxxx/_getxxx,以byte为例
// Stores one byte at index by delegating to HeapByteBufUtil.setByte on the backing array.
protected void _setByte(int index, int value) {
HeapByteBufUtil.setByte(array, index, value);
}
// Writes value, narrowed to a byte (only the low 8 bits are kept), into memory[index].
static void setByte(byte[] memory, int index, int value) {
    final byte narrowed = (byte) value;
    memory[index] = narrowed;
}
// Reads one byte at index by delegating to HeapByteBufUtil.getByte on the backing array.
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}
// Returns the byte stored at memory[index].
static byte getByte(byte[] memory, int index) {
    final byte stored = memory[index];
    return stored;
}
同样很简单,就是数组操作
再看下设置容量方法
// Changes the capacity of the backing array: allocates a new array, copies the bytes that
// are kept, installs it with setArray and hands the old array to freeArray. Returns this buffer.
// NOTE(review): unlike the direct-buffer capacity() quoted later in this article, this excerpt
// shows no checkNewCapacity(newCapacity) call — confirm whether the range check was dropped from the quote.
public ByteBuf capacity(int newCapacity) {
int oldCapacity = array.length;
byte[] oldArray = array;
if (newCapacity > oldCapacity) { // growing
byte[] newArray = allocateArray(newCapacity); // allocate the larger array
// Copy every byte of the old array to the front of the new one
System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
setArray(newArray);
freeArray(oldArray);
} else if (newCapacity < oldCapacity) { // shrinking
byte[] newArray = allocateArray(newCapacity);
int readerIndex = readerIndex();
// Shrinking may force the reader/writer indices to move
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
// Clamp the writer index to the new capacity
writerIndex(writerIndex = newCapacity);
}
// Copy only the bytes in [readerIndex, writerIndex); nothing before readerIndex is copied
System.arraycopy(oldArray, readerIndex,
newArray, readerIndex, writerIndex - readerIndex);
} else {
// readerIndex is at or beyond the new capacity: nothing is copied, both indices move to newCapacity
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
freeArray(oldArray);
}
// Equal capacity: nothing to do
return this;
}
还是很简单就是创建新数组,然后copy,然后free原数组
1.2. UnpooledUnsafeHeapByteBuf
继承自UnpooledHeapByteBuf,成员变量和构造方法都一致,同样基于数组,不同之处在于重写了UnpooledHeapByteBuf中对数组操作的相关方法,改为使用UnsafeByteBufUtil调用Unsafe进行操作
看下常用方法
// Obtains the backing array from PlatformDependent.allocateUninitializedArray instead of `new byte[]`.
// NOTE(review): going by the helper's name the contents are presumably not zeroed — confirm.
protected byte[] allocateArray(int initialCapacity) {
return PlatformDependent.allocateUninitializedArray(initialCapacity);
}
// Reads one byte at index from the backing array through UnsafeByteBufUtil.
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
// Stores one byte at index in the backing array through UnsafeByteBufUtil.
protected void _setByte(int index, int value) {
UnsafeByteBufUtil.setByte(array, index, value);
}
2. newDirectBuffer
// Creates a direct buffer owned by this allocator and picks the implementation:
//   - no Unsafe on this platform      -> InstrumentedUnpooledDirectByteBuf
//   - Unsafe available and noCleaner  -> InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
//   - Unsafe available, cleaner used  -> InstrumentedUnpooledUnsafeDirectByteBuf
// The result is passed through toLeakAwareBuffer unless disableLeakDetector is set.
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf created;
    if (!PlatformDependent.hasUnsafe()) {
        created = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    } else if (noCleaner) {
        created = new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        created = new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    if (disableLeakDetector) {
        return created;
    }
    return toLeakAwareBuffer(created);
}
- InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf:UnpooledUnsafeDirectByteBuf子类
- InstrumentedUnpooledDirectByteBuf:UnpooledDirectByteBuf子类
2.1. UnpooledDirectByteBuf
先来看成员变量:
// Allocator associated with this buffer
private final ByteBufAllocator alloc;
// Underlying NIO direct ByteBuffer
ByteBuffer buffer; // accessed by UnpooledUnsafeNoCleanerDirectByteBuf.reallocateDirect()
// ByteBuffer used for I/O operations; per the article it is effectively buffer.duplicate(),
// i.e. it shares the underlying data with buffer (the code creating it is not part of this excerpt)
private ByteBuffer tmpNioBuf;
// Capacity of this ByteBuf; setByteBuffer sets it from buffer.remaining()
private int capacity;
// Release flag: when true, setByteBuffer skips freeing the old buffer once and resets the flag
private boolean doNotFree;
再看构造方法(方便查看进行了手动内联):
// Constructor of UnpooledDirectByteBuf (superclass logic manually inlined for readability).
// Validates the arguments, then allocates an NIO direct buffer of initialCapacity bytes and
// installs it through setByteBuffer.
// Throws IllegalArgumentException when initialCapacity > maxCapacity.
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    this.maxCapacity = maxCapacity;
    ObjectUtil.checkNotNull(alloc, "alloc");
    checkPositiveOrZero(initialCapacity, "initialCapacity");
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }
    this.alloc = alloc;
    // Allocate the NIO direct buffer with the REQUESTED size. The `capacity` field is still 0
    // at this point (setByteBuffer assigns it), and java.nio.DirectByteBuffer cannot be
    // instantiated from outside java.nio, so the public factory is used instead.
    // tryFree = false: there is no previous buffer to release yet.
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity), false);
}
// Installs buffer as the new underlying ByteBuffer.
// When tryFree is true and a previous buffer exists, that buffer is released through
// freeDirect — unless doNotFree is set, in which case the flag is cleared and the release is skipped.
// The constructor passes tryFree = false; capacity(int) passes true.
void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    // Only look at the previous buffer when the caller asked for a release attempt
    final ByteBuffer previous = tryFree ? this.buffer : null;
    if (previous != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(previous);
        }
    }

    this.buffer = buffer;
    // Drop the cached I/O view; it belonged to the previous buffer
    tmpNioBuf = null;
    // The capacity follows the installed buffer (initialCapacity on first use)
    capacity = buffer.remaining();
}
// Releases the given direct buffer by delegating to PlatformDependent.freeDirectBuffer.
protected void freeDirect(ByteBuffer buffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
// Allocates a new NIO direct ByteBuffer of initialCapacity bytes via ByteBuffer.allocateDirect.
protected ByteBuffer allocateDirect(int initialCapacity) {
return ByteBuffer.allocateDirect(initialCapacity);
}
- 创建一个NIO的DirectByteBuffer,后续操作基本都委托给它进行处理
再看下重写的_setxxx/_getxxx,以int为例
// Reads an int at the absolute index, delegating to the underlying NIO buffer.
protected int _getInt(int index) {
return buffer.getInt(index);
}
// Writes an int at the absolute index, delegating to the underlying NIO buffer.
protected void _setInt(int index, int value) {
buffer.putInt(index, value);
}
so easy
再看下设置容量方法
// Changes the capacity by allocating a new direct buffer, copying the retained bytes into it
// and installing it with setByteBuffer(newBuffer, true). Returns this buffer.
public ByteBuf capacity(int newCapacity) {
// Validate the requested capacity (helper defined outside this excerpt)
checkNewCapacity(newCapacity);
int oldCapacity = capacity;
// Same capacity: nothing to do
if (newCapacity == oldCapacity) {
return this;
}
int bytesToCopy;
if (newCapacity > oldCapacity) {
// Growing: every byte of the old buffer is kept
bytesToCopy = oldCapacity;
} else {
// Shrinking: indices are trimmed first (presumably clamped to newCapacity — confirm in
// trimIndicesToCapacity), and only the first newCapacity bytes are kept
trimIndicesToCapacity(newCapacity);
bytesToCopy = newCapacity;
}
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity);
// Restrict both buffers to the window [0, bytesToCopy) so put() copies exactly that range
oldBuffer.position(0).limit(bytesToCopy);
newBuffer.position(0).limit(bytesToCopy);
// Copy, then clear() to reset the new buffer's position and limit
newBuffer.put(oldBuffer).clear();
// tryFree = true: setByteBuffer releases the previous buffer unless doNotFree is set
setByteBuffer(newBuffer, true);
return this;
}
就是创建新的NIO ByteBuffer,把原ByteBuffer中的字节copy过来;需要注意setByteBuffer(newBuffer, true):tryFree为true时会释放原ByteBuffer(doNotFree为true时除外,此时只会把doNotFree重置为false)
2.2. UnpooledUnsafeDirectByteBuf
继承自UnpooledDirectByteBuf,增加了一个成员变量(memoryAddress:内存首地址),同样基于NIO的DirectByteBuffer,不同之处在于重写了UnpooledDirectByteBuf中对字节操作的相关方法,改为使用UnsafeByteBufUtil调用Unsafe进行操作
memoryAddress为分配的ByteBuffer首地址
// Start address of the buffer's memory; assigned in setByteBuffer from
// PlatformDependent.directBufferAddress(buffer)
long memoryAddress;
// Installs the buffer through the superclass, then records the address reported by
// PlatformDependent.directBufferAddress so that addr(index) can compute absolute addresses.
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
super.setByteBuffer(buffer, tryFree);
memoryAddress = PlatformDependent.directBufferAddress(buffer);
}
常用方法
// Stores one byte at the absolute address of index through UnsafeByteBufUtil.
protected void _setByte(int index, int value) {
UnsafeByteBufUtil.setByte(addr(index), value);
}
// Reads one byte from the absolute address of index through UnsafeByteBufUtil.
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
// Translates a buffer index into an absolute address: memoryAddress + index.
final long addr(int index) {
return memoryAddress + index;
}
至此UnpooledByteBufAllocator分析完毕,PooledByteBufAllocator留到下一节吧