重拾jdk源码重点系列-8:ThreadLocal源码分析
1. ThreadLocal的作用
ThreadLocal的作用是提供线程内的局部变量,说白了,就是在各线程内部创建一个变量的副本,相比于使用各种锁机制访问变量,ThreadLocal的思想就是用空间换时间,使各线程都能访问属于自己这一份的变量副本,变量值不互相干扰,减少同一个线程内的多个函数或者组件之间一些公共变量传递的复杂度。我们看看源码对于ThreadLocal的描述.
This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its {@code get} or {@code set} method) has its own, independently initialized copy of the variable. {@code ThreadLocal} instances are typically private static fields in classes that wish to associate state with a thread (e.g.,a user ID or Transaction ID).
2. 基本用法
实现的功能是给每个线程都有自己唯一的id,且是自增的.
public class ThreadId {
public static final AtomicInteger NEXTId = new AtomicInteger(0);
public static final ThreadLocal<Integer> THREADID = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return NEXTId.incrementAndGet();
}
};
public static int getThreadId() {
return THREADID.get();
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
System.out.println(ThreadId.getThreadId());
}
}).start();
}
Thread.sleep(1000 * 3);
}
}
3. ThreadLocal的数据结构
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode =
new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT); //
}
从上面可以看出,每创建一个ThreadLocal变量,hashcode就会增加0x61c88647.hashcode的作用就是在后面根据在map中根据hash比较ThreadLocalMap的key,从而判定是否相等.之所以用这个数是因为可以是2的幂尽可能分布均匀
在每个线程内部,都会维护一个 ThreadLocal.ThreadLocalMap threadLocals
的成员变量,参考下面这个实例图.每个变量能够将变量私有化的根本原因还是在于ThreadLocalMap.
如图所示,实线是强引用,虚线是弱引用,如果ThreadLocalRef的引用没有了,则只剩下Entry对ThreadLocal有弱引用,我们知道弱引用活不过下次Gc(Entry是弱引用)
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
/**
* The initial capacity -- MUST be a power of two.
*/
private static final int INITIAL_CAPACITY = 16;
/**
* The table, resized as necessary.
* table.length MUST always be a power of two.
*/
private Entry[] table;
/**
* The number of entries in the table.
*/
private int size = 0;
/**
* The next size value at which to resize.
*/
private int threshold; // Default to 0
/**
* Set the resize threshold to maintain at worst a 2/3 load factor.
*/
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
4. get()返回存储在ThreadLocalMap中value
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
从ThreadLocal中获取值的步骤分为如下几步.
- 获取当前线程的ThreadLocalMap
- 把当前的ThreadLocal对象为key,去获取值.若存在,且不为null,则返回.否则设置map,初始化
setInitialValue()
private T setInitialValue() {
T value = initialValue(); //1
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value); //若存在,则设置key,value就可以
else
createMap(t, value); //不存在则创建ThreadLocalMap
return value;
}
- initialValue()返回值为null,说明初始值为null
createMap()
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY]; // 1 初始化数组,初始大小为16
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); //定位到数组下标
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY); //设置阈值
}
- firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1)相当于一个求余的方法,这要求INITIAL_CAPACITY为2的n次幂.经常采用这种方法来求响应的hash值对应在数组中的位置.
5. set()往ThreadLocalMap设置值
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
set()的逻辑如下
- 获取当前线程的ThreadLocalMap
- 如果map不为null,则把传入的值设置进去
- 否则创建新的map,createMap()和前面get()createMap()中的一样.
set(ThreadLocal<?> key, Object value)
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1); //找到在数组中的位置
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get(); //hash的线性探测法
if (k == key) { //遇到相等,则替换
e.value = value;
return;
}
if (k == null) { //发现key为null,则需要把这个key所在的Entry设置为null,然后把这个key后面的元素做再hash往前移动
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value); //第一次遇到Entry为空,则放入进去.运行到这里,说明这个过程中没有key为null的Entry
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold) //在清理完成后,看当前大小有没有超过阈值,看是否需要rehash
rehash();
}
set()方法的逻辑是:
- 找到在数组中的位置
- 遇到相等则替换,如果在这过程中遇到key为null,执行第三步
- 执行replaceStaleEntry()
- 经过2,3两步还没终止,说明遇到Entry为null,则把key,value组成Entry,放入到这个位置.
- 添加了新的元素,需要判断达没达到阈值,达到则需要再hash
replaceStaleEntry()
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
int slotToExpunge = staleSlot; //key为null的Entry,在数组中的下标
for (int i = prevIndex(staleSlot, len); //从该位置往前找
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i; //记录下key为null的点
for (int i = nextIndex(staleSlot, len); //从该位置往后走
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot]; //若找到,把该Entry与传入进来位置的Entry做个交换
tab[staleSlot] = e;
if (slotToExpunge == staleSlot)
slotToExpunge = i; //从交换之后,此时key为nul,正好从这里清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i; //遇到第一个key为null的位置,记录下来
}
//运行到这里,说明没有遇到key相等的,则在slot处新建一个新的Entry,把key,value设置进去.
tab[staleSlot].value = null; //方便GC
tab[staleSlot] = new Entry(key, value);
//如果还有key为null的Entry,则清理
if (slotToExpunge != staleSlot) 说明存在key为null的Entry,则清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
slotToExpunge主要用来记录从前到后key为null的位置,方便清理
- 第1个for循环:我们向前找到key为null的位置,记录为slotToExpunge,这里是为了后面的清理过程,可以不关注了;
- 第2个for循环:我们从staleSlot起到下一个null为止,若是找到key和传入key相等的Entry,就给这个Entry赋新的value值,并且把它和staleSlot位置的Entry交换,然后调用CleanSomeSlots清理key为null的Entry。
- 若是一直没有key和传入key相等的Entry,那么就在staleSlot处新建一个Entry。函数最后再清理一遍空key的Entry。
cleanSomeSlots这个函数是以log(n)的速度去发现key为null的点.如果找到则调用expungeStaleEntry取清除和再hash,它里面就是不断的折半查找.
expungeStaleEntry(int staleSlot)
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 把该位置设置为null
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) { //遇到key为null的,则设置为null,方便gc
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1); //如果有值,再hash
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
expungeStaleEntry的逻辑是:
- 先把该位置设置为null,方便GC
- 从当前位置顺着往下走,直到第一为null的Entry.在这过程中,如果遇到key为null,则把该位置的Entry设置为null,有利于GC.
- 如果key不为null,则把该元素重新hash(线性探测法)
rehash
private void rehash() {
expungeStaleEntries(); //清除过时的Entry,这里只要是key为null,这调用expungeStaleEntry(int staleSlot),也就是上面这个方法
if (size >= threshold - threshold / 4) //清理后,如果size还大于3/4的threshold,那么就扩容
resize();
}
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen]; //开辟一个数组大小是原来两倍大的数组
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // 帮助GC
} else { //重新hash到新数组中
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
rehash的逻辑是:
- 先尝试清除key为null的位置
- 再观察是否达到3/4的阈值,从而来扩容
扩容的逻辑是;
- 开辟一个长度是以前数组两倍的数组,重新hash,放入到新数组中.
- 这个过程中,如果遇到key为空,则把值赋值为null,方便GC
remove
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear(); //把引用设为null,方便GC
expungeStaleEntry(i); //上面已经谈到
return;
}
}
}
remove的处理逻辑是把应用设置为null,方便GC.然后在调用 expungeStaleEntry(i)去掉key为null的Entry,再hash.
5. 关于expungeStaleEntry中当key不为空,为什么要重新hash
是因为,如果不重新hash,那么后来再取寻找的时候,遇到Null就会停止搜索,这就造成原本能够找到的,现在找不到.归根结底采用了链地址法.
6. 使用ThreadLocal的最佳实践
我们发现无论是set,get还是remove方法,过程中key为null的Entry都会被擦除,那么Entry内的value也就没有强引用链,GC时就会被回收。那么怎么会存在内存泄露呢?但是以上的思路是假设你调用get或者set方法了,很多时候我们都没有调用过,所以最佳实践就是
- 1 .使用者需要手动调用remove函数,删除不再使用的ThreadLocal.
- 2 .还有尽量将ThreadLocal设置成private static的,这样ThreadLocal会尽量和线程本身一起消亡。
参考文章:
ThreadLocal源码深度剖析