Netty's FastThreadLocal
Preface
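ThreadLocal provides thread-local variables, isolated from one thread to the next. As covered in an earlier analysis of the ThreadLocal source, every lookup searches the ThreadLocalMap maintained by the current thread. That map is an array-based hash table that resolves collisions with linear probing: when the computed slot is occupied by a different entry, the lookup moves the cursor to the next slot and keeps searching. A JDK ThreadLocal lookup therefore has to hash and may probe several slots, which makes queries comparatively slow. To avoid this, Netty built its own replacement: FastThreadLocal.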
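To make the probing cost concrete, here is a minimal sketch of open addressing with linear probing in the spirit of the JDK's ThreadLocalMap. It is not the JDK code: the class name LinearProbingTable is hypothetical, callers pass the hash explicitly, and the table never resizes or removes entries. It records how many slots the most recent lookup inspected.

```java
// Hypothetical, simplified model of the lookup inside the JDK's ThreadLocalMap:
// fixed power-of-two table, linear probing, no resizing, no removal.
class LinearProbingTable {
    private final Object[] keys;
    private final Object[] values;
    int lastProbes; // number of slots the most recent get() inspected

    LinearProbingTable(int capacity) { // capacity must be a power of two
        keys = new Object[capacity];
        values = new Object[capacity];
    }

    // Callers must never fill the table completely: without resizing,
    // a probe for an absent key would then loop forever.
    void put(Object key, int hash, Object value) {
        int mask = keys.length - 1;
        int i = hash & mask;
        while (keys[i] != null && keys[i] != key) {
            i = (i + 1) & mask; // collision: move the cursor to the next slot
        }
        keys[i] = key;
        values[i] = value;
    }

    Object get(Object key, int hash) {
        int mask = keys.length - 1;
        int i = hash & mask;
        lastProbes = 1;
        while (keys[i] != null) {
            if (keys[i] == key) {
                return values[i];
            }
            i = (i + 1) & mask;
            lastProbes++;
        }
        return null; // reached an empty slot: the key is absent
    }

    public static void main(String[] args) {
        LinearProbingTable table = new LinearProbingTable(16);
        Object a = new Object();
        Object b = new Object();
        table.put(a, 3, "A");  // slot 3
        table.put(b, 19, "B"); // 19 & 15 == 3: collides, lands in slot 4
        System.out.println(table.get(a, 3) + " after " + table.lastProbes + " probe(s)");  // A after 1 probe(s)
        System.out.println(table.get(b, 19) + " after " + table.lastProbes + " probe(s)"); // B after 2 probe(s)
    }
}
```

Two keys whose hashes map to the same slot force the second lookup to inspect an extra slot. The real ThreadLocalMap also expunges stale entries it meets while probing, which adds work on some lookups.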
Usage and benchmark
Step 1: Add the Netty dependency
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>
Step 2: Write a comparison against the JDK ThreadLocal
FastThreadLocal
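import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.FastThreadLocal;

public class FastThreadLocalBenchmark {
    private static final int MAX = 100000;

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("Thread-01");
        // An array of FastThreadLocal variables
        FastThreadLocal<String>[] fastThreadLocal = new FastThreadLocal[MAX];
        // Create 100000 FastThreadLocal instances
        for (int i = 0; i < MAX; i++) {
            fastThreadLocal[i] = new FastThreadLocal<>();
        }
        // DefaultThreadFactory creates a FastThreadLocalThread, so lookups take the fast path
        Thread thread = defaultThreadFactory.newThread(() -> {
            for (int i = 0; i < MAX; i++) {
                fastThreadLocal[i].set("java: " + i);
            }
            System.out.println("fastThreadLocal set: " + (System.currentTimeMillis() - start));
            // MAX * MAX get() calls; the results are discarded
            for (int i = 0; i < MAX; i++) {
                for (int j = 0; j < MAX; j++) {
                    fastThreadLocal[i].get();
                }
            }
        });
        thread.start();
        try {
            // Wait for the thread to finish before printing the total
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("fastThreadLocal total: " + (System.currentTimeMillis() - start));
    }
}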
ThreadLocal
public class ThreadLocalBenchmark {
    private static final int MAX = 100000;

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ThreadLocal<String>[] threadLocals = new ThreadLocal[MAX];
        for (int i = 0; i < MAX; i++) {
            threadLocals[i] = new ThreadLocal<>();
        }
        // A plain Thread: every get() goes through the JDK ThreadLocalMap
        Thread thread = new Thread(() -> {
            for (int i = 0; i < MAX; i++) {
                threadLocals[i].set("java: " + i);
            }
            System.out.println("threadLocal set: " + (System.currentTimeMillis() - start));
            // MAX * MAX get() calls; the results are discarded
            for (int i = 0; i < MAX; i++) {
                for (int j = 0; j < MAX; j++) {
                    threadLocals[i].get();
                }
            }
        });
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("threadLocal total: " + (System.currentTimeMillis() - start));
    }
}
Using it with a thread pool
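import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final DefaultThreadFactory factory = new DefaultThreadFactory("Thread-pool-01", false);
// DefaultThreadFactory implements java.util.concurrent.ThreadFactory, so it can be passed directly;
// every pool thread is then a FastThreadLocalThread
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(10), factory);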
Results (in milliseconds)
fastThreadLocal set: 66
threadLocal set: 78
fastThreadLocal total: 93
threadLocal total: 13453
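In this run FastThreadLocal is clearly faster. These are single wall-clock measurements, though, and because the results of get() are discarded, the JIT compiler may optimize part of the loops away; a harness such as JMH gives more reliable numbers. So why does FastThreadLocal perform so well?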
Step 3: Analyzing the cause
1. DefaultThreadFactory
// Static counter shared by all DefaultThreadFactory instances; used to build each factory's name prefix
private static final AtomicInteger poolId = new AtomicInteger();
// Per-factory counter that numbers the threads this factory creates
private final AtomicInteger nextId = new AtomicInteger();
private final String prefix;
private final boolean daemon;
private final int priority;

@Override
public Thread newThread(Runnable r) {
    // Name the thread and wrap the Runnable in a decorator
    Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
    try {
        // (setting daemon status and priority omitted)
    } catch (Exception ignored) {
        // Doesn't matter even if failed to set.
    }
    return t;
}

// Creates a FastThreadLocalThread instead of a plain Thread
protected Thread newThread(Runnable r, String name) {
    return new FastThreadLocalThread(r, name);
}

// Decorator: runs the task, then clears all FastThreadLocal values of the current thread
private static final class DefaultRunnableDecorator implements Runnable {
    private final Runnable r;

    DefaultRunnableDecorator(Runnable r) {
        this.r = r;
    }

    @Override
    public void run() {
        try {
            r.run();
        } finally {
            FastThreadLocal.removeAll();
        }
    }
}
DefaultThreadFactory is the factory that creates threads. The key point is that it does not create a plain Thread but a FastThreadLocalThread, Netty's Thread subclass, which is covered next.
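The Runnable that the thread executes is wrapped as well: once the task finishes, the decorator calls FastThreadLocal.removeAll() in a finally block. Presumably this is designed to prevent memory leaks by clearing the thread's FastThreadLocal values.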
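The decorator idea can be sketched with plain JDK classes. The class name CleanupRunnable and its BUFFER variable are hypothetical, and a JDK ThreadLocal stands in for Netty's FastThreadLocal.removeAll(); the point is only that the cleanup sits in a finally block, so it runs even when the task throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical decorator in the spirit of Netty's DefaultRunnableDecorator:
// run the wrapped task, then always clear thread-local state in finally.
class CleanupRunnable implements Runnable {
    // Stand-in for the thread's FastThreadLocal values (a plain JDK ThreadLocal here)
    static final ThreadLocal<List<String>> BUFFER = ThreadLocal.withInitial(ArrayList::new);

    private final Runnable task;

    CleanupRunnable(Runnable task) {
        this.task = task;
    }

    @Override
    public void run() {
        try {
            task.run();
        } finally {
            // Plays the role of FastThreadLocal.removeAll(); runs even if task.run() throws
            BUFFER.remove();
        }
    }

    public static void main(String[] args) {
        new CleanupRunnable(() -> BUFFER.get().add("work")).run();
        // remove() discarded the list, so get() creates a fresh, empty one
        System.out.println(BUFFER.get().isEmpty()); // true
    }
}
```

One consequence to keep in mind: when DefaultThreadFactory backs a ThreadPoolExecutor, the Runnable it wraps is the pool's worker loop rather than an individual task, so the cleanup runs when the pool thread exits, not after each submitted task.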
2. FastThreadLocalThread
public class FastThreadLocalThread extends Thread implements FastThreadLocalAccess {
    private InternalThreadLocalMap threadLocalMap;

    @Override
    public final InternalThreadLocalMap threadLocalMap() {
        return threadLocalMap;
    }

    @Override
    public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
        this.threadLocalMap = threadLocalMap;
    }
}
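Each FastThreadLocalThread holds exactly one InternalThreadLocalMap in a plain field and implements the FastThreadLocalAccess interface to expose it. The core logic, then, lives in InternalThreadLocalMap.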
3. FastThreadLocal
First, back to FastThreadLocal itself, the class we set out to analyze.
// Each FastThreadLocal instance gets its own unique index at construction time
private final int index;

public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}

// Returns the value for the current thread
public final V get() {
    // Look up the current thread's InternalThreadLocalMap
    return get(InternalThreadLocalMap.get());
}

@SuppressWarnings("unchecked")
public final V get(InternalThreadLocalMap threadLocalMap) {
    Object v = threadLocalMap.indexedVariable(index);
    // The slot holds a real value (anything other than the UNSET placeholder)
    if (v != InternalThreadLocalMap.UNSET) {
        // Return it directly
        return (V) v;
    }
    // Otherwise compute the initial value (initialValue() returns null unless overridden)
    return initialize(threadLocalMap);
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    // Store the value at this variable's index in the thread's map
    threadLocalMap.setIndexedVariable(index, v);
    // Register this variable so that FastThreadLocal.removeAll() can clean it up later
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) {
        set(InternalThreadLocalMap.get(), value);
    } else {
        remove();
    }
}

public final void set(InternalThreadLocalMap threadLocalMap, V value) {
    // Setting the UNSET placeholder is treated as a removal
    if (value != InternalThreadLocalMap.UNSET) {
        // setIndexedVariable returns true only if the slot was previously unset
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    } else {
        // Remove the value
        remove(threadLocalMap);
    }
}
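The core idea (a global index per variable, a per-thread array, and an UNSET sentinel) fits in a short sketch. This is a simplified model under stated assumptions, not Netty's implementation: the names IndexedLocal, UNSET, NEXT_INDEX and SLOTS are hypothetical, the per-thread array is kept in a JDK ThreadLocal (roughly Netty's slow path), the initial value comes from a Supplier, and growth simply doubles the array or jumps to index + 1 instead of rounding up to a power of two.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, simplified model of the FastThreadLocal design (not Netty's code):
// each instance takes a unique index from a global counter, and each thread keeps
// its values in a plain Object[] addressed by that index.
class IndexedLocal<V> {
    // Sentinel meaning "no value yet"; lets null be stored as a real value
    static final Object UNSET = new Object();
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();
    // Per-thread slot array, held in a JDK ThreadLocal for simplicity
    // (this models only Netty's slow path, not FastThreadLocalThread)
    private static final ThreadLocal<Object[]> SLOTS = ThreadLocal.withInitial(() -> {
        Object[] table = new Object[32];
        Arrays.fill(table, UNSET);
        return table;
    });

    final int index = NEXT_INDEX.getAndIncrement();
    private final Supplier<V> initialValue;

    IndexedLocal(Supplier<V> initialValue) {
        this.initialValue = initialValue;
    }

    @SuppressWarnings("unchecked")
    V get() {
        Object[] slots = SLOTS.get();
        // One bounds check and one array read: no hashing, no probing
        Object v = index < slots.length ? slots[index] : UNSET;
        if (v != UNSET) {
            return (V) v;
        }
        // First access on this thread: compute, store and return the initial value
        V init = initialValue.get();
        set(init);
        return init;
    }

    void set(V value) {
        Object[] slots = SLOTS.get();
        if (index >= slots.length) {
            // Simplified growth (Netty rounds up to a power of two instead)
            int oldLength = slots.length;
            slots = Arrays.copyOf(slots, Math.max(index + 1, oldLength * 2));
            Arrays.fill(slots, oldLength, slots.length, UNSET);
            SLOTS.set(slots);
        }
        slots[index] = value;
    }

    public static void main(String[] args) throws InterruptedException {
        IndexedLocal<String> name = new IndexedLocal<>(() -> "default");
        name.set("main-value");
        Thread other = new Thread(() -> System.out.println(name.get())); // prints "default"
        other.start();
        other.join();
        System.out.println(name.get()); // prints "main-value"
    }
}
```

Because "never set" is represented by UNSET rather than null, a variable whose initial value is null is initialized only once; later get() calls return the stored null without calling the supplier again.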
4. InternalThreadLocalMap
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
}
So InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap. Let's see what the parent class UnpaddedInternalThreadLocalMap contains.
4.1 UnpaddedInternalThreadLocalMap
class UnpaddedInternalThreadLocalMap {
    static ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap;
    static final AtomicInteger nextIndex = new AtomicInteger();

    /** Used by {@link FastThreadLocal} */
    // Values of all FastThreadLocals for this thread, addressed by each variable's index
    Object[] indexedVariables;

    // Core thread-locals
    int futureListenerStackDepth;
    int localChannelReaderStackDepth;
    Map<Class<?>, Boolean> handlerSharableCache;
    IntegerHolder counterHashCode;
    ThreadLocalRandom random;
    Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
    Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;

    // String-related thread-locals
    StringBuilder stringBuilder;
    Map<Charset, CharsetEncoder> charsetEncoderCache;
    Map<Charset, CharsetDecoder> charsetDecoderCache;

    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
        this.indexedVariables = indexedVariables;
    }
}
Back to InternalThreadLocalMap itself:
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }

    private static Object[] newIndexedVariableTable() {
        // Array holding the value of each FastThreadLocal, addressed by its index
        Object[] array = new Object[32];
        // Fill every slot with the UNSET placeholder ("no value yet")
        Arrays.fill(array, UNSET);
        return array;
    }

    public Object indexedVariable(int index) {
        Object[] lookup = indexedVariables;
        // Very fast: one bounds check and one array read; indexes past the end count as unset
        return index < lookup.length ? lookup[index] : UNSET;
    }

    /**
     * @return {@code true} if and only if a new thread-local variable has been created
     */
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        // index grows monotonically across all FastThreadLocals, so it may exceed the array length
        if (index < lookup.length) {
            // Remember the previous value and overwrite the slot
            Object oldValue = lookup[index];
            lookup[index] = value;
            // true if the slot was previously UNSET, i.e. the variable is new for this thread
            return oldValue == UNSET;
        } else {
            // Grow the array, then store the value
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }

    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
        // Bitwise OR with unsigned right shifts: sets every bit below the highest set bit
        newCapacity |= newCapacity >>> 1;
        newCapacity |= newCapacity >>> 2;
        newCapacity |= newCapacity >>> 4;
        newCapacity |= newCapacity >>> 8;
        newCapacity |= newCapacity >>> 16;
        // Adding 1 gives the smallest power of two greater than index
        newCapacity ++;
        // Copy the old contents into the larger array
        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        // Mark the new slots [oldCapacity, newCapacity) as UNSET
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        // Store the value
        newArray[index] = value;
        // Replace the old array with the expanded one
        indexedVariables = newArray;
    }
}
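The shift-and-OR sequence in expandIndexedVariableTableAndSet is easier to follow in isolation. NextCapacity is a hypothetical standalone class that copies only that arithmetic: OR-ing in right-shifted copies sets every bit below the highest set bit, and adding 1 then produces the smallest power of two strictly greater than index (valid for 0 <= index < 2^30).

```java
// Standalone copy of the capacity arithmetic from expandIndexedVariableTableAndSet.
class NextCapacity {
    static int newCapacity(int index) {
        int n = index;
        // After these steps every bit below the highest set bit is 1 (e.g. 100000b -> 111111b)
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        // 0b0..0111..1 + 1 is the next power of two
        return n + 1;
    }

    public static void main(String[] args) {
        for (int index : new int[] {32, 33, 63, 64, 1000}) {
            System.out.println(index + " -> " + newCapacity(index));
        }
        // 32 -> 64, 33 -> 64, 63 -> 64, 64 -> 128, 1000 -> 1024
    }
}
```

With the initial table of 32 slots, the first expansion (index 32) yields 64 slots, and a thread that touches a variable with index 1000 ends up with a 1024-slot array.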
// (also in InternalThreadLocalMap) Returns the InternalThreadLocalMap of the current thread
public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    // The thread implements FastThreadLocalAccess (e.g. a FastThreadLocalThread)
    if (thread instanceof FastThreadLocalAccess) {
        // Fast path
        return fastGet((FastThreadLocalAccess) thread);
    } else {
        // Slow path
        return slowGet();
    }
}

public static InternalThreadLocalMap getIfSet() {
    Thread thread = Thread.currentThread();
    InternalThreadLocalMap threadLocalMap;
    if (thread instanceof FastThreadLocalAccess) {
        // Read the map straight from the thread's field
        threadLocalMap = ((FastThreadLocalAccess) thread).threadLocalMap();
    } else {
        // Ordinary threads keep their InternalThreadLocalMap in a JDK ThreadLocal
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        if (slowThreadLocalMap == null) {
            threadLocalMap = null;
        } else {
            threadLocalMap = slowThreadLocalMap.get();
        }
    }
    return threadLocalMap;
}

// Fast path: read the thread's field, creating the map lazily
private static InternalThreadLocalMap fastGet(FastThreadLocalAccess thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}

// Slow path: go through a JDK ThreadLocal, creating the ThreadLocal and the map lazily
private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    if (slowThreadLocalMap == null) {
        UnpaddedInternalThreadLocalMap.slowThreadLocalMap =
                slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    }
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}
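The fast/slow split in InternalThreadLocalMap.get() boils down to a type check. In this sketch the names SlotLookup, SlotHolder and FastThread are hypothetical stand-ins for InternalThreadLocalMap, FastThreadLocalAccess and FastThreadLocalThread, and the per-thread "map" is just an Object[].

```java
// Hypothetical sketch of the dispatch in InternalThreadLocalMap.get():
// threads that implement SlotHolder carry their storage in a field,
// every other thread falls back to a JDK ThreadLocal.
class SlotLookup {
    // Fallback for ordinary threads: a JDK ThreadLocal holding the whole array
    private static final ThreadLocal<Object[]> SLOW = new ThreadLocal<>();

    static Object[] currentSlots() {
        Thread thread = Thread.currentThread();
        if (thread instanceof SlotHolder) {
            // Fast path: a type check plus a field read, created lazily
            SlotHolder holder = (SlotHolder) thread;
            Object[] slots = holder.slots();
            if (slots == null) {
                slots = new Object[32];
                holder.setSlots(slots);
            }
            return slots;
        }
        // Slow path: a JDK ThreadLocal lookup (hashing and possibly probing)
        Object[] slots = SLOW.get();
        if (slots == null) {
            slots = new Object[32];
            SLOW.set(slots);
        }
        return slots;
    }

    public static void main(String[] args) throws InterruptedException {
        // main is an ordinary Thread, so it takes the slow path
        System.out.println(currentSlots() == currentSlots()); // true
        FastThread fast = new FastThread(() -> {
            // inside a FastThread the array is read straight from the field
            Object[] slots = currentSlots();
            System.out.println(slots == ((SlotHolder) Thread.currentThread()).slots()); // true
        });
        fast.start();
        fast.join();
    }
}

interface SlotHolder {
    Object[] slots();

    void setSlots(Object[] slots);
}

// A Thread subclass that carries its per-thread storage in an ordinary field
class FastThread extends Thread implements SlotHolder {
    private Object[] slots;

    FastThread(Runnable task) {
        super(task);
    }

    @Override
    public Object[] slots() {
        return slots;
    }

    @Override
    public void setSlots(Object[] slots) {
        this.slots = slots;
    }
}
```

Threads that are not FastThreadLocalThreads still work, but every access first pays for a JDK ThreadLocal lookup, which is why Netty's DefaultThreadFactory creates FastThreadLocalThread instances.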
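That completes one pass through the set() and get() flow of FastThreadLocal. One detail deserves attention: UnpaddedInternalThreadLocalMap#nextIndex is static final, so indexes are handed out globally rather than per thread. If 1000 FastThreadLocal instances have been created, nextIndex is around 1000, and because static fields are shared by all threads, any thread that sets the next FastThreadLocal must grow its own indexedVariables array to cover an index beyond 1000, even if it uses no other FastThreadLocal. One possible optimization (our own idea, not something Netty does) would be to keep a per-thread ThreadLocal variable and only trigger expansion once nextIndex exceeds some threshold.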
Summary
Compared with ThreadLocal's linear probing, FastThreadLocal's indexed lookup is considerably more efficient.
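To recap the FastThreadLocal family: Netty provides FastThreadLocalThread, a Thread subclass that holds the InternalThreadLocalMap storing that thread's FastThreadLocal values. A static AtomicInteger (nextIndex, inherited by InternalThreadLocalMap) hands out a unique, incrementing index to each FastThreadLocal, so every variable maps to exactly one array slot: hash collisions cannot occur, and reads and writes go straight to that slot.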
Advantages
FastThreadLocal has no hash collisions; it trades space for time to make lookups fast.
Disadvantages
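Each thread's InternalThreadLocalMap keeps an indexedVariables array pre-filled with the UNSET placeholder. Because indexes are global, the array must be long enough to cover the highest index that thread ever sets, so memory usage can be large when many FastThreadLocal instances exist but each thread uses only a few of them.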