netty ByteBuf

2020-03-15  本文已影响0人  田才

本文基于 netty 4.1.46

一、作用

ByteBuf 的作用:在用 jdk nio 写程序主要使用的是java.nio.ByteBuffer,从功能角度,ByteBuffer完全可以满足需要,但是有一定的局限性。
例如:
(1)、长度固定、容量不能动态扩展和收缩
(2)、ByteBuffer 只有一个 position 读写的时候需要手工调用 flip() 和 rewind() 方法重置标记,否则导致处理失败
为了弥补这些不足,Netty 提供了类似 java.nio.ByteBuffer 的实现 ByteBuf ,带着这几个问题,看一下 netty 是怎么解决的。

二、ByteBuf 结构

image.png

注意:最后一部分maxCapacity 默认为Integer.MAX_VALUE,netty 注释中没有,个人觉得加上更清晰一些

(1)、read \ write \ set 方法
read 和 write 都会移动指针 ,set 方法不会

(2)、mark 和 reset 方法
mark 可以理解为保存点,即保存读之前的 readerIndex ,或写之前的 writerIndex , reset 方法为mark 方法的保存点。

此处 netty 两个指针分别保存读写标志位,所以避免了jdk.nio.ByteBuffer,写之后读或读后写,都要反复重置标记位问题。

1、ByteBuf 的继承关系

clipboard.png

ByteBuf 继承体系中最重要的是 AbstractByteBuf 类,所有骨架方法大部分都在此类中实现。

netty ByteBuf 的继承体系可分为三类。

三 、实现扩容

1、AbstractByteBuf 类 writeBytes(byte[] src, int srcIndex, int length)
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    //如果写入的 length 大于数组容量则扩容
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    //写索引累计
    writerIndex += length;
    return this;
}
2、AbstractByteBuf 类 ensureWritable0

步骤:

final void ensureWritable0(int minWritableBytes) {
    final int writerIndex = writerIndex();
    final int targetCapacity = writerIndex + minWritableBytes;
    //不需要扩容直接返回
    if (targetCapacity <= capacity()) {
        //检查 ByteBuf 是否被释放了,refCnt ==0
        ensureAccessible();
        return;
    }

    //大于ByteBuf的最大扩容阀值抛出异常  maxCapacity 默认为 Integer.MAX_VALUE
    if (checkBounds && targetCapacity > maxCapacity) {
        ensureAccessible();
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }
    //计算 ByteBuf 剩余可写容量,Math.min(maxLength, maxCapacity()) - writerIndex;
    // Normalize the target capacity to the power of 2.
    final int fastWritable = maxFastWritableBytes();
    //ByteBuf 剩余空间是否大于需要写入的空间,否则计算需要分配的内存空间
    int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
            : alloc().calculateNewCapacity(targetCapacity, maxCapacity);

    // Adjust to the new capacity.
    capacity(newCapacity);
}
3、calculateNewCapacity() 计算需要分配的内存空间

因为内存比较小的时候采用倍增是可以接受的,但是内存大的情况下 ,比较浪费空间。所以内存比较大的情况采用递增,节约内存。

@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    checkPositiveOrZero(minNewCapacity, "minNewCapacity");
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    //4M 为扩容策略的阀值
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }
    //如果需求容量 大于4M 那么以 4M 步长递增
    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }
    //如果需求容量 小于4M 那么以 64 基数被增
    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}
4、增加容量方法 capacity(int newCapacity)

以 AbstractByteBuf 的子类 PooledByteBuf 的 capacity(int newCapacity) 为例

length 是指用户实际要求分配的 byte[] 的长度
maxLength 是指 ByteBuf 在 Chunk 的 memory 中实际占用的长度

@Override
public final ByteBuf capacity(int newCapacity) {
    //length 是指用户实际要求分配的 byte[] 的长度
    if (newCapacity == length) {
        ensureAccessible();
        return this;
    }
    //检查扩容边界
    checkNewCapacity(newCapacity);
    if (!chunk.unpooled) {
        // If the request capacity does not require reallocation, just update the length of the memory.
        if (newCapacity > length) {
            //扩容的容量 newCapacity 小于当前 Buf 的 maxLength,那么修改 length 即可,
            //maxLength 是指 ByteBuf 在 Chunk 的 memory 中实际占用的长度
            if (newCapacity <= maxLength) {
                length = newCapacity;
                return this;
            }
        } else if (newCapacity > maxLength >>> 1 &&
                (maxLength > 512 || newCapacity > maxLength - 16)) {
            //如果 newCapacity 大于 maxLength 的一半,那么更新 length
            // here newCapacity < length
            length = newCapacity;
            trimIndicesToCapacity(newCapacity);
            return this;
        }
    }

    //申请内存
    // Reallocation required.
    chunk.arena.reallocate(this, newCapacity, true);
    return this;
}
5、申请内存

关于内存的分配 allocate 和释放 free 下节讲解

void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
    assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();

    int oldCapacity = buf.length;
    if (oldCapacity == newCapacity) {
        return;
    }
    //将久 ByteBuf 的信息保存
    PoolChunk<T> oldChunk = buf.chunk;
    ByteBuffer oldNioBuffer = buf.tmpNioBuf;
    long oldHandle = buf.handle;
    T oldMemory = buf.memory;
    int oldOffset = buf.offset;
    int oldMaxLength = buf.maxLength;

    //重新划分一块内存,并且调用 buf.init() 方法。
    // This does not touch buf's reader/writer indices
    allocate(parent.threadCache(), buf, newCapacity);
    int bytesToCopy;
    if (newCapacity > oldCapacity) {
        bytesToCopy = oldCapacity;
    } else {
        buf.trimIndicesToCapacity(newCapacity);
        bytesToCopy = newCapacity;
    }
    //将旧的内存中的数据,copy 到新申请到内存中
    memoryCopy(oldMemory, oldOffset, buf, bytesToCopy);
    //释放久的内存占用
    if (freeOldMemory) {
        free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
    }
}
6、byte 复制
  @Override
    protected void memoryCopy(ByteBuffer src, int srcOffset, PooledByteBuf<ByteBuffer> dstBuf, int length) {
        if (length == 0) {
            return;
        }
        //jdk api 中可以获取到 Unsafe 对象
        if (HAS_UNSAFE) {
            PlatformDependent.copyMemory(
                    PlatformDependent.directBufferAddress(src) + srcOffset,
                    PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length);
        } else {
            //否者用 java nio ByteBuffer 进行copy
            // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
            src = src.duplicate();
            ByteBuffer dst = dstBuf.internalNioBuffer();
            src.position(srcOffset).limit(srcOffset + length);
            dst.position(dstBuf.offset);
            dst.put(src);
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读