jdk ThreadLocal 和 netty FastThre
本文基于 jdk1.8 和 netty 4.1.46 , jdk 这些年版本迭代的比较快,每个版本中部分 api 都有优化,netty 同样也是非常活跃,小版本迭非常快。所以讨论 api 不指定版本是没有意义的。
一、jdk1.8 ThreadLocal 原理
每个Thread实例都包含一个ThreadLocalMap的引用,ThreadLocalMap 中有Entry数组,Entry 的key是若引用,类的关系入下图
clipboard.png考虑如下问题:
1、因为 Entry 的 key 是弱引用,所以没有强引用指向,只能生存到下次垃圾回收之前,那么如何处理 key 被回收了的Entry呢?
2、既然是Entry数组,初始大小是多少?,set的时候超过长度如何扩容?,get的时候如何定位数组的index? 。
带着这几个问题我们分析一下代码。
1、ThreadLocal 的初始化:
用全局静态变量 nextHashCode 生成 ThreadLocal 的hash值。
threadLocalHashCode 就是要用来定位 Entry 数组index的。 后面会讲到
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
2、ThreadLocal.set(T value) 方法
获取当前 Thread 实例,并且 Thread 实例中拿到 ThreadLocalMap 实例。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
如果 map 为空则调用 createMap 创建 ThreadLocalMap 实例
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
//初始化Entry数组
table = new Entry[INITIAL_CAPACITY];
//定位数组 index
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
//设置扩容阀值 数组长度的 2/3
setThreshold(INITIAL_CAPACITY);
}
如果map非空,调用set方法
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
//定位数组 index
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//如果查询的,结果 key 相等,那么直接替换 valeu
if (k == key) {
e.value = value;
return;
}
//如果查询的,结果 key 已经被回收了,那么调用一个重要的方法 replaceStaleEntry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
//扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
注意 int i = key.threadLocalHashCode & (len-1); 这个方法。因为len的值是2的n次方(扩容也在原数组长度上乘2)。经过 & 运算后i的值肯定在len之内,确保不会越界。但是有个问题,可能会出现定位的位置冲突。所以需要for循环依次向后查找,直到找到一个为空的,然后 tab[i] = new Entry(key, value);。这里就给get的时候定位index造成了问题。后边讲
在向后查找的过程中,如果 key 相等 那么直接替换 valeu 然后返回
if (k == key) {
e.value = value;
return;
}
如果 key 已经被回收了,那么调用一个重要的方法 replaceStaleEntry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
replaceStaleEntry 方法就是替换已经被回收了的 ThreadLocal 变量。
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
int slotToExpunge = staleSlot;
//向前找到第一个被回收了的 ThreadLocal
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
//向后查找
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//1.1 向后搜索过程中发现key相同的entry
if (k == key) {
//覆盖并且和脏entry进行交换
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
//staleSlot 之前没有被回收的 ThreadLocal
if (slotToExpunge == staleSlot)
slotToExpunge = i;
//搜索脏entry并进行清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
//如果向前未搜索到脏entry,并且在向后查找过程遇到脏entry的话
//就以i为起点清理entiy
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
//如果不只staleSlot一个节点为脏的
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
-
2.1、向前搜索到脏的entry,并且向后搜索过程中发现key相同的entry 如图(图片来自来自网络)
clipboard.png -
2.2 向前搜索到脏的entry,向后搜索过程中 没有发现发现key相同的entry 如图 (图片来自网络)
clipboard.png -
2.3 向前没有发现脏的entry,向后发现了脏的entry,和相同的 key 如图 (来自网络)
clipboard.png
再来看一下进行清理的方法,while循环次数为log2(table.length)-1}。如果遇到脏Entry那么,循环将重新开始,并且范围为数组长度,这块我也不明白为啥。
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
expungeStaleEntry 方法,
步骤
1、清除当前脏entry
2、往后环形继续查找,直到遇到table[i]==null时结束
3、如果在向后搜索过程中再次遇到脏entry,同样将其清理掉
4、如果有hash冲突的情况,重新整理数组
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
//1、清除当前脏entry
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
Entry e;
int i;
//2.往后环形继续查找,直到遇到table[i]==null时结束
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//3. 如果在向后搜索过程中再次遇到脏entry,同样将其清理掉
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
//4、如果有位置冲突的情况,重新整理数组
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
3、ThreadLocal.get(T value) 方法,如果了解了 set 方法,那么get 方法相对就简单了
1、拿到当前线程,的 ThreadLocalMap 对象。
2、ThreadLocalMap 对象获取 Entry
3、 如果 ThreadLocalMap 为空那么调用 setInitialValue 方法,此方法内部 调用了ThreadLocal.set方法
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
//遇到脏key需要释放,清理过程上文已经将到
if (k == null)
expungeStaleEntry(i);
//解决位置冲突问题,所以需要向下查找
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
总结:
-
1、因为 Entry 的 key 是若引用,所以如果没有强引用指向,只能生存到下次垃圾回收之前,那么如何处理key被回收了的Entry呢?
答案:当 ThreadLocal.set()时,如果发现位置冲突,那么nextIndex(i, len)方法寻找空位,在寻找空位的时候,如果发现Entry 的 ThreadLocal 被回收了,那么调用 replaceStaleEntry 进行替换和清理,如果没有冲突那么调用 cleanSomeSlots 方法清理一定范围脏Entry。
当 ThreadLocal.get 的时候如果发现位置冲突,那么nextIndex(i, len)方法寻找符合的Entry,在寻找空位的时候,如果发现Entry 的 ThreadLocal 被回收了,那么调用expungeStaleEntry清除被回收的Entry -
2、既然是Entry数组,初始大小是多少?
private static final int INITIAL_CAPACITY = 16;
- 3、set的时候超过长度如何扩容?
当ThreadLocal.set() 的 Entry 数量大于扩容阀值。threshold = len * 2 / 3; 。调用rehash() 方法。
private void rehash() {
//清除ThreadLocal已经被回收的 Entry
expungeStaleEntries();
//经过清理后 size >= threshold - threshold / 4 那么resize();扩容
if (size >= threshold - threshold / 4)
resize();
}
resize() 的过程就不难了,
1、首先将table 大小扩大一倍。
2、发现 ThreadLocal 被回收了那么,value 复制为null 帮助gc
3、copy数组,重新计算数组下表
4、设置下次扩容的阀值
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
- 4、get的时候如何定位数组的index? .
通过:int i = key.threadLocalHashCode & (table.length - 1);
若位置冲突: i = nextIndex(i, len);
以上可见,大师 Doug Lea,为了解决弱引用被回收问题,和位置冲突问题,耍了一套独孤九剑。
二、netty4.1.46 FastThreadLocal 原理
Netty为了在某些场景下提高性能,改进了jdk ThreadLocal,Netty实现的FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。避免了检测弱引用带来的 value 回收难问题,和数组位置冲突带来的线性查找问题,解决这些问题并不是没有代价,
Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。同时FastThreadLocal 也实现了清理对象的方法,下面会对这些内容进行分别介绍。
为了叙述方便,下文使用FTL指代Netty的FastThreadLocal,使用TL指代Java原生ThreadLocal。
1、类的结构
1.1 FastThreadLocal 类的uml图,
clipboard.png1.2 FastThreadLocalThread
cleanupFastThreadLocals 字段在 4.1.46 的版本中已经没有在用到了
/**
* true,表示FTL会在线程结束时被主动清理 见 FastThreadLocalRunnable 类
* false,需要将FTL放入后台清理线程的队列中
*/
// This will be set to true if we have a chance to wrap the Runnable.
//这个字段则用于标识该线程在结束时是否会主动清理FTL
private final boolean cleanupFastThreadLocals;
//次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 时候创建
private InternalThreadLocalMap threadLocalMap;
public FastThreadLocalThread(Runnable target) {
super(FastThreadLocalRunnable.wrap(target));
cleanupFastThreadLocals = true;
}
1.3 InternalThreadLocalMap
FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:
private InternalThreadLocalMap() {
//为了简便,InternalThreadLocalMap父类
//UnpaddedInternalThreadLocalMap不展开介绍
super(newIndexedVariableTable());
}
//默认的数组大小为32,且使用UNSET对象填充数组
//如果下标处数据为UNSET,则表示没有数据
private static Object[] newIndexedVariableTable() {
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}
为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充,具体可以参考https://blog.csdn.net/qq_27428109/article/details/74781774,在Java8中则可以使用@sun.misc.Contended注解避免伪共享问题。
//InternalThreadLocalMap
// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
上面我们说到FTL保存了数组下标,FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:
static final AtomicInteger nextIndex = new AtomicInteger();
public static int nextVariableIndex() {
//Netty中所有FTL数组下标都是通过递增这个静态变量实现的
//采用静态变量生成所有FTL元素在数组中的下标会造成一个问题,
//会造成InternalThreadLocalMap中数组不必要的自动扩容
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
//下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数,
//这段代码在早期的 jdk HashMap 中见过
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}
1.4 FastThreadLocal
- 构造函数:
有两个重要的下标域,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的value(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex,一般情况 variablesToRemoveIndex = 0。因为variablesToRemoveIndex 是静态变量,所以全局唯一。
//如果在该FTL中放入了数据,也就实际调用了其set或get函数,会在
//该FastThreadLocalThread.threadLocalMap数组的
// variablesToRemoveIndex下标处放置一个IdentityHashMap,
//并将该FTL放入IdentityHashMap中,在后续清理时会取出
//variablesToRemoveIndex下标处的IdentityHashMap进行清理
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
//在threadLocalMap数组中存放实际数据的下标
private final int index;
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
- 用户可扩展的函数:
//初始化 value 函数
protected V initialValue() throws Exception {
return null;
}
//让使用者在该FTL被移除时可以有机会做些操作。
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
1.5 FastThreadLocalThread
cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了
/**
* true,表示FTL会在线程结束时被主动清理 见 FastThreadLocalRunnable 类
* false,需要将FTL放入后台清理线程的队列中
*/
// This will be set to true if we have a chance to wrap the Runnable.
//这个字段则用于标识该线程在结束时是否会主动清理FTL
private final boolean cleanupFastThreadLocals;
//次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 时候创建
private InternalThreadLocalMap threadLocalMap;
public FastThreadLocalThread(Runnable target) {
super(FastThreadLocalRunnable.wrap(target));
cleanupFastThreadLocals = true;
}
2 数据 set 和 get
2.1、数据的设置 set 方法
public final void set(V value) {
//判断设置的 value 值是否是缺省值
if (value != InternalThreadLocalMap.UNSET) {
//获取当前线程的 InternalThreadLocalMap , 如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取
//否则通过 jdk 原生的 threadLocal 获取
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
//FastThreadLocal 对应的 index 下标的 value 替换成新的 value
setKnownNotUnset(threadLocalMap, value);
} else {
//如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象
remove();
}
}
这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
//在数组下标index处放置实际对象,如果index大于数组length,会进行数组扩容.
if (threadLocalMap.setIndexedVariable(index, value)) {
//放置成功之后,将该FTL加入到 variablesToRemoveIndex 下标的
//IdentityHashMap,等待后续清理
addToVariablesToRemove(threadLocalMap, this);
}
}
/**
* 该FTL加入到variablesToRemoveIndex下标的IdentityHashMap
* IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置
*/
@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
//获取 variablesToRemoveIndex下标处的 IdentityHashMap
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
//如果是第一次获取,则 variablesToRemoveIndex下标处的值为 UNSET
if (v == InternalThreadLocalMap.UNSET || v == null) {
//新建一个新的 IdentityHashMap 并
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
//放入到下标variablesToRemoveIndex处
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
//将该FTL放入该IdentityHashMap中
variablesToRemove.add(variable);
}
下面看InternalThreadLocalMap.get()实现:
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
//首先看当前 thread 是否为FastThreadLocalThread实例
//如果是的话,可以快速通过引用,获取到其 threadLocalMap
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
//如果不是,则 jdk 原生慢速获取到其 threadLocalMap
return slowGet();
}
}
2.2、数据的设置 get 方法
了解数据的设置set方法,获取就比较点单,代码就不贴了。
- 1、获取 ThreadLocalMap
- 2、直接通过索引取出对象
- 3、如果为空那么调用初始化方法初始化
3、 清理 FastThreadLocal 对象
3.1 主动清理
清理 FastThreadLocal 对象相关的代码是在 FastThreadLocalThread 类,和 FastThreadLocalRunnable 类中。
FastThreadLocalThread 的代码见上文
FastThreadLocalRunnable.wrap 方法修饰的 Runnable,表示 FTL 会在线程结束时被主动清理,wrap 方法会把原 Runnable.run 方法放在 try 里,然后在 finally 中调用 FastThreadLocal.removeAll() 方法,该方法会对 FTL 进行清理,具体可看下面列出的源码。
final class FastThreadLocalRunnable implements Runnable {
private final Runnable runnable;
private FastThreadLocalRunnable(Runnable runnable) {
this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
}
@Override
public void run() {
//run方法放在try里,然后在finally中调用FastThreadLocal.removeAll()方法
try {
runnable.run();
} finally {
FastThreadLocal.removeAll();
}
}
//被wrap的 Runable会变成F astThreadLocalRunnable对象
//FastThreadLocalRunnable在run方法的finally会调用
//FastThreadLocal.removeAll();在线程结束时对FTL
//进行主动清理
static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
}
3.2 被动清理在4.1.46版本中已经被去掉了
clipboard.png// TODO: We need to find a better way to handle this.
我们需要找到更好的方法来处理这个问题。
但是我目前在源码中没有找到,新的处理办法,或者有知道的不吝赐教,或者这个也作为一个开放性问题吧。