netty程序员Java学习笔记

Netty4(九):ByteBuf 详解

2018-03-19  本文已影响67人  聪明的奇瑞

什么是 ByteBuf ?

ByteBuf 结构

ByteBuf 结构

ByteBuf 注意点

ByteBuf 划分

按底层实现

按是否使用对象池

ByteBuf 创建

// 在堆上分配一个ByteBuf,并指定初始容量和最大容量
public static ByteBuf buffer(int initialCapacity, int maxCapacity) {
    return ALLOC.heapBuffer(initialCapacity, maxCapacity);
}
// 在堆外分配一个ByteBuf,并指定初始容量和最大容量
public static ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    return ALLOC.directBuffer(initialCapacity, maxCapacity);
}
// 使用包装的方式,将一个byte[]包装成一个ByteBuf后返回
public static ByteBuf wrappedBuffer(byte[] array) {
    if (array.length == 0) {
        return EMPTY_BUFFER;
    }
    return new UnpooledHeapByteBuf(ALLOC, array, array.length);
}
// 返回一个组合ByteBuf,并指定组合的个数
public static CompositeByteBuf compositeBuffer(int maxNumComponents){
    return new CompositeByteBuf(ALLOC, false, maxNumComponents);
}
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;

ByteBuf 关键类源码分析

ByteBuf 类结构

ByteBuf

ByteBuf capacity(int newCapacity); // 设置缓冲区容量
ByteBuf order(ByteOrder endianness); // 设置缓冲区字节序
ByteBuf readerIndex(int readerIndex); // 设置缓冲区读索引
ByteBuf writerIndex(int writerIndex); // 设置缓冲区写索引
ByteBuf setIndex(int readerIndex, int writerIndex); // 设置读写索引
ByteBuf markReaderIndex();  // 标记读索引,写索引可类比
ByteBuf resetReaderIndex(); // 重置为标记的读索引
ByteBuf skipBytes(int length); // 略过指定字节(增加读索引)
ByteBuf clear(); // 读写索引都置0
int readableBytes(); // 可读的字节数
boolean isReadable(); // 是否可读
boolean isReadable(int size); // 指定的字节数是否可读
boolean hasArray(); // 判断底层实现是否为字节数组
byte[] array(); // 返回底层实现的字节数组
int arrayOffset();  // 底层字节数组的首字节位置
boolean isDirect(); // 判断底层实现是否为直接缓冲区
boolean hasMemoryAddress(); // 底层直接ByteBuffer是否有内存地址
long memoryAddress(); // 直接ByteBuffer的首字节内存地址
int indexOf(int fromIndex, int toIndex, byte value); // 查找首个特定字节的绝对位置
int bytesBefore(int index, int length, byte value); // 查找首个特定字节的相对位置,相对读索引
ByteBuf copy(); // 拷贝一个缓冲区,copy() 生成的ByteBuf完全独立于原ByteBuf
ByteBuf slice(); // slice() 和 duplicate() 生成的ByteBuf与原ByteBuf共享相同的底层实现,只是各自维护独立的索引和标记,使用这两个方法时,特别需要注意结合使用场景确定是否调用retain()增加引用计数
String toString();  // JAVA中Object的标准重载方法,返回ByteBuf的JAVA描述
String toString(Charset charset);   // 返回使用指定编码集编码的缓冲区字节数据的字符形式

AbstractByteBuf

int readerIndex; // 读索引
int writerIndex; // 写索引
private int markedReaderIndex; // 标记读索引
private int markedWriterIndex; // 标记写索引
private int maxCapacity; // 最大容量
private int calculateNewCapacity(int minNewCapacity) {
    final int maxCapacity = this.maxCapacity;
    final int threshold = 1048576 * 4; // 4MB的阈值

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // 所需的最小容量超过阈值4MB,每次增加4MB
    if (minNewCapacity > threshold) {
        int newCapacity = (minNewCapacity / threshold) * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity; // 超过最大容量不再扩增
        } else {
            newCapacity += threshold; // 增加4MB
        }
        return newCapacity;
    }
    // 此时所需的最小容量小于阈值4MB,容量翻倍
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1; // 使用移位运算表示*2
    }
    return Math.min(newCapacity, maxCapacity);
}
public ByteBuf discardReadBytes() {
    if (readerIndex == 0) {
        return this;
    }

    if (readerIndex != writerIndex) {
        // 将readerIndex之后的数据移动到从0开始
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        writerIndex -= readerIndex; // 写索引减少readerIndex
        adjustMarkers(readerIndex); // 标记索引对应调整
        readerIndex = 0; // 读索引置0
    } else {
        // 读写索引相同时等同于clear操作
        adjustMarkers(readerIndex);
        writerIndex = readerIndex = 0;
    }
    return this;
}
if (readerIndex >= capacity() >>> 1) {
    setBytes(0, this, readerIndex, writerIndex - readerIndex);
    writerIndex -= readerIndex;
    adjustMarkers(readerIndex);
    readerIndex = 0;
}
public ByteBuf skipBytes(int length) {
    checkReadableBytes(length);
    readerIndex += length;
    return this;
}
public int getInt(int index) {
    checkIndex(index, 4);   // 索引正确性检查
    return _getInt(index);
}

protected abstract int _getInt(int index);
public int readInt() {
    checkReadableBytes0(4); // 检查索引
    int v = _getInt(readerIndex);
    readerIndex += 4;   // 读索引增加
    return v;
}

AbstractReferenceCountedByteBuf

CompositeByteBuf

private static final class ComponentList extends ArrayList<Component> {
    ComponentList(int initialCapacity) {
        super(initialCapacity);
    }
    @Override
    public void removeRange(int fromIndex, int toIndex) {
        super.removeRange(fromIndex, toIndex);
    }
}
private static final class Component {
    final ByteBuf buf;
    final int length;
    int offset;
    int endOffset;
    Component(ByteBuf buf) {
        this.buf = buf;
        length = buf.readableBytes();
    }
    void freeIfNecessary() {
        // We should not get a NPE here. If so, it must be a bug.
        buf.release(); 
    }
}
private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
    assert buffer != null;
    boolean wasAdded = false;
    try {
        checkComponentIndex(cIndex);
        int readableBytes = buffer.readableBytes();
        // No need to consolidate - just add a component to the list.
        //将ByteBuf封装成一个Component
        @SuppressWarnings("deprecation")
        Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
        if (cIndex == components.size()) {
            wasAdded = components.add(c);
            if (cIndex == 0) {
                c.endOffset = readableBytes;
            } else {
                Component prev = components.get(cIndex - 1);
                c.offset = prev.endOffset;
                c.endOffset = c.offset + readableBytes;
            }
        } else {
            components.add(cIndex, c);
            wasAdded = true;
            if (readableBytes != 0) {
                updateComponentOffsets(cIndex);
            }
        }
        if (increaseWriterIndex) {
            writerIndex(writerIndex() + buffer.readableBytes());
        }
        return cIndex;
    } finally {
        if (!wasAdded) {
            buffer.release();
        }
    }
}

ByteBuf 池化

public interface ReferenceCounted {
    // 返回当前对象的引用计数值,如果是0则表示当前对象已经被释放了
    int refCnt();
    // 引用计数加1
    ReferenceCounted retain();
    // 引用计数加increment
    ReferenceCounted retain(int increment);
    // 引用计数减1
    boolean release();
    // 引用计数减decrement,如果当前引用计数为0,则释放当前对象,如果释放成功则返回true
    boolean release(int decrement);
}
private boolean release0(int decrement) {
    // AtomicIntegerFieldUpdater类的getAndAdd方法返回的是对象原来的值,然后再进行add操作
    int oldRef = refCntUpdater.getAndAdd(this, -decrement);
    // 如果oldRef==decrement,则说明该对象的引用计数正好被释放完,则可以进行对象的释放操作,也即调用deallocate()方法
    if (oldRef == decrement) {
        deallocate();
        return true;
    // 如果引用计数的原值小于要释放的值,或者decrement小于0,则会抛出引用计数出错的异常IllegalReferenceCountException
    } else if (oldRef < decrement || oldRef - decrement > oldRef) {
        // Ensure we don't over-release, and avoid underflow.
        // 此处会将引用计数的值再增加回来
        refCntUpdater.getAndAdd(this, decrement);
        throw new IllegalReferenceCountException(oldRef, decrement);
    }
    return false;
}
// 引用计数对象的释放方法是一个抽象方法,由各个子类具体实现
protected abstract void deallocate();

引言

上一篇下一篇

猜你喜欢

热点阅读