20. 并发终结之ThreadLocal
我们提到过如何保证共享变量的线程安全性,比如可以用synchronized内部锁,也可以使用Lock,volatile或者原子变量。
除此之外ThreadLocal也是保证共享变量线程安全的一个手段,它将共享变量变成线程私有(保证了对非线程安全对象访问的线程安全,又避免了锁的开销)。
final static ThreadLocal<T> VAR = new ThreadLocal()
ThreadLocal通常会被作为某个类的静态字段使用,因为如果声明成实例变量,那么每创建该类的一个实例都会导致新的ThreadLocal实例被创建,即使没有导致错误,也会导致重复创建对象带来的浪费;static意思就是这个类的所有实例共有这个VAR变量。
ThreadLocal实现机制:
Java里面每个Thread都维护了一个ThreadLocalMap对象threadLocals,ThreadLocalMap里面包含了若干Entry(private Entry[] table....一个key-value键值对),因此可以说每个线程都拥有若干这样的条目,相应的线程就被称为这些条目的属主线程。
Entry的key是一个ThreadLocal实例,Value是一个线程特有对象,即Entry的作用就是建立一个ThreadLocal实例与线程特有对象之间的对应关系,但是由于Entry对ThreadLocal实例的引用(Key的引用)是一个弱引用(WeakReference),因此它不会阻止被引用的ThreadLocal实例被垃圾回收,如果ThreadLocal引用置为null,那么GC的时候ThreadLocal实例被垃圾回收,那么其所在Entry的Key会被设置成null,此时Entry就是无效条目。另一方面,由于Entry对Value的引用是强引用,如果Entry本身有对它的可达强引用,那么Entry也会阻止其引用的Value被垃圾回收。
此时,当ThreadLocalMap有新的ThreadLocal线程和Value对象关系被创建(相当于有新的Entry对象被添加到ThreadLocalMap)的时候,ThreadLocalMap会将无效条目清理,即使得对Value对象的强引用被清除,Value对象得以被垃圾回收;但是也有缺点,如果线程在相当长时间(或者一直)处于非运行状态,那么该线程的ThreadLocalMap可能就没有任何变化,相应的ThreadLocalMap中无效条目也不会被清理,这就可能导致这些线程的Entry所引用的Value对象无法被线程回收。导致伪内存泄漏。
例如Value实线部分,即线程对象持有线程特有对象的强引用,可能会导致伪内存泄漏,那么只要打破这种关系,即通过调用ThreadLocal.remove()将线程特有对象从其所属的Entry中剥离,即可使线程特有对象(ThreadLocal包含的对象)和线程局部变量(ThreadLocal对象)都被垃圾回收。
关于WeakReference
弱引用, 当一个对象仅仅被weak reference指向, 而没有任何其他强引用指向的时候,只能生存到下一次垃圾回收之前,GC发生时,不管内存够不够,都会被回收。
下面代码里a=null并不能使对象A在GC的时候被垃圾回收,因为b也是强引用并持有对象A的引用。
如果用WeakReference来引用a,这时候如果a=null,那么GC的时候堆内存的对象A就会被回收;如果a==null不写,那么对象A在内存还是有强引用指向的,就不会被垃圾回收。
A a = new A();
B b = new B(a);
a = null;
------------------------------
A a = new A();
WeakReference<A> reference = new WeakReference(a);
System.out.println(reference.get());
System.gc();
System.out.println(reference.get());
结果:
com.refinitiv.applayer.ftp.entity.A@5910e440
com.refinitiv.applayer.ftp.entity.A@5910e440
------------------------------
A a = new A();
WeakReference<A> reference = new WeakReference(a);
a = null;
System.out.println(reference.get());
System.gc();
System.out.println(reference.get());
结果:
com.refinitiv.applayer.ftp.entity.A@5910e440
null
ThreadLocal在JVM中(简易版)
Thread实例里面都有ThreadLocalMap变量,在线程里面调用ThreadLocal.set()或者get()方法,会创建当前线程所属的ThreadLocalMap实例,ThreadLocalMap内部主要是Entry类型数组Entry[] table,即会创建一个Entry来关联当前ThreadLocal实例(WeakReference类型的 )以及共享变量Value。
当threadLocal引用变量置为null(指向内存ThreadLocal的引用变量被回收),没有强引用指向堆内存ThreadLocal对象,那么发生GC,WeakReference类型的变量会被回收,那么entry的key=null,但是thread对于value的强引用还存在,导致entry对象也一直留在堆内存,如果不主动调用remove(),如果thread挂起或者别的原因不能正常结束,由于ThreadLocalMap 的生命周期跟 Thread 一样长,那么堆内存一直得不到释放,会造成内存泄漏。
源码分析
Thread类里面定义了一个ThreadLocal.ThreadLocalMap成员变量,这个变量是在ThreadLocal类里面进行维护的。
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal主要的方法有set(), get(), setInitialValue()以及remove()方法。
因为WeakReference的存活时间只能到下一次GC,GC就会被回收,所以set()和get()方法都会调用expungeStaleEntry()来清除无效entry,避免内存泄漏。
public T get() {
//拿到当前Thread的ThreadLocalMap实例
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
//拿到ThreadMap之后就根据当前ThreadLocal实例获得Entry实例
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//如果Entry不存在就调用setInitialValue来设置初始值,initialValue一般在在声明ThreadLocal变量的时候提供,如果没有提供,则为null。
return setInitialValue();
}
/**
* 根据提供的initialValue方法来设置初始值,这里面会创建当前Thread内部的ThreadLocalMap
*/
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
/**
* 设置当前线程的私有变量
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
/**
* 移除当前Thread关联的私有变量
* 拿到ThreadLocalMap里的Entry对象,清除WeakReference,然后调用expungeStaleEntry清除无效entry
*/
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
ThreadLocalMap是ThreadLocal的静态内部类,ThreadLocalMap内部又定义了一个Entry类型的静态内部类。
static class ThreadLocalMap {
//Entry静态内部类,继承WeakReference,Entry内部定义了一个ThreadLocal类型的WeakReference弱引用,以及强引用value
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
//Entry类型的数组,ThreadLocalMap内部维护的Entry数组
private Entry[] table;
//懒加载模式创建ThreadLocalMap对象
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
//根据ThreadLocal实例获得Entry对象
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
//如果找到Entry e并且e的Reference就是key,就return
//如果e不为null,但是key不一样就调用getEntryAfterMiss
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
//getEntryAfterMiss主要做的就是Entry的WeakReference如果被垃圾回收,而变成null,
//就可以提前通过expungeStaleEntry来回收已经无效的entry,防止内存泄漏。
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
//清楚stale无效的entry
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
//为ThreadLocal实例和Value创建关联
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
//根据ThreadLocal拿到Entr
//如果Entry存在,且key相等则直接更新value
// 如果key已经被垃圾回收则首先会清理expunge无效的Entry,并创建新的Entry来替换。
//如果Entry不存在,则直接创建Entry
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
/**
* 根据ThreadLocal实例移除Entry
*/
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
/**
* 在set的时候如果entry无效了,需要重新创建entry替换无效entry
*/
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
/**
* 主要清除无效Entry的方法
*/
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
/**
* 清除固定位置的无效Entry
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
/**
* 在set的时候需要扩张entry数组,会调用expungeStaleEntries清除无效Entry
*/
private void rehash() {
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
if (size >= threshold - threshold / 4)
resize();
}
/**
* rehash的时候会遍历table,清除stale的无效Entry
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
}