netty

Netty 源码分析之ByteBuf

2018-10-18  本文已影响0人  达微

ByteBuf基础

Java Nio 的Buffer

在进行数据传输的过程中,我们经常会用到缓冲区。
在Java NIO 为我们提供了原生的七种缓冲区实现,对应着Java 的七种基本类型。一般使用ByteBuffer较多。原生的Buffer虽然能满足我们的日常使用,但是要进行复杂的应用的时候,确有点力不从心了,原生Buffer存在着以下缺点。因此Netty对其进行了封装,提供了更为友好的接口供我们使用。

ByteBuf工作原理

ByteBuf也是通过字节数组作为缓冲区来存取数据,通过外观模式聚合了JDK NIO元素的ByteBuffer,进行封装。
ByteBuf是通过readerIndex跟writerIndex两个位置指针来协助缓冲区的读写操作的。
在对象初始化的时候,readerIndex和writerIndex的值为0,随着读操作和写操作的进行,writerIndex和readerIndex都会增加,不过readerIndex不能超过writerIndex,在进行读取操作之后,0到readerIndex之间的空间会被视为discard,调用ByteBuf的discardReadBytes方法,可以对这部分空间进行释放重用,类似于ByteBuffer的compact操作,对缓冲区进行压缩。readerIndex到writerIndex的空间,相当于ByteBuffer的position到limit的空间,可以对其进行读取,WriterIndex到capacity的空间,则相当于ByteBuffer的limit到capacity的空间,是可以继续写入的。
readerIndex跟writerIndex让读写操作的位置指针分离,不需要对同一个位置指针进行调整,简化了缓冲区的读写操作。
同样,ByteBuf对读写操作进行了封装,提供了动态扩展的能力,当我们对缓冲区进行写操作的时候,需要对剩余的可用空间进行校验,如果可用空间不足,同时要写入的字节数小于可写的最大字节数,会对缓冲区进行动态扩展,它会重新创建一个缓冲区,然后将以前的数据复制到新创建的缓冲区中,

ByteBuf基本功能

public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
    checkReadableBytes(length);
    getBytes(readerIndex, dst, dstIndex, length);
    readerIndex += length;
    return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
    ensureAccessible();
    if (minimumReadableBytes < 0) {
        throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
    }
    if (readerIndex > writerIndex - minimumReadableBytes) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                readerIndex, minimumReadableBytes, writerIndex, this));
    }
}
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
    return this;
}
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }

    if (minWritableBytes <= writableBytes()) {
        return this;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);

    // Adjust to the new capacity.
    capacity(newCapacity);
    return this;
}
private int calculateNewCapacity(int minNewCapacity) {
    final int maxCapacity = this.maxCapacity;
    final int threshold = 1048576 * 4; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}
//UnpooledHeapByteBuf的capacity实现
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}
public ByteBuf clear() {
    readerIndex = writerIndex = 0;
    return this;
}
public ByteBuf discardReadBytes() {
    ensureAccessible();
    if (readerIndex == 0) {
        return this;
    }

    if (readerIndex != writerIndex) {
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        writerIndex -= readerIndex;
        adjustMarkers(readerIndex);
        readerIndex = 0;
    } else {
        adjustMarkers(readerIndex);
        writerIndex = readerIndex = 0;
    }
    return this;
}
protected final void adjustMarkers(int decrement) {
    int markedReaderIndex = this.markedReaderIndex;
    if (markedReaderIndex <= decrement) {
        this.markedReaderIndex = 0;
        int markedWriterIndex = this.markedWriterIndex;
        if (markedWriterIndex <= decrement) {
            this.markedWriterIndex = 0;
        } else {
            this.markedWriterIndex = markedWriterIndex - decrement;
        }
    } else {
        this.markedReaderIndex = markedReaderIndex - decrement;
        markedWriterIndex -= decrement;
    }
}

当我们需要跳过某些不需要的字节的时候,可以调用skipBytes方法来跳过指定长度的字节来读取后面的数据。
首先对跳跃长度进行判断,如果跳跃长度小于0的话,会抛出IllegalArgumentException异常,或者跳跃长度大于当前缓冲区可读长度的话,会抛出IndexOutOfBoundsException异常。如果校验通过,新的readerindex为原readerIndex+length,如果新的readerIndex大于writerIndex的话,会抛出IndexOutOfBoundsException异常,否则就更新readerIndex。

public ByteBuf skipBytes(int length) {
    checkReadableBytes(length);
    int newReaderIndex = readerIndex + length;
    if (newReaderIndex > writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))",
                length, readerIndex, writerIndex));
    }
    readerIndex = newReaderIndex;
    return this;
}

ByteBuf源码分析

[图片上传失败...(image-54964b-1539838427542)]

AbstractReferenceCountedByteBuf

AbstractReferenceCountedByteBuf是ByteBuf实现对引用进行计数的基类,用来跟踪对象的分配和销毁,实现自动内存回收。

public ByteBuf retain() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, 1);
        }
        if (refCnt == Integer.MAX_VALUE) {
            throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
            break;
        }
    }
    return this;
}

public final boolean release() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, -1);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
            if (refCnt == 1) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

UnpooledHeapByteBuf

UnpooledHeapByteBuf是一个非线程池实现的在堆内存进行内存分配的字节缓冲区,在每次IO操作的都会去创建一个UnpooledHeapByteBuf对象,如果频繁地对内存进行分配或者释放会对性能造成影响。

 public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}

public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    System.arraycopy(src, srcIndex, array, index, length);
    return this;
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
    checkIndex(index, length);
    if (srcIndex < 0 || srcIndex > srcCapacity - length) {
        throw new IndexOutOfBoundsException(String.format(
                "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
    }
}

public ByteBuffer nioBuffer(int index, int length) {
    ensureAccessible();
    return ByteBuffer.wrap(array, index, length).slice();
}

PooledByteBuf

在Netty4之后加入内存池管理,通过内存池管理比之前ByteBuf的创建性能得到了极大提高。

PoolChunk主要负责内存块的分配及释放,chunk中的page会构建成一颗二叉树,默认情况下page的大小是8K,chunk的大小是2^11 page,即16M,构成了11层的二叉树,最下面一层的叶子节点有8192个,与page的数目一样,每一次内存的分配必须保证连续性,方便内存操作。每个节点会记录自己在Memory Area的偏移地址,当一个节点表示的内存区域被分配之后,那么该节点会被标志为已分配,该节点的所有子节点的内存请求都会忽略。每次内存分配的都是8k(2n)大小的内存块,当需要分配大小为chunkSize/(2k)的内存端时,为了找到可用的内存段,会从第K层左边开始寻找可用节点。

在内存分配中,为了能够集中管理内存的分配及释放,同时提供分配和释放内存的性能,一般都是会先预先分配一大块连续的内存,不需要重复频繁地进行内存操作,那一大块连续的内存就叫做memory Arena,而PoolArena是Netty的内存池实现类。
在Netty中,PoolArena是由多个Chunk组成的,而每个Chunk则由多个Page组成。PoolArena是由Chunk和Page共同组织和管理的。

当对于小于一个Page的内存分配的时候,每个Page会被划分为大小相等的内存块,它的大小是根据第一次申请内存分配的内存块大小来决定的。一个Page只能分配与第一次内存内存的内存块的大小相等的内存块,如果想要想要申请大小不想等的内存块,只能在新的Page上申请内存分配了。
Page中的存储区域的使用情况是通过一个long数组bitmap来维护的,每一位表示一个区域的占用情况。

PooledDirectByteBuf

static PooledHeapByteBuf newInstance(int maxCapacity) {
    PooledHeapByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}
public ByteBuf copy(int index, int length) {
    checkIndex(index, length);
    ByteBuf copy = alloc().directBuffer(length, maxCapacity());
    copy.writeBytes(this, index, length);
    return copy;
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
// PooledByteBufAllocator 
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }

    return toLeakAwareBuffer(buf);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

ByteBuf辅助类分析

ByteBufHolder

ByteBufHolder是ByteBuf的一个容器,它可以更方便地访问ByteBuf中的数据,在使用不同的协议进行数据传输的时候,不同的协议消息体包含的数据格式和字段不一样,所以抽象一个ByteBufHolder对ByteBuf进行包装,不同的子类有不同的实现,使用者可以根据自己的需要进行实现。Netty提供了一个默认实现DefaultByteBufHolder。

ByteBufAllocator

ByteBufAllocator是字节缓冲区分配器,根据Netty字节缓冲区的实现不同,分为两种不同的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他们提供了不同ByteBuf的分配方法。

CompositeByteBuf

CompositeByteBuf是一个虚拟的Buffer,它可以将多个ByteBuf组装为一个ByteBuf视图。
在Java NIO中,我们有两种实现的方法

在Netty中,CompositeByByteBuf中维护了一个Component类型的集合。Component是ByteBuf的包装类,它聚合了ByteBuf.维护在集合中的位置偏移量等信息。一般情况下,我们应该使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法来创建CompositeByteBuf,而不是直接通过构造函数去实例化一个CompositeByteBuf对象。

private int addComponent0(int cIndex, ByteBuf buffer) {
    checkComponentIndex(cIndex);
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }

    int readableBytes = buffer.readableBytes();

    // No need to consolidate - just add a component to the list.
    Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
    if (cIndex == components.size()) {
        components.add(c);
        if (cIndex == 0) {
            c.endOffset = readableBytes;
        } else {
            Component prev = components.get(cIndex - 1);
            c.offset = prev.endOffset;
            c.endOffset = c.offset + readableBytes;
        }
    } else {
        components.add(cIndex, c);
        if (readableBytes != 0) {
            updateComponentOffsets(cIndex);
        }
    }
    return cIndex;
}
private void consolidateIfNeeded() {
    final int numComponents = components.size();
    if (numComponents > maxNumComponents) {
        final int capacity = components.get(numComponents - 1).endOffset;

        ByteBuf consolidated = allocBuffer(capacity);

        for (int i = 0; i < numComponents; i ++) {
            Component c = components.get(i);
            ByteBuf b = c.buf;
            consolidated.writeBytes(b);
            c.freeIfNecessary();
        }
        Component c = new Component(consolidated);
        c.endOffset = c.length;
        components.clear();
        components.add(c);
    }
}

public CompositeByteBuf removeComponent(int cIndex) {
    checkComponentIndex(cIndex);
    Component comp = components.remove(cIndex);
    comp.freeIfNecessary();
    if (comp.length > 0) {
        updateComponentOffsets(cIndex);
    }
    return this;
}

private static final class Component {
    final ByteBuf buf;
    final int length;
    int offset;
    int endOffset;

    Component(ByteBuf buf) {
        this.buf = buf;
        length = buf.readableBytes();
    }

    void freeIfNecessary() {
        buf.release(); // We should not get a NPE here. If so, it must be a bug.
    }
}

ByteBufUtil

ByteBufUtil是ByteBuf的工具类,它提供了一系列的静态方法来操作ByteBuf。

上一篇 下一篇

猜你喜欢

热点阅读