深入浅出Netty源码剖析

Netty源码-内存泄漏检测toLeakAwareBuffer

2019-06-09  本文已影响36人  persisting_

1. 概述

Netty在实现ByteBuf时采用了引用计数法进行ByteBuf的回收,使用引用计数法进行回收的ByteBuf都扩展了AbstractReferenceCountedByteBuf类,在使用AbstractReferenceCountedByteBuf时需要调用AbstractReferenceCountedByteBuf.retain方法递增引用计数器,在使用完毕时则需要调用AbstractReferenceCountedByteBuf.release方法递减引用计数器,当计数器为0时,会进行ByteBuf的回收工作:池化的ByteBuf不会进行实际的内存释放,会将占用的内存归还给内存池,非池化的ByteBuf则会直接释放内存(为了叙述简单,后面释放内存则指真正释放内存或者将内存归还给内存池)。

通过上面的描述可知,ByteBuf的正确回收依赖retainrelease方法的正确调用,内存提前释放(即在使用ByteBuf时没有调用retain方法,导致提前释放)应用会报错,用户也能及时感知到;但是如果使用完ByteBuf忘了调用release则会导致内存不能及时得到回收,造成内存泄漏,且内存泄漏用户无法及时感知,久而久之就会发生OOM。为了解决这种问题,Netty采用了内存泄漏检测机制,发生内存泄漏时会通过日志将内存泄漏信息打印出来,报告给用户。

2. 前置知识

Netty的内存泄漏检测使用了WeakReference,即弱引用,了解过Java四种引用类型(强、软、弱、虚)和引用队列(ReferenceQueue)的读者知道,弱引用持有的对象会在虚拟机触发GC时(不管回收之后内存是否够用)被回收掉,如果使用具有引用队列参数的构造函数实例化WeakReference时,弱引用持有的对象在GC被回收时,弱引用自身会被放入引用队列。

为了后面能更好的理解Netty内存泄漏检测的细节,下面先看几个弱引用的例子,在下面的几个例子中,我们使用的数据类和自定义的弱引用类子类如下:

//实际的数据对象,引用持有的实际对象
class Data {
    private int id = 0;
    
    public int getId() {
        return id;
    }
    
    @Override
    public String toString() {
        return "Data(" + id +")";
    }
}

//在使用弱引用时,一般会自定义其子类,加入一些自己的数据,如果本例
//加了一个id域,这里为了后续输出方便,重写了其toString方法
class MyRef extends WeakReference<Data> {

    private int id;
    public MyRef(Data referent, ReferenceQueue<Object> queue) {
        super(referent, queue);
        //弱引用的id域初始化为被引用对象的id
        this.id = referent.getId();
    }
    
    @Override
    public String toString() {
        return "MyRef(" + id +")";
    }
    
}
public class ReferenceQueueTest {
    public static void main(String[] args) {
        //实例化一个引用队列
        ReferenceQueue<Object> queue = new ReferenceQueue<Object>();
        //弱引用持有的实际对象
        Data d = new Data();
        //实例化一个自定义弱引用,传入实际对象和引用队列
        MyRef ref = new MyRef(d, queue);
        //显示触发gc
        System.gc();
        
        System.out.println("After call gc(): ");
        System.out.println("ref.get: " + ref.get());
        System.out.println("queue.poll: " + queue.poll());

    }
}
//输出如下(注释不是输出,是解释说明):
After call gc(): 
//gc之后弱引用没有被回收,因为d还指向Data对象,即还有强引用
//所以还能从ref.get中返回实际对象
ref.get: Data(0)
//队列中也没有任何元素,表示弱引用自身没有被放入队列中
queue.poll: null
public class ReferenceQueueTest {
    public static void main(String[] args) {
        //实例化一个引用队列
        ReferenceQueue<Object> queue = new ReferenceQueue<Object>();
        //弱引用持有的实际对象
        Data d = new Data();
        //实例化一个自定义弱引用,传入实际对象和引用队列
        MyRef ref = new MyRef(d, queue);
        //垃圾回收之前删除所有的强引用
        d = null;
        
        //显示触发gc
        System.gc();
        
        System.out.println("After call gc(): ");
        System.out.println("ref.get: " + ref.get());
        System.out.println("queue.poll: " + queue.poll());

    }
}
//输出如下(注释不是输出,是解释说明):
After call gc(): 
//可见因为手动将d=null,已经没有强引用指向实际对象,发生GC时,
//弱引用持有的实际对象会被回收,ref.get返回null
ref.get: null
//因为GC回收时,jvm回收了弱引用持有的实际对象,所以弱引用自身也被
//放入引用队列中,这里从引用队列中成功取出了弱引用
queue.poll: MyRef(0)
public class ReferenceQueueTest {
    public static void main(String[] args) {
        //实例化一个引用队列
        ReferenceQueue<Object> queue = new ReferenceQueue<Object>();
        //弱引用持有的实际对象
        Data d = new Data();
        //实例化一个自定义弱引用,传入实际对象和引用队列
        MyRef ref = new MyRef(d, queue);
        
        //发生GC前手动清空弱引用持有的实际对象
        ref.clear();
        
        //显示触发gc
        System.gc();
        
        System.out.println("After call gc(): ");
        System.out.println("ref.get: " + ref.get());
        System.out.println("queue.poll: " + queue.poll());

    }
}

//输出如下(注释不是输出,是解释说明):
After call gc(): 
//因为在发生GC前,通过调用clear方法手动清空了弱引用实际持有的对象
//所以这里ref.get肯定返回null
ref.get: null
//发生GC前,自己调用clear方法手动清空弱引用持有的实际对象,
//所以该弱引用自身不会被jvm放入引用队列中
queue.poll: null

好了,三个例子已经介绍完毕,后面在介绍Netty内存泄漏检测时就使用了这里的例子结果,在具体介绍时会和这里的例子一一对应。

3. 内存泄漏检测入口

Netty中将普通ByteBuf转为具有内存泄漏检测功能的ByteBuf是通过AbstractByteBufAllocator.toLeakAwareBuffer方法实现的,我们直接在Eclipse中看该方法的调用层次即可知道Netty在哪里对ByteBuf进行了转换,该方法调用如下图所示:

toLeakAwareBuffer方法调用.png

可见池化内存分配器在分配heap或者direct ByteBuf时都进行了转换,非池化内存分配器仅在分配direct ByteBuf时进行了转换。个人理解时采用池化内存需要特别关注内存释放,否则为了实现池化内存预先分配的一大块内存会因为没有释放被很快分配完,造成后面没有内存进行分配。非池化分配的直接内存也需要特别注意释放,放置内存泄漏;非池化分配的heap内存(其实就是一个byte数组)则可以在对象被回收时同时被回收掉,发生内存泄漏的可能性较小。

4. 内存泄漏检测相关类介绍

本节介绍Netty中内存泄漏检测相关的类,仅做一个大致介绍,类中的重要方法我们放在后面介绍。

主要负责使用track方法对指定的ByteBuf进行内存检测泄漏进行追踪,并返回负责追踪的ResourceLeakTracker类实例,同时在调用track方法时,也会根据指定的检测级别汇报最近的内存泄漏检测结果。该类由工厂类ResourceLeakDetectorFactory负责实例化,默认的实现为ResourceLeakDetector,在ResourceLeakDetectorFactory类的默认实现DefaultResourceLeakDetectorFactory中,也会根据用户是否配置了io.netty.customResourceLeakDetector来决定采用默认实现ResourceLeakDetector还是使用用户自定义的ResourceLeakDetector,用户自定义的ResourceLeakDetector必须是其子类。

默认实现为DefaultResourceLeakDefaultResourceLeak实现了ResourceLeakTrackerResourceLeak接口,同时也继承了类WeakReference,是一个弱引用实现。首先,同上面例2的结果一样,如果在使用ByteBuf时忘了调用AbstractReferenceCountedByteBuf.release方法,那么将不会调用DefaultResourceLeak.clear方法去手动清空该弱引用持有的实际对象,在发生GC时,会由垃圾收集器对弱引用持有的实际对象进行回收,即发生了内存泄漏,同时该弱引用自身也会被加入到引用队列中,该引用队列是ResourceLeakDetector的成员域,上面介绍ResourceLeakDetector类时说到该类会在用户track指定ByteBuf是汇报检测结果,该类的汇报数据来源就是引用队列。DefaultResourceLeak同时还提供了record方法可以让用户在指定时机选择调用,这个方法可以记录用户的调用轨迹(堆栈)。Record同时也是一种单链表,在DefaultResourceLeak中就使用单链表记录用户的调用轨迹。

DefaultResourceLeak供用户记录程序调用轨迹的类,也就是DefaultResourceLeak.record方法返回的对象,继承自Throwable,因此可以使用Throwable.getStackTrace方法获得调用轨迹信息,打印在内存泄漏报告中可以让用户更好的排除内存泄漏问题。

在上面介绍ResourceLeakTracker时,说到其默认实现为DefaultResourceLeakDefaultResourceLeak提供了record方法记录用户的调用轨迹,用户可在调用ByteBuf方法时调用record方法记录调用轨迹,调用的频率越多,后面在汇报内存泄漏情况时就能打印出越详细的信息,这样也能更方便的排查问题。

Netty提供了两个ByteBuf的封装类供选择,就对应不同的record调用频率,每个封装类都持有ResourceLeakTracker对象,Netty根据配置的内存检测级别(下一节介绍相关配置参数)使用不同的ByteBuf封装类。

Netty提供的两个ByteBuf封装类就是SimpleLeakAwareCompositeByteBufAdvancedLeakAwareCompositeByteBufAdvancedLeakAwareCompositeByteBufSimpleLeakAwareCompositeByteBuf的子类,SimpleLeakAwareCompositeByteBuf类仅仅持有ResourceLeakTracker对象,但是看其源码,发现没有调用过record方法,所以只能知道是否发生了内存泄漏时,无法打印出任何调用轨迹信息。AdvancedLeakAwareCompositeByteBuf作为SimpleLeakAwareCompositeByteBuf的子类,在ByteBuf的多个方法中调用了record方法,所以在发生内存泄漏时,能够打印出比较详细的调用轨迹信息。

AdvancedLeakAwareCompositeByteBuf类中使用了配置参数io.netty.leakDetection.acquireAndReleaseOnly来控制是否只是在调用增加或减少引用计数器的方法时才调用record方法记录调用轨迹,默认为false。AdvancedLeakAwareCompositeByteBufretainrelease方法因为改变了引用计数器就直接调用了record方法,而该类中的其他方法则根据io.netty.leakDetection.acquireAndReleaseOnly的配置决定是否调用record方法,这里为了节省篇幅就不列出AdvancedLeakAwareCompositeByteBuf类中调用record的方法了,读者可自行查看。

5. 内存泄漏检测相关配置参数

在介绍相关配置参数之前,我们先看下Netty提供的内存泄漏检测级别:

/**
 * Represents the level of resource leak detection.
 */
public enum Level {
    /**
     * Disables resource leak detection.
     */
    //关闭内存泄漏检测
    DISABLED,
    /**
     * Enables simplistic sampling resource leak detection which reports there is a leak or not,
     * at the cost of small overhead (default).
     */
    //实现最简单的内存泄漏检测,没有任何信息输出,仅仅输出是否发生
    //了内存泄漏,采用的ByteBuf包装类是
    //SimpleLeakAwareCompositeByteBuf
    SIMPLE,
    /**
     * Enables advanced sampling resource leak detection which reports where the leaked object was accessed
     * recently at the cost of high overhead.
     */
    //实现高级点的内存泄漏检测,发生泄漏时会同时打印出一些调用轨迹
    //采用的ByteBuf包装类是AdvancedLeakAwareCompositeByteBuf
    ADVANCED,
    /**
     * Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
     * at the cost of the highest possible overhead (for testing purposes only).
     */
    //比较激进的内存泄漏检测,代价较高,一般用于测试,采用的ByteBuf
    //包装类也是AdvancedLeakAwareCompositeByteBuf
    PARANOID;
}

Level.ADVANCEDLevel.PARANOID使用的ByteBuf包装类都是AdvancedLeakAwareCompositeByteBuf,我们上面介绍ResourceLeakDetector类时提到该类使用track方法对指定的ByteBuf进行内存检测泄漏进行追踪,并返回负责追踪的ResourceLeakTracker类实例,同时在调用track方法时,也会根据指定的检测级别汇报最近的内存泄漏检测结果。如果内存泄漏检测级别为Level.PARANOID时则每次调用track方法都会进行内存泄漏报告;如果级别为Level.ADVANCED或者Level.SIMPLE则会以一定频率进行内存泄漏报告,而不是每次track都进行报告。

是否关闭Netty内存泄漏检测功能,默认为false。如果该参数配置为false,则默认的内存泄漏检测级别根据此参数的配置为Level.DISABLED,否则默认的级别为Level.SIMPLE

配置内存泄漏检测级别的参数,用于老版本的配置参数。

新的内存泄漏检测级别参数,如果没有配置,则会采用老版本参数配置的级别作为最终配置。

在第4节介绍内存泄漏检测相关类时,我们介绍过DefaultResourceLeak提供了record方法记录用户的调用轨迹,如果当前保存的调用轨迹记录数Record大于参数io.netty.leakDetection.targetRecords配置的值,那么会以一定的概率(1/2^n)删除头结点之后再加入新的记录,当然也有可能不删除头结点直接新增新的记录。

该参数的默认为4。

上面介绍过,在AdvancedLeakAwareCompositeByteBuf类中使用了配置参数io.netty.leakDetection.acquireAndReleaseOnly来控制是否只是在调用增加或减少引用计数器的方法时才调用record方法记录调用轨迹,默认为false。

在介绍ResourceLeakDetector类时提到过,默认的ResourceLeakDetector类就是ResourceLeakDetector,但是用户可以使用参数io.netty.customResourceLeakDetector来决定采用默认实现ResourceLeakDetector还是使用用户自定义的ResourceLeakDetector

6. 内存检测实现

我们在第二节介绍了Netty中将普通ByteBuf转为具有内存泄漏检测功能的ByteBuf是通过AbstractByteBufAllocator.toLeakAwareBuffer方法实现的。

这里我们先看下该方法的源码:

//AbstractByteBufAllocator
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    //获取配置的内存泄漏检测级别,根据不同的级别会返回
    //不同的ByteBuf包装类,即下面的SimpleLeakAwareByteBuf
    //或者AdvancedLeakAwareByteBuf
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            //生成ResourceLeakTracker对象
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            //生成ResourceLeakTracker对象
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
        default:
            break;
    }
    return buf;
}

上面的源码中是调用AbstractByteBuf.leakDetector.track(buf)返回ResourceLeakTracker类对象的,这里我们看下默认的ResourceLeakDetectortrack方法实现:

//ResourceLeakDetector
@SuppressWarnings("unchecked")
public final ResourceLeakTracker<T> track(T obj) {
    return track0(obj);
}

@SuppressWarnings("unchecked")
private DefaultResourceLeak track0(T obj) {
    Level level = ResourceLeakDetector.level;
    //如果配置的级别为Level.DISABLED,则返回null
    if (level == Level.DISABLED) {
        return null;
    }
    //如果内存泄漏检测级别为Level.ADVANCED或者Level.SIMPLE
    //则会以一定频率进行内存泄漏报告,而不是每次`track`都进行报告。
    if (level.ordinal() < Level.PARANOID.ordinal()) {
        if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
            //内存泄漏报告方法我们后面再看
            reportLeak();
            //返回DefaultResourceLeak,为一个自定义的弱引用
            //并传入引用队列
            return new DefaultResourceLeak(obj, refQueue, allLeaks);
        }
        return null;
    }
    //如果内存泄漏检测级别为Level.PARANOID,则每次track都会
    //进行内存泄漏报告,内存泄漏报告方法我们后面再看
    reportLeak();
    //返回DefaultResourceLeak,为一个自定义的弱引用
    //并传入引用队列
    return new DefaultResourceLeak(obj, refQueue, allLeaks);
}

我们看到AbstractByteBufAllocator.toLeakAwareBufferResourceLeakDetector.track返回的DefaultResourceLeak和传入的ByteBuf对象进行封装,返回了具有内存泄漏检测功能的ByteBuf封装类SimpleLeakAwareCompositeByteBuf或其子类AdvancedLeakAwareCompositeByteBuf。如果应用程序在使用ByteBuf正确调用了retainrelease方法,则在引用计数器为0时,则会清除弱引用持有的实际对象,发生GC时,DefaultResourceLeak也不会被放入引用队列中(见前面第2节例3结果)。

//SimpleLeakAwareCompositeByteBuf
@Override
public boolean release() {
    if (super.release()) {
        //引用计数为0,则清除弱引用
        closeLeak();
        return true;
    }
    return false;
}

@Override
public boolean release(int decrement) {
    if (super.release(decrement)) {
        //引用计数为0,则清除弱引用
        closeLeak();
        return true;
    }
    return false;
}

private void closeLeak() {
    // Close the ResourceLeakTracker with the tracked ByteBuf as argument. This must be the same that was used when
    // calling DefaultResourceLeak.track(...).
    boolean closed = leak.close(trackedByteBuf);
    assert closed;
}

//DefaultResourceLeak
@Override
public boolean close() {
    // Use the ConcurrentMap remove method, which avoids allocating an iterator.
    if (allLeaks.remove(this, LeakEntry.INSTANCE)) {
        //显示清空该弱引用持有的实际对象,所以该弱引用自身不会
        //在GC时被加入应用队列
        // Call clear so the reference is not even enqueued.
        clear();
        headUpdater.set(this, null);
        return true;
    }
    return false;
}

但是如果应用程序在使用ByteBuf没有正确调用retainrelease方法,则不会清除弱引用持有的实际对象,此时如果实际上已经没有强引用指向该ByteBuf,那么在发生GC时,垃圾收集器会回收该ByteBuf,而弱引用DefaultResourceLeak会被放入引用队列中(见前面第2节例2结果),加入到引用队列中的就是识别到的发生内存泄漏的ByteBuf。在ResourceLeakDetector.track方法中调用的reportLeak输出的就是引用队列中的弱引用DefaultResourceLeak

//ResourceLeakDetector
private void reportLeak() {
    //汇报内存检测需要日志启用error级别
    //没有启用则清空引用队列并返回
    if (!logger.isErrorEnabled()) {
        clearRefQueue();
        return;
    }

    // Detect and report previous leaks.
    //依次取出引用队列中的所有弱引用进行输出
    for (;;) {
        @SuppressWarnings("unchecked")
        DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
        if (ref == null) {
            break;
        }

        if (!ref.dispose()) {
            continue;
        }

        //调用弱引用DefaultResourceLeak的toString方法,
        //其实主要是返回其record记录的调用轨迹信息
        String records = ref.toString();
        //汇报过的放入reportedLeaks进行记录
        if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {
            //如果ByteBuf没有调用过record方法(比如Level.SIMPLE使用
            //的SimpleLeakAwareCompositeByteBuf就不会调用record方法
            //或者使用AdvancedLeakAwareCompositeByteBuf但是没有显示
            //调用过其record方法
            if (records.isEmpty()) {
                reportUntracedLeak(resourceType);
            } else {
                reportTracedLeak(resourceType, records);
            }
        }
    }
}

/**
 * This method is called when an untraced leak is detected. It can be overridden for tracking how many times leaks
 * have been detected.
 */
//没有调用轨迹的,就直接打印出内存泄漏的简单信息
protected void reportUntracedLeak(String resourceType) {
    logger.error("LEAK: {}.release() was not called before it's garbage-collected. " +
            "Enable advanced leak reporting to find out where the leak occurred. " +
            "To enable advanced leak reporting, " +
            "specify the JVM option '-D{}={}' or call {}.setLevel() " +
            "See http://netty.io/wiki/reference-counted-objects.html for more information.",
            resourceType, PROP_LEVEL, Level.ADVANCED.name().toLowerCase(), simpleClassName(this));
}

/**
 * This method is called when a traced leak is detected. It can be overridden for tracking how many times leaks
 * have been detected.
 */
//有调用轨迹的,会同时打印出调用轨迹信息
protected void reportTracedLeak(String resourceType, String records) {
    logger.error(
            "LEAK: {}.release() was not called before it's garbage-collected. " +
            "See http://netty.io/wiki/reference-counted-objects.html for more information.{}",
            resourceType, records);
}

到这里,已经基本上介绍完Netty内存检测的实现原理,下面我们再看下DefaultResourceLeak.record是如何记录调用轨迹的:

//DefaultResourceLeak
@Override
public void record() {
    record0(null);
}

@Override
public void record(Object hint) {
    record0(hint);
}

private void record0(Object hint) {
    //将record记录追加到单链表中,单链表的长度为上面
    //参数io.netty.leakDetection.targetRecords配置的,
    //默认长度为4
    // Check TARGET_RECORDS > 0 here to avoid similar check before remove from and add to lastRecords
    if (TARGET_RECORDS > 0) {
        Record oldHead;
        Record prevHead;
        Record newHead;
        boolean dropped;
        do {
            if ((prevHead = oldHead = headUpdater.get(this)) == null) {
                // already closed.
                return;
            }
            final int numElements = oldHead.pos + 1;
            if (numElements >= TARGET_RECORDS) {
                final int backOffFactor = Math.min(numElements - TARGET_RECORDS, 30);
                if (dropped = PlatformDependent.threadLocalRandom().nextInt(1 << backOffFactor) != 0) {
                    prevHead = oldHead.next;
                }
            } else {
                dropped = false;
            }
            newHead = hint != null ? new Record(prevHead, hint) : new Record(prevHead);
        } while (!headUpdater.compareAndSet(this, oldHead, newHead));
        if (dropped) {
            droppedRecordsUpdater.incrementAndGet(this);
        }
    }
}

最后我们再看下Record是如何输出调用轨迹的,前面我们说到Record继承自类Throwable,因此可使用getStackTrace方法获取实例化该对象时的调用轨迹,所以上面在输出内存泄漏报告时就调用了Record.toString方法:

//Record
@Override
    public String toString() {
        StringBuilder buf = new StringBuilder(2048);
        if (hintString != null) {
            buf.append("\tHint: ").append(hintString).append(NEWLINE);
        }

        // Append the stack trace.
        //通过getStackTrace获取调用的栈轨迹
        StackTraceElement[] array = getStackTrace();
        // Skip the first three elements.
        out: for (int i = 3; i < array.length; i++) {
            StackTraceElement element = array[i];
            // Strip the noisy stack trace elements.
            //excludedMethods用于过滤一些没法用于找出问题的
            //无用轨迹
            String[] exclusions = excludedMethods.get();
            for (int k = 0; k < exclusions.length; k += 2) {
                if (exclusions[k].equals(element.getClassName())
                        && exclusions[k + 1].equals(element.getMethodName())) {
                    continue out;
                }
            }

            buf.append('\t');
            buf.append(element.toString());
            buf.append(NEWLINE);
        }
        return buf.toString();
    }
}
上一篇下一篇

猜你喜欢

热点阅读