JUC并发相关

20. 并发终结之ThreadLocal

2020-09-21  本文已影响0人  涣涣虚心0215

我们提到过如何保证共享变量的线程安全性,比如可以用synchronized内部锁,也可以使用Lock,volatile或者原子变量。
除此之外ThreadLocal也是保证共享变量线程安全的一个手段,它将共享变量变成线程私有(保证了对非线程安全对象访问的线程安全,又避免了锁的开销)。

final static ThreadLocal<T> VAR = new ThreadLocal()

ThreadLocal通常会被作为某个类的静态字段使用,因为如果声明成实例变量,那么每创建该类的一个实例都会导致新的ThreadLocal实例被创建,即使没有导致错误,也会导致重复创建对象带来的浪费;static意思就是这个类的所有实例共有这个VAR变量。

ThreadLocal实现机制:

Java里面每个Thread都维护了一个ThreadLocalMap对象threadLocals,ThreadLocalMap里面包含了若干Entry(private Entry[] table....一个key-value键值对),因此可以说每个线程都拥有若干这样的条目,相应的线程就被称为这些条目的属主线程
Entry的key是一个ThreadLocal实例,Value是一个线程特有对象,即Entry的作用就是建立一个ThreadLocal实例与线程特有对象之间的对应关系,但是由于Entry对ThreadLocal实例的引用(Key的引用)是一个弱引用(WeakReference),因此它不会阻止被引用的ThreadLocal实例被垃圾回收,如果ThreadLocal引用置为null,那么GC的时候ThreadLocal实例被垃圾回收,那么其所在Entry的Key会被设置成null,此时Entry就是无效条目。另一方面,由于Entry对Value的引用是强引用,如果Entry本身有对它的可达强引用,那么Entry也会阻止其引用的Value被垃圾回收。
此时,当ThreadLocalMap有新的ThreadLocal线程和Value对象关系被创建(相当于有新的Entry对象被添加到ThreadLocalMap)的时候,ThreadLocalMap会将无效条目清理,即使得对Value对象的强引用被清除,Value对象得以被垃圾回收;但是也有缺点,如果线程在相当长时间(或者一直)处于非运行状态,那么该线程的ThreadLocalMap可能就没有任何变化,相应的ThreadLocalMap中无效条目也不会被清理,这就可能导致这些线程的Entry所引用的Value对象无法被线程回收。导致伪内存泄漏

ThreadLocal
例如Value实线部分,即线程对象持有线程特有对象的强引用,可能会导致伪内存泄漏,那么只要打破这种关系,即通过调用ThreadLocal.remove()将线程特有对象从其所属的Entry中剥离,即可使线程特有对象(ThreadLocal包含的对象)和线程局部变量(ThreadLocal对象)都被垃圾回收。
关于WeakReference

弱引用, 当一个对象仅仅被weak reference指向, 而没有任何其他强引用指向的时候,只能生存到下一次垃圾回收之前,GC发生时,不管内存够不够,都会被回收。
下面代码里a=null并不能使对象A在GC的时候被垃圾回收,因为b也是强引用并持有对象A的引用。
如果用WeakReference来引用a,这时候如果a=null,那么GC的时候堆内存的对象A就会被回收;如果a==null不写,那么对象A在内存还是有强引用指向的,就不会被垃圾回收。

A a = new A();
B b = new B(a);
a = null; 
------------------------------
A a = new A();
WeakReference<A> reference = new WeakReference(a);
System.out.println(reference.get());
System.gc();
System.out.println(reference.get());
结果:
com.refinitiv.applayer.ftp.entity.A@5910e440
com.refinitiv.applayer.ftp.entity.A@5910e440
------------------------------
A a = new A();
WeakReference<A> reference = new WeakReference(a);
a = null;
System.out.println(reference.get());
System.gc();
System.out.println(reference.get());
结果:
com.refinitiv.applayer.ftp.entity.A@5910e440
null

ThreadLocal在JVM中(简易版)

Thread实例里面都有ThreadLocalMap变量,在线程里面调用ThreadLocal.set()或者get()方法,会创建当前线程所属的ThreadLocalMap实例,ThreadLocalMap内部主要是Entry类型数组Entry[] table,即会创建一个Entry来关联当前ThreadLocal实例(WeakReference类型的 )以及共享变量Value。
当threadLocal引用变量置为null(指向内存ThreadLocal的引用变量被回收),没有强引用指向堆内存ThreadLocal对象,那么发生GC,WeakReference类型的变量会被回收,那么entry的key=null,但是thread对于value的强引用还存在,导致entry对象也一直留在堆内存,如果不主动调用remove(),如果thread挂起或者别的原因不能正常结束,由于ThreadLocalMap 的生命周期跟 Thread 一样长,那么堆内存一直得不到释放,会造成内存泄漏。

JVM-ThreadLocal
源码分析

Thread类里面定义了一个ThreadLocal.ThreadLocalMap成员变量,这个变量是在ThreadLocal类里面进行维护的。

    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocal主要的方法有set(), get(), setInitialValue()以及remove()方法。
因为WeakReference的存活时间只能到下一次GC,GC就会被回收,所以set()和get()方法都会调用expungeStaleEntry()来清除无效entry,避免内存泄漏。

public T get() {
    //拿到当前Thread的ThreadLocalMap实例
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        //拿到ThreadMap之后就根据当前ThreadLocal实例获得Entry实例
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
   //如果Entry不存在就调用setInitialValue来设置初始值,initialValue一般在在声明ThreadLocal变量的时候提供,如果没有提供,则为null。
    return setInitialValue();
}
/**
 * 根据提供的initialValue方法来设置初始值,这里面会创建当前Thread内部的ThreadLocalMap
 */
private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}

/**
 * 设置当前线程的私有变量
 */
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

/**
 * 移除当前Thread关联的私有变量
 * 拿到ThreadLocalMap里的Entry对象,清除WeakReference,然后调用expungeStaleEntry清除无效entry
 */
 public void remove() {
     ThreadLocalMap m = getMap(Thread.currentThread());
     if (m != null)
         m.remove(this);
 }

ThreadLocalMap是ThreadLocal的静态内部类,ThreadLocalMap内部又定义了一个Entry类型的静态内部类。

static class ThreadLocalMap {
    //Entry静态内部类,继承WeakReference,Entry内部定义了一个ThreadLocal类型的WeakReference弱引用,以及强引用value
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    //Entry类型的数组,ThreadLocalMap内部维护的Entry数组
    private Entry[] table;
    //懒加载模式创建ThreadLocalMap对象
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        table = new Entry[INITIAL_CAPACITY];
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
        setThreshold(INITIAL_CAPACITY);
    }
    //根据ThreadLocal实例获得Entry对象
    private Entry getEntry(ThreadLocal<?> key) {
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
        //如果找到Entry e并且e的Reference就是key,就return
        //如果e不为null,但是key不一样就调用getEntryAfterMiss
        if (e != null && e.get() == key)
            return e;
        else
            return getEntryAfterMiss(key, i, e);
    }
    //getEntryAfterMiss主要做的就是Entry的WeakReference如果被垃圾回收,而变成null,
    //就可以提前通过expungeStaleEntry来回收已经无效的entry,防止内存泄漏。
    private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
        Entry[] tab = table;
        int len = tab.length;

        while (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == key)
                return e;
            if (k == null)
               //清楚stale无效的entry
                expungeStaleEntry(i);
            else
                i = nextIndex(i, len);
            e = tab[i];
        }
        return null;
    }
    //为ThreadLocal实例和Value创建关联
    private void set(ThreadLocal<?> key, Object value) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        //根据ThreadLocal拿到Entr
        //如果Entry存在,且key相等则直接更新value
        //   如果key已经被垃圾回收则首先会清理expunge无效的Entry,并创建新的Entry来替换。
        //如果Entry不存在,则直接创建Entry
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();
            if (k == key) {
                e.value = value;
                return;
            }
            if (k == null) {
                replaceStaleEntry(key, value, i);
                return;
            }
        }
        tab[i] = new Entry(key, value);
        int sz = ++size;
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }
    /**
     * 根据ThreadLocal实例移除Entry
     */
    private void remove(ThreadLocal<?> key) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                e.clear();
                expungeStaleEntry(i);
                return;
            }
        }
    }
    /**
     * 在set的时候如果entry无效了,需要重新创建entry替换无效entry
     */
    private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                   int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        Entry e;
        int slotToExpunge = staleSlot;
        for (int i = prevIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = prevIndex(i, len))
            if (e.get() == null)
                slotToExpunge = i;
        for (int i = nextIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            if (k == key) {
                e.value = value;
                tab[i] = tab[staleSlot];
                tab[staleSlot] = e;

                // Start expunge at preceding stale entry if it exists
                if (slotToExpunge == staleSlot)
                    slotToExpunge = i;
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                return;
            }
            if (k == null && slotToExpunge == staleSlot)
                slotToExpunge = i;
        }

        // If key not found, put new entry in stale slot
        tab[staleSlot].value = null;
        tab[staleSlot] = new Entry(key, value);

        // If there are any other stale entries in run, expunge them
        if (slotToExpunge != staleSlot)
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }

    /**
     * 主要清除无效Entry的方法
     */
    private int expungeStaleEntry(int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;

        // expunge entry at staleSlot
        tab[staleSlot].value = null;
        tab[staleSlot] = null;
        size--;
        // Rehash until we encounter null
        Entry e;
        int i;
        for (i = nextIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null;
                tab[i] = null;
                size--;
            } else {
                int h = k.threadLocalHashCode & (len - 1);
                if (h != i) {
                    tab[i] = null;
                    // Unlike Knuth 6.4 Algorithm R, we must scan until
                    // null because multiple entries could have been stale.
                    while (tab[h] != null)
                        h = nextIndex(h, len);
                    tab[h] = e;
                }
            }
        }
        return i;
    }
    /**
     * 清除固定位置的无效Entry
     */
    private boolean cleanSomeSlots(int i, int n) {
        boolean removed = false;
        Entry[] tab = table;
        int len = tab.length;
        do {
            i = nextIndex(i, len);
            Entry e = tab[i];
            if (e != null && e.get() == null) {
                n = len;
                removed = true;
                i = expungeStaleEntry(i);
            }
        } while ( (n >>>= 1) != 0);
        return removed;
    }

    /**
     * 在set的时候需要扩张entry数组,会调用expungeStaleEntries清除无效Entry
     */
    private void rehash() {
        expungeStaleEntries();

        // Use lower threshold for doubling to avoid hysteresis
        if (size >= threshold - threshold / 4)
            resize();
    }

    /**
     * rehash的时候会遍历table,清除stale的无效Entry
     */
    private void expungeStaleEntries() {
        Entry[] tab = table;
        int len = tab.length;
        for (int j = 0; j < len; j++) {
            Entry e = tab[j];
            if (e != null && e.get() == null)
                expungeStaleEntry(j);
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读