netty之DirectByteBuf 和 HeapByteBu
ByteBuf是netty中数据传输的容器,用来替代NIO中的ByteBuffer。其主要还是一个byte数组,以及包含了一些数据的操作方法。通过两个指针readerIndex和writerIndex来读写分离,更好的处理数据。其结构大致如下图所示。
ByteBuf示意图而从内存分配的角度来讲,ByteBuf又分为两种,DirectByteBuf和HeapByteBuf。简而言之就是一种是分配在Direct Memory上的,一种是分配在Heap Memory上的。
这里稍微解释一下direct memory。直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,但是这部分内存也被频繁地使用,而且也可能导致异常出现。它是在JDK 1.4 中新加入了NIO(New Input/Output)类,引入了一种基于通道(Channel)与缓冲区(Buffer)的I/O 方式,它可以使用Native 函数库直接分配堆外内存,然通过一个存储在Java 堆里面的DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在Java 堆和Native 堆中来回复制数据。
直接内存的好处就是利用的是native库,读写快速。但是它不在虚拟机的管理范围之内,这部分内存只有在进行full gc时才会进行回收,而他的容量如果没有明确限制,随着数据的不断读写势必造成内存中可利用的空间不断变小。所以netty做了引用计数机制来处理direct memory上的数据。其实对heap上的也做了引用计数。
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
private volatile int refCnt = 1;
其主要用了这两个变量来处理引用计数问题。计数器基于 AtomicIntegerFieldUpdater,为什么不直接用AtomicInteger?因为ByteBuf对象很多,如果都把int包一层AtomicInteger花销较大,而AtomicIntegerFieldUpdater只需要一个全局的静态变量。
retain
@Override
public ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
break;
}
}
return this;
}
release
@Override
public boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
这两种ByteBuf有各自的特点,用于针对不同的场景。
HeapByteBuf :特点是内存的分配和回收速度快。可以被jvm自动回收。缺点就是在进行socket的I/O读写时,需要将堆内存的缓冲区拷贝到内核中,有一定拷贝的代价。
DirectByteBuf:特点是分配在堆内存外,相比于堆内存分配速度会慢一点,并需要手动管理其引用计数,清理不使用的内存。但是其直接用native方法,不需要拷贝。
ByteBuf从内存回收策略上来说也分为两种pool和unpool。其中poolByteBuf主要是建立了一块内存池来管理ByteBuf。其主要为一块poolArena,其中维护这多个poolChunk。其变量大致如下。
bstract class PoolArena<T> implements PoolArenaMetric {
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
enum SizeClass {
Tiny,
Small,
Normal
}
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
private final List<PoolChunkListMetric> chunkListMetrics;