netty

netty中操作byteBuf注意事项以及channel的Ati

2022-11-30  本文已影响0人  virtual灬zzZ

channel的Attribute使用

channel本身实现了AttributeMap,我们可以直接当作map来使用,携带一些必要的参数,用于绑定channel,方便存储读取。

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {略}

public interface AttributeMap {
    /**
     * Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
     * an {@link Attribute} which does not have a value set yet.
     */
    <T> Attribute<T> attr(AttributeKey<T> key);

    /**
     * Returns {@code true} if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
     */
    <T> boolean hasAttr(AttributeKey<T> key);
}
使用
AttributeKey<String> attributeKey = AttributeKey.valueOf("seqNo");
channel.attr(attributeKey).get();
channel.attr(attributeKey).set("xxx");

ByteBuf分为两类:

①非池化的ByteBuf

包括UnpooledHeapByteBuf、UnpooledDirectByteBuf等等,每次I/O读写都会创建一个新ByteBuf,频繁进行大块内存的分配和回收对性能有一定影响,非池化的ByteBuf可以通过JVM GC自动回收,也推荐手动回收UnpooledDirectByteBuf等使用堆外内存的ByteBuf;

②池化的ByteBuf

包括pooledHeapByteBuf、pooledDirectByteBuf等等,其先申请一块大内存池,在内存池中分配空间,对于这种应用级别的内存二次分配,就需要手动对池化的ByteBuf进行释放,否则就有可能出现内存泄露的问题。

ByteBuf类图

//堆内
ByteBuf buf = Unpooled.buffer(10);
//堆外
ByteBuf buf = Unpooled.directBuffer(10);

ByteBuf提供了Unpooled非池化的类,可以直接使用,没有提供Pool池化的类。

ByteBuf的选择

一般业务数据的内存分配选用Java堆内存UnpooledHeapByteBuf,其实现简单,回收快,不会出现内存管理问题;对于I/O数据的内存分配一般选用池化的直接内存PooledDirectByteBuf,避免Java堆内存到直接内存的拷贝,但使用池化ByteBuf时切记自己分配的内存一定要在用完后手动释放。

Netty的接收和发送ByteBuf采用的DirectBuffers,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(heap buffers)进行socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入socket中。相比堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

实例:

ByteBuf bodyBuf = Unpooled.copiedBuffer(body);

 public static ByteBuf copiedBuffer(byte[] array) {
        if (array.length == 0) {
            return EMPTY_BUFFER;
        }
        return wrappedBuffer(array.clone());
    }

public static ByteBuf wrappedBuffer(byte[] array) {
        if (array.length == 0) {
            return EMPTY_BUFFER;
        }
        return new UnpooledHeapByteBuf(ALLOC, array, array.length);
    }

这样得到的bytebuf是非池化堆内存,但其内部的alloc是:

private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT

所以readBytes()方法创建出来的是directByteBuf,对应直接/堆外内存,这就意味着,必须手动管理,释放内存,如

 ByteBuf testBuf = bodyBuf.readBytes(2);

最后需要
testBuf .release();

ByteBuf的计数器实现

那对于池化的ByteBuf在使用完释放回池中时,如何知道自己被引用多少次,并且在没有其他引用的时候被释放呢?ByteBuf的具备实现类都继承了AbstractReferenceCountedByteBuf类,该类实现了对计数器的操作功能。当某一操作使得ByteBuf的引用增加时,调用retain()函数,使计数器的值原子增加,当某一操作使得ByteBuf的引用减少时,调用release()函数,使计数器的值原子减少,减少到0便会触发回收操作。关于AbstractReferenceCountedByteBuf类的源码分析,请见Netty框架AbstractReferenceCountedByteBuf 源码分析

ByteBuf计数器的调用场景

(1)当一个ByteBuf新建时,或从另一个ByteBuf创建出独立个体时(比如copy(),readBytes(int length)),新的ByteBuf在初始化的时候,自己的计数器也会初始化。这种ByteBuf使用结束后,要主动释放

(2)有些ByteBuf从另一个ByteBuf衍生出来时(比如decode(),retainedSlice(index, length)),底层共用同一个buffer,也会调用retain()函数,来使得计数器增加。使用完毕也要主动释放。

(3)有些ByteBuf从另一个ByteBuf衍生出来时(比如duplicate(), slice(), order()),底层与父类ByteBuf共用一个buffer,其没有自己的计数器,共用父类ByteBuf的计数器,计数器也不会增加。因此,如果要把这些衍生ByteBuf传递给其他函数时,必须要主动调用retain()函数,并在本函数释放父类ByteBuf,在下一个函数里释放衍生ByteBuf。如下面代码

ByteBuf parent = ctx.alloc().directBuffer(512);
parent.writeBytes(...);

try {
    while (parent.isReadable(16)) {
        ByteBuf derived = parent.readSlice(16);
        derived.retain();
        process(derived);
    }
} finally {
    parent.release();
}
...

public void process(ByteBuf buf) {
    ...
    buf.release();
}

谁来释放ByteBuf

最基本的规则是谁最后访问ByteBuf,谁最后负责释放。需注意的是:
(1)发送组件将ByteBuf传递给接收组件,发送组件一般不负责释放,由接收组件释放;
(2)如果一个组件除了接收处理ByteBUf,而不做其他操作(比如再传给其他组件),这个组件负责释放ByteBuf。

在 ChannelHandler负责链中,如何释放

在Inbound messages中

a. 如果ChannelHandler中,只有处理ByteBuf的操作,不会调ctx.fireChannelRead(buf)把ByteBuf传递下去,那就要在这个ChannelHandler中释放ByteBuf。

b. 如果ChannelHandler中,会调ctx.fireChannelRead(buf)把ByteBuf传递给下一个ChannelHandler,那在当前ChannelHandler中不需要释放ByteBuf,由最后一个使用该ByteBuf的ChannelHandler释放。

c. 如果处理的ByteBuf是由decode()等会增加计数器的操作生成的,不再传递时,ByteBuf也要释放。

d. 如果不确定要不要释放,或者简化释放的过程,可以调用ReferenceCountUtil.release(ByteBuf)函数。

e. 也可以把ChannelHandler都承继自SimpleChannelInboundHandler虚类,该类会在channelRead函数中调用ReferenceCountUtil.release(msg)来帮助释放ByteBuf,如下代码所示,channelRead0(ctx, imsg)是一个虚函数,子类实现channelRead0函数用来完成处理逻辑。

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
在Outbound messages中

Outbound messages中的ByteBuf都是由应用程序产生的,由Netty负责释放。

内存泄露检测

使用引用计数的缺点在于容易产生内存泄露,因为JVM不知道引用计数的存在。当一个对象不可达时,JVM可能会收回该对象,但这个对象的引用计数可能还不是0,这就导致该对象从池里分配的空间不能归还到池里,从而导致内存泄露。

Netty提供了一种内存泄露检测机制,可以通过配置参数不同选择不同的检测级别,参数设置为java -Dio.netty.leakDetection.level=advanced

DISABLED :完全禁用内存泄露检测,不推荐
SIMPLE :抽样1%的ByteBuf,提示是否有内存泄露
ADVANCED :抽样1%的ByteBuf,提示哪里产生了内存泄露
PARANOID :对每一个ByteBu进行检测,提示哪里产生了内存泄露

测试时,直接提示了ByteBuf内存泄露的位置,如下,找到自己程序代码,看哪里有新生成的ByteBuf对象没有释放,主动释放一下,调用对象的release()函数,或者用工具类帮助释放ReferenceCountUtil.release(msg)。

[nioEventLoopGroup-2-1] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:363)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123)
    io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:872)
    com.spring.netty.twg.service.TwgMessageDecoder.formatDecoder(TwgMessageDecoder.java:176)
    com.spring.netty.twg.service.TwgMessageDecoder.getMessageBody(TwgMessageDecoder.java:90)
    com.spring.netty.twg.service.TwgMessageDecoder.decode(TwgMessageDecoder.java:76)
    io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)

参考:
Netty中ByteBuf内存泄露及释放解析

上一篇下一篇

猜你喜欢

热点阅读