Netty源码分析之ByteBuf(二)

2017-07-05  本文已影响0人  北京的小毛驴

ByteBuf是一个缓冲区,用于和NIO通道进行交互。缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。每当你需要传输数据时,它必须包含一个缓冲区。虽然Java NIO 为我们提供了原生的多种缓冲区实现,但是使用起来相当复杂并且没有经过优化,有着以下缺点:

Netty提供了一个强大的缓冲区ByteBuf,帮助我们解决了以上问题。

一、ByteBuf的读写操作

当需要与远程进行交互时,需要以字节码发送/接收数据。

ByteBuf有2部分:一个用于读,一个用于写。我们可以按顺序的读取数据,并且可以跳到开始重新读一遍。所有的数据操作,我们只需要做的是调整读取数据索引和再次开始读操作。

在对象初始化时,readerIndex和writerIndex的值都是0,随着读写操作的进行,readerIndex和writerIndex都会增加,但是readerIndex不会超过writerIndex。当readerIndex大于0时,0-readerIndex之间的空间会被视为discardable(丢弃的空间),discardable会在调用discardReadBytes之后销毁,同时readerIndex会被重置为0。

*  BEFORE discardReadBytes()  
*      
*      +-------------------+------------------+------------------+
*      | discardable bytes |  readable bytes  |  writable bytes  |
*      +-------------------+------------------+------------------+
*      |                   |                  |                  |
*      0      <=      readerIndex   <=   writerIndex    <=    capacity
*
*  AFTER discardReadBytes()  
*  
*      +------------------+--------------------------------------+  
*      |  readable bytes  |    writable bytes (got more space)   |  
*      +------------------+--------------------------------------+  
*      |                  |                                      |  
* readerIndex (0) <= writerIndex (decreased)        <=        capacity  

readerIndex: 读指针,可读区域是[readerIndex,writerIndex)

writerIndex: 写指针,可写区域是[writerIndex,capacity)

discardable: 丢弃的读空间[0,readerIndex],在调用discardReadBytes后被释放。

二、ByteBuf源码分析

11

2.1 AbstractReferenceCountedByteBuf

自从Netty 4开始,对象的生命周期由它们的引用计数(reference counts)管理,而不是由垃圾收集器(garbage collector)管理了。ByteBuf是最值得注意的,它使用了引用计数来改进分配内存和释放内存的性能。


private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;

private volatile int refCnt = 1;

@Override
public ByteBuf retain() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, 1);
        }
        if (refCnt == Integer.MAX_VALUE) {
            throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
            break;
        }
    }
    return this;
}

 
public final boolean release() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, -1);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
            if (refCnt == 1) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

refCntUpdater: refCntUpdater对refCnt进行原子更新

refCnt: 每个对象的初始计数为1,这里利用了voliate内存的可见性和CAS操作来保证它的安全性。

retain(): 可以通过调用retain()增加引用计数,前提是引用计数对象未被销毁

release(): 当你释放(release)引用计数对象时,它的引用计数减1.如果引用计数为1,这个引用计数对象会被释放(deallocate),并返回对象池

deallocate(): 回收ByteBuf

2.2 AbstractByteBuf

static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);

    int readerIndex;
    int writerIndex;
    private int markedReaderIndex;
    private int markedWriterIndex;

    private int maxCapacity;

    private SwappedByteBuf swappedBuf;

    protected AbstractByteBuf(int maxCapacity) {
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
        }
        this.maxCapacity = maxCapacity;
    }

swappedBuf: 大端序列与小端序列的转换。这里有个大小端概念,从网上找了个比较好的例子来解释大小端,C,C++蛮多使用小端的,而我们JAVA默认使用大端。什么意思?比如我要发一个18,两个字节就是0x0012,对于小端模式,先发0x12后发0x00,也就是我们先收到12后收到00,对于java,TCP默认的是大端,即先发高位0x00,后发0x12,netty默认大端,即如果按照大端发送过来的数据,可直接转换成对应数值。

leakDetector: leakDetector是Netty用来解决内存泄漏的检测机制,这里使用了static final,表示所有继承AbstractByteBuf的类都将共享一个内存泄漏管理。

2.3 UnpooledHeapByteBuf

UnpooledHeapByteBuf是一个非线程池实现的在堆内存进行内存分配的字节缓冲区,在每次IO操作的都会去创建一个UnpooledHeapByteBuf对象,如果频繁地对内存进行分配或者释放会对性能造成影响。

private final ByteBufAllocator alloc;
private byte[] array;
private ByteBuffer tmpNioBuf;

public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity); 
        }
        setArray(newArray);
    }
    return this;
}

ByteBufAllocator: 用于内存分配

array: 字节数组作为缓冲区,用于存储字节数据

tmpNioBuf: 用来实现Netty ByteBuf 到Nio ByteBuffer的变换

ensureAccessible: 根据refCnt的值是否为零,判断引用计数对象是否被释放(零是释放)。

capacity:

只要newCapacity!=oldCapacity时,都会创建新的数组作为缓冲区,缓冲区大小是newCapacity。

如果newCapacity大于oldCapacity,调用arraycopy进行内存复制,将旧数据拷贝到新数组中,最后使用setArray进行数组替换。

如果newCapacity小于oldCapacity,首先查看readerIndex是否小于newCapacity。

最后调用setArray更换字节数组.

2.4 UnpooledDirectByteBuf

UnpooledDirectByteBuf是直接缓冲区,JVM不用将数据到堆中,提升了性能。但也有缺点,直接缓冲区在分配内存和释放内存时非常复杂,4.X之后Netty使用内存池解决了这样的问题。

private final ByteBufAllocator alloc;

private ByteBuffer buffer;
private ByteBuffer tmpNioBuf;
private int capacity;
private boolean doNotFree;

@Override
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int readerIndex = readerIndex();
    int writerIndex = writerIndex();

    int oldCapacity = capacity;
    if (newCapacity > oldCapacity) {
       //旧缓冲区存储空间不足时,新建一个缓存区,然后将旧缓存区的数据全部写入到新的缓存区,然后释放旧的缓存区。
        ByteBuffer oldBuffer = buffer;
        ByteBuffer newBuffer = allocateDirect(newCapacity);
        oldBuffer.position(0).limit(oldBuffer.capacity());
        newBuffer.position(0).limit(oldBuffer.capacity());
        newBuffer.put(oldBuffer);
        newBuffer.clear();
        setByteBuffer(newBuffer);
    } else if (newCapacity < oldCapacity) {
        ByteBuffer oldBuffer = buffer;
        ByteBuffer newBuffer = allocateDirect(newCapacity);
        if (readerIndex < newCapacity) {
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            oldBuffer.position(readerIndex).limit(writerIndex);
            newBuffer.position(readerIndex).limit(writerIndex);
            newBuffer.put(oldBuffer);
            newBuffer.clear();
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setByteBuffer(newBuffer);
    }
    return this;
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }

    this.buffer = buffer;
    tmpNioBuf = null;
    capacity = buffer.remaining();
}

allocateDirect: 在堆外创建一个大小为newCapacity的新缓冲区

newCapacity > oldCapacity: 将旧缓冲区的数据全部写入到新的缓存区并且释放旧的缓冲区

newCapacity < oldCapacity : 压缩缓冲区,如果readerIndex > newCapacity,无需将旧的缓存区内容写入到新的缓存区中。否则需要将readerIndex至 Math.min(writerIndex, newCapacity)的内容写入到新的缓存.

position:

limit:

setByteBuffer: 释放旧的缓冲区然后将buffer指向新的缓冲区

2.5 PooledByteBuf

在Netty4之后加入内存池管理PoolChunk,负责管理内存的分配和回收。通过内存池管理比之前的ByteBuf性能要好很多。官方说提供了以下优势:

有篇文章对使用内存池和不使用内存池性能作了分析,大家可以看下:Netty4底层用对象池和不用对象池实践优化

abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
    protected PoolChunk<T> chunk;
}
final class PoolChunk<T> {

    final PoolArena<T> arena;
    private final PoolSubpage<T>[] subpages;
    PoolChunkList<T> parent;
}

PooledByteBuf主要由以下几个部分组成:

2.6 PooledDirectByteBuf

private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
    @Override
    protected PooledDirectByteBuf newObject(Handle handle) {
        return new PooledDirectByteBuf(handle, 0);
    }
};

static PooledDirectByteBuf newInstance(int maxCapacity) {
    PooledDirectByteBuf buf = RECYCLER.get();
    buf.setRefCnt(1);
    buf.maxCapacity(maxCapacity);
    return buf;
}

PooledDirectByteBuf是直接缓冲区,在堆之外直接分配内存。其继承自PooledByteBuf,由于PooledByteBuf是基于内存池实现,所以每次创建字节缓冲区的时候不是直接new,而是从内存池中去获取.

参考书籍:Netty in Action

上一篇下一篇

猜你喜欢

热点阅读