netty中操作byteBuf注意事项以及channel的Ati
channel的Attribute使用
channel本身实现了AttributeMap,我们可以直接当作map来使用,携带一些必要的参数,用于绑定channel,方便存储读取。
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {略}
public interface AttributeMap {
/**
* Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
* an {@link Attribute} which does not have a value set yet.
*/
<T> Attribute<T> attr(AttributeKey<T> key);
/**
* Returns {@code true} if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
*/
<T> boolean hasAttr(AttributeKey<T> key);
}
使用
AttributeKey<String> attributeKey = AttributeKey.valueOf("seqNo");
channel.attr(attributeKey).get();
channel.attr(attributeKey).set("xxx");
ByteBuf分为两类:
①非池化的ByteBuf
包括UnpooledHeapByteBuf、UnpooledDirectByteBuf等等,每次I/O读写都会创建一个新ByteBuf,频繁进行大块内存的分配和回收对性能有一定影响,非池化的ByteBuf可以通过JVM GC自动回收,也推荐手动回收UnpooledDirectByteBuf等使用堆外内存的ByteBuf;
②池化的ByteBuf
包括pooledHeapByteBuf、pooledDirectByteBuf等等,其先申请一块大内存池,在内存池中分配空间,对于这种应用级别的内存二次分配,就需要手动对池化的ByteBuf进行释放,否则就有可能出现内存泄露的问题。
ByteBuf类图
//堆内
ByteBuf buf = Unpooled.buffer(10);
//堆外
ByteBuf buf = Unpooled.directBuffer(10);
ByteBuf提供了Unpooled非池化的类,可以直接使用,没有提供Pool池化的类。
ByteBuf的选择
一般业务数据的内存分配选用Java堆内存UnpooledHeapByteBuf,其实现简单,回收快,不会出现内存管理问题;对于I/O数据的内存分配一般选用池化的直接内存PooledDirectByteBuf,避免Java堆内存到直接内存的拷贝,但使用池化ByteBuf时切记自己分配的内存一定要在用完后手动释放。
Netty的接收和发送ByteBuf采用的DirectBuffers,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(heap buffers)进行socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入socket中。相比堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
实例:
ByteBuf bodyBuf = Unpooled.copiedBuffer(body);
public static ByteBuf copiedBuffer(byte[] array) {
if (array.length == 0) {
return EMPTY_BUFFER;
}
return wrappedBuffer(array.clone());
}
public static ByteBuf wrappedBuffer(byte[] array) {
if (array.length == 0) {
return EMPTY_BUFFER;
}
return new UnpooledHeapByteBuf(ALLOC, array, array.length);
}
这样得到的bytebuf是非池化堆内存,但其内部的alloc是:
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT
所以readBytes()方法创建出来的是directByteBuf,对应直接/堆外内存,这就意味着,必须手动管理,释放内存,如
ByteBuf testBuf = bodyBuf.readBytes(2);
最后需要
testBuf .release();
ByteBuf的计数器实现
那对于池化的ByteBuf在使用完释放回池中时,如何知道自己被引用多少次,并且在没有其他引用的时候被释放呢?ByteBuf的具备实现类都继承了AbstractReferenceCountedByteBuf类,该类实现了对计数器的操作功能。当某一操作使得ByteBuf的引用增加时,调用retain()函数,使计数器的值原子增加,当某一操作使得ByteBuf的引用减少时,调用release()函数,使计数器的值原子减少,减少到0便会触发回收操作。关于AbstractReferenceCountedByteBuf类的源码分析,请见Netty框架AbstractReferenceCountedByteBuf 源码分析。
ByteBuf计数器的调用场景
(1)当一个ByteBuf新建时,或从另一个ByteBuf创建出独立个体时(比如copy(),readBytes(int length)),新的ByteBuf在初始化的时候,自己的计数器也会初始化。这种ByteBuf使用结束后,要主动释放
(2)有些ByteBuf从另一个ByteBuf衍生出来时(比如decode(),retainedSlice(index, length)),底层共用同一个buffer,也会调用retain()函数,来使得计数器增加。使用完毕也要主动释放。
(3)有些ByteBuf从另一个ByteBuf衍生出来时(比如duplicate(), slice(), order()),底层与父类ByteBuf共用一个buffer,其没有自己的计数器,共用父类ByteBuf的计数器,计数器也不会增加。因此,如果要把这些衍生ByteBuf传递给其他函数时,必须要主动调用retain()函数,并在本函数释放父类ByteBuf,在下一个函数里释放衍生ByteBuf。如下面代码
ByteBuf parent = ctx.alloc().directBuffer(512);
parent.writeBytes(...);
try {
while (parent.isReadable(16)) {
ByteBuf derived = parent.readSlice(16);
derived.retain();
process(derived);
}
} finally {
parent.release();
}
...
public void process(ByteBuf buf) {
...
buf.release();
}
谁来释放ByteBuf
最基本的规则是谁最后访问ByteBuf,谁最后负责释放。需注意的是:
(1)发送组件将ByteBuf传递给接收组件,发送组件一般不负责释放,由接收组件释放;
(2)如果一个组件除了接收处理ByteBUf,而不做其他操作(比如再传给其他组件),这个组件负责释放ByteBuf。
在 ChannelHandler负责链中,如何释放
在Inbound messages中
a. 如果ChannelHandler中,只有处理ByteBuf的操作,不会调ctx.fireChannelRead(buf)把ByteBuf传递下去,那就要在这个ChannelHandler中释放ByteBuf。
b. 如果ChannelHandler中,会调ctx.fireChannelRead(buf)把ByteBuf传递给下一个ChannelHandler,那在当前ChannelHandler中不需要释放ByteBuf,由最后一个使用该ByteBuf的ChannelHandler释放。
c. 如果处理的ByteBuf是由decode()等会增加计数器的操作生成的,不再传递时,ByteBuf也要释放。
d. 如果不确定要不要释放,或者简化释放的过程,可以调用ReferenceCountUtil.release(ByteBuf)函数。
e. 也可以把ChannelHandler都承继自SimpleChannelInboundHandler虚类,该类会在channelRead函数中调用ReferenceCountUtil.release(msg)来帮助释放ByteBuf,如下代码所示,channelRead0(ctx, imsg)是一个虚函数,子类实现channelRead0函数用来完成处理逻辑。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
在Outbound messages中
Outbound messages中的ByteBuf都是由应用程序产生的,由Netty负责释放。
内存泄露检测
使用引用计数的缺点在于容易产生内存泄露,因为JVM不知道引用计数的存在。当一个对象不可达时,JVM可能会收回该对象,但这个对象的引用计数可能还不是0,这就导致该对象从池里分配的空间不能归还到池里,从而导致内存泄露。
Netty提供了一种内存泄露检测机制,可以通过配置参数不同选择不同的检测级别,参数设置为java -Dio.netty.leakDetection.level=advanced
DISABLED :完全禁用内存泄露检测,不推荐
SIMPLE :抽样1%的ByteBuf,提示是否有内存泄露
ADVANCED :抽样1%的ByteBuf,提示哪里产生了内存泄露
PARANOID :对每一个ByteBu进行检测,提示哪里产生了内存泄露
测试时,直接提示了ByteBuf内存泄露的位置,如下,找到自己程序代码,看哪里有新生成的ByteBuf对象没有释放,主动释放一下,调用对象的release()函数,或者用工具类帮助释放ReferenceCountUtil.release(msg)。
[nioEventLoopGroup-2-1] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:363)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123)
io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:872)
com.spring.netty.twg.service.TwgMessageDecoder.formatDecoder(TwgMessageDecoder.java:176)
com.spring.netty.twg.service.TwgMessageDecoder.getMessageBody(TwgMessageDecoder.java:90)
com.spring.netty.twg.service.TwgMessageDecoder.decode(TwgMessageDecoder.java:76)
io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)