FastThreadLocal in Netty

2020-11-12  巴黎没有摩天轮Li

Preface

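ThreadLocal provides thread-local variables: each thread has its own copy, isolated from every other thread. As covered in the earlier ThreadLocal source analysis, every access looks the variable up in the ThreadLocalMap maintained by the current thread. That map is an array-backed hash table that resolves hash collisions with linear probing: when the target slot is occupied by a different key, the cursor moves on to the next slot and keeps searching. Lookups through the JDK ThreadLocal can therefore be comparatively slow when collisions occur. To avoid this cost, the Netty framework built a new wheel of its own: FastThreadLocal.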
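To make the cost of linear probing concrete, here is a minimal sketch. It is a simplified model, not the JDK's actual ThreadLocalMap: the class name `LinearProbeSketch`, the use of `int` keys, and the convention that key 0 means "empty" are all assumptions made for illustration.

```java
// Simplified open-addressing table (NOT the JDK's ThreadLocalMap).
// Key 0 marks an empty slot, so callers must use non-zero keys and
// must never fill the table completely.
final class LinearProbeSketch {
    private final int[] keys;
    private final String[] values;

    LinearProbeSketch(int capacity) { // capacity must be a power of two
        keys = new int[capacity];
        values = new String[capacity];
    }

    void put(int key, String value) {
        int i = key & (keys.length - 1);
        while (keys[i] != 0 && keys[i] != key) {
            i = (i + 1) & (keys.length - 1); // collision: move to the next slot
        }
        keys[i] = key;
        values[i] = value;
    }

    // Returns how many slots were inspected to find the key, or -1 if absent.
    int probesToFind(int key) {
        int i = key & (keys.length - 1);
        int probes = 1;
        while (keys[i] != key) {
            if (keys[i] == 0) {
                return -1; // reached an empty slot: the key is not stored
            }
            i = (i + 1) & (keys.length - 1);
            probes++;
        }
        return probes;
    }

    public static void main(String[] args) {
        LinearProbeSketch table = new LinearProbeSketch(16);
        // 1, 17 and 33 all map to slot 1 in a 16-slot table.
        table.put(1, "a");
        table.put(17, "b");
        table.put(33, "c");
        System.out.println(table.probesToFind(1));  // 1
        System.out.println(table.probesToFind(33)); // 3
    }
}
```

The more keys collide, the more slots a lookup has to inspect; an index-based array lookup always inspects exactly one.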

Usage and Benchmark

Step 1: Add the Netty dependency

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

Step 2: Write code to compare against JDK ThreadLocal (pseudocode)

FastThreadLocal
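private static final int MAX = 100000;
// Start time used by the elapsed-time printouts below
long start = System.currentTimeMillis();
DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("Thread-01");
// Declare an array of FastThreadLocal instances
FastThreadLocal<String>[] fastThreadLocal = new FastThreadLocal[MAX];
// Create 100000 FastThreadLocal instances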
for (int i = 0; i < MAX; i++) {
    fastThreadLocal[i] = new FastThreadLocal<>();
}
Thread thread = defaultThreadFactory.newThread(() -> {
    for (int i = 0; i < MAX; i++) {
        fastThreadLocal[i].set("java: " + i);
    }
    System.out.println("fastThreadLocal set: " + (System.currentTimeMillis() - start));
    for (int i = 0; i < MAX; i++) {
        for (int j = 0; j < MAX; j++) {
            fastThreadLocal[i].get();
        }
    }
});
thread.start();
try {
    // Wait for the thread to finish before printing the total
    thread.join();
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println("fastThreadLocal total: " + (System.currentTimeMillis() - start));
ThreadLocal
long start = System.currentTimeMillis();
ThreadLocal<String>[] threadLocals = new ThreadLocal[MAX];
for (int i = 0; i < MAX; i++) {
    threadLocals[i] = new ThreadLocal<>();
}
Thread thread = new Thread(() -> {
    for (int i = 0; i < MAX; i++) {
        threadLocals[i].set("java: " + i);
    }
    System.out.println("threadLocal set: " + (System.currentTimeMillis() - start));
    for (int i = 0; i < MAX; i++) {
        for (int j = 0; j < MAX; j++) {
            threadLocals[i].get();
        }
    }
});
thread.start();
try {
    thread.join();
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println("threadLocal total: " + (System.currentTimeMillis() - start));

Using it with a thread pool

final DefaultThreadFactory factory = new DefaultThreadFactory("Thread-pool-01", false);
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new ThreadFactory() {
    public Thread newThread(Runnable r) {
        return factory.newThread(r);
    }
});

Results (elapsed time in milliseconds)

fastThreadLocal set: 66
threadLocal set: 78
fastThreadLocal total: 93
threadLocal total: 13453

In this test FastThreadLocal is clearly faster. Why does it perform so well?

Step 3: Analyzing the cause

1. DefaultThreadFactory
// Counter that gives each DefaultThreadFactory its own poolId
private static final AtomicInteger poolId = new AtomicInteger();
// Counter that numbers the threads produced by this factory
private final AtomicInteger nextId = new AtomicInteger();
private final String prefix;
private final boolean daemon;
private final int priority;

@Override
public Thread newThread(Runnable r) {
    // Set the thread name and wrap the runnable
    Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
    try {
        // Setting the daemon flag and the priority is omitted here
    } catch (Exception ignored) {
        // Doesn't matter even if failed to set.
    }
    return t;
}
// Create a FastThreadLocalThread
protected Thread newThread(Runnable r, String name) {
    return new FastThreadLocalThread(r, name);
}
// Runnable wrapper that cleans up once the task has finished
private static final class DefaultRunnableDecorator implements Runnable {
    private final Runnable r;
    DefaultRunnableDecorator(Runnable r) {
        this.r = r;
    }
    @Override
    public void run() {
        try {
            r.run();
        } finally {
            FastThreadLocal.removeAll();
        }
    }
}

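DefaultThreadFactory is the factory class that creates the threads. The point to notice is that Netty wraps Thread in its own subclass, FastThreadLocalThread, which is introduced in the next section.
The Runnable that the thread executes is wrapped as well: once the task finishes, whether normally or with an exception, the wrapper calls FastThreadLocal.removeAll(). Presumably this is designed to prevent memory leaks.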
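The decorator pattern described above can be sketched with the JDK alone. This is an illustration under assumptions, not Netty's code: `CleanupDecoratorSketch`, `STATE`, `withCleanup` and `stateClearedAfter` are hypothetical names, and a plain `ThreadLocal.remove()` stands in for `FastThreadLocal.removeAll()`.

```java
// Wraps a task so that per-thread state is always cleared afterwards.
final class CleanupDecoratorSketch {
    // Stand-in for the per-thread state; Netty calls FastThreadLocal.removeAll() instead.
    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    static Runnable withCleanup(Runnable task) {
        return () -> {
            try {
                task.run();
            } finally {
                STATE.remove(); // runs even if the task throws
            }
        };
    }

    // Runs the decorated task on the calling thread and reports
    // whether the thread-local state is gone afterwards.
    static boolean stateClearedAfter(Runnable task) {
        try {
            withCleanup(task).run();
        } catch (RuntimeException ignored) {
            // the task itself may fail; the cleanup must still have happened
        }
        return STATE.get() == null;
    }

    public static void main(String[] args) {
        System.out.println(stateClearedAfter(() -> STATE.set("request-1"))); // true
        System.out.println(stateClearedAfter(() -> {
            STATE.set("request-2");
            throw new IllegalStateException("boom");
        })); // true
    }
}
```

Putting the cleanup in `finally` matters most for pooled threads, which outlive the individual tasks they run.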

2. FastThreadLocalThread
public class FastThreadLocalThread extends Thread implements FastThreadLocalAccess {
    private InternalThreadLocalMap threadLocalMap;
    @Override
    public final InternalThreadLocalMap threadLocalMap() {
        return threadLocalMap;
    }

    @Override
    public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
        this.threadLocalMap = threadLocalMap;
    }
}

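Each thread holds exactly one InternalThreadLocalMap, and FastThreadLocalThread exposes it by implementing the FastThreadLocalAccess interface. The core logic therefore appears to live in InternalThreadLocalMap.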

3. FastThreadLocal

Let us first go back to the FastThreadLocal class itself, the class this article sets out to analyze.

// Each FastThreadLocal is assigned its own unique index
private final int index;
public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}

// Get the value
public final V get() {
    // Fetch the InternalThreadLocalMap that belongs to the current thread
    return get(InternalThreadLocalMap.get());
}

@SuppressWarnings("unchecked")
public final V get(InternalThreadLocalMap threadLocalMap) {
    Object v = threadLocalMap.indexedVariable(index);
    // If the stored value is not the UNSET placeholder
    if (v != InternalThreadLocalMap.UNSET) {
        // return it directly
        return (V) v;
    }
    // Otherwise initialize it through initialValue() (null by default)
    return initialize(threadLocalMap);
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    // Store the value in the map at this FastThreadLocal's index
    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) {
        set(InternalThreadLocalMap.get(), value);
    } else {
        remove();
    }
}

public final void set(InternalThreadLocalMap threadLocalMap, V value) {
    // If the value is not the UNSET placeholder, store it; register for cleanup only when the slot was previously unset
    if (value != InternalThreadLocalMap.UNSET) {
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    } else {
        // Setting UNSET means: remove the variable
        remove(threadLocalMap);
    }
}
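The get()/set() flow above boils down to "one fixed index per variable, one array per thread, and a sentinel for unset slots". The sketch below models that idea with the JDK only. It is an assumption-laden illustration rather than Netty's implementation: `IndexedLocalSketch` is a hypothetical class, a JDK `ThreadLocal` holds the per-thread array, and `Integer.highestOneBit` is used for growth instead of Netty's bit-shift sequence.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class IndexedLocalSketch<V> {
    // Sentinel that distinguishes "never set" from a stored null.
    private static final Object UNSET = new Object();
    // Shared counter: every instance gets the next free index.
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();
    // One slot array per thread, pre-filled with UNSET.
    private static final ThreadLocal<Object[]> SLOTS = ThreadLocal.withInitial(() -> {
        Object[] slots = new Object[32];
        Arrays.fill(slots, UNSET);
        return slots;
    });

    private final int index = NEXT_INDEX.getAndIncrement();
    private final Supplier<V> initial;

    IndexedLocalSketch(Supplier<V> initial) {
        this.initial = initial;
    }

    @SuppressWarnings("unchecked")
    V get() {
        Object[] slots = SLOTS.get();
        Object v = index < slots.length ? slots[index] : UNSET;
        if (v != UNSET) {
            return (V) v; // direct array read, no hashing or probing
        }
        V created = initial.get(); // lazy initialization on first access
        set(created);
        return created;
    }

    void set(V value) {
        Object[] slots = SLOTS.get();
        if (index >= slots.length) {
            int oldLength = slots.length;
            // Grow to the smallest power of two greater than index.
            slots = Arrays.copyOf(slots, Integer.highestOneBit(index) << 1);
            Arrays.fill(slots, oldLength, slots.length, UNSET);
            SLOTS.set(slots);
        }
        slots[index] = value;
    }

    public static void main(String[] args) {
        IndexedLocalSketch<String> name = new IndexedLocalSketch<>(() -> "default");
        System.out.println(name.get()); // default
        name.set("netty");
        System.out.println(name.get()); // netty
    }
}
```

Because UNSET is a dedicated object, a stored null is a legitimate value and does not trigger initialization again.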
4. InternalThreadLocalMap
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
}

As shown, InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap. Let us look at what the parent class contains.

4.1 UnpaddedInternalThreadLocalMap
class UnpaddedInternalThreadLocalMap {
    static ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap;
    static final AtomicInteger nextIndex = new AtomicInteger();
    /** Used by {@link FastThreadLocal} */
    // Holds the value of each FastThreadLocal, addressed by its index
    Object[] indexedVariables;
    // Core thread-locals
    int futureListenerStackDepth;
    int localChannelReaderStackDepth;
    Map<Class<?>, Boolean> handlerSharableCache;
    IntegerHolder counterHashCode;
    ThreadLocalRandom random;
    Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
    Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;
    // String-related thread-locals
    StringBuilder stringBuilder;
    Map<Charset, CharsetEncoder> charsetEncoderCache;
    Map<Charset, CharsetDecoder> charsetDecoderCache;
    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
        this.indexedVariables = indexedVariables;
    }
}

Back to InternalThreadLocalMap:

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }

    private static Object[] newIndexedVariableTable() {
        // Allocate the array that stores the value for each index (one index per FastThreadLocal)
        Object[] array = new Object[32];
        // Fill every slot with the UNSET placeholder
        Arrays.fill(array, UNSET);
        return array;
    }

    public Object indexedVariable(int index) {
        Object[] lookup = indexedVariables;
        // Very fast lookup: if index is within the array, return the slot directly; otherwise UNSET
        return index < lookup.length? lookup[index] : UNSET;
    }

   /**
     * @return {@code true} if and only if a new thread-local variable has been created
     */
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
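        // index comes from an incrementing counter, so it is used directly as the array position
        if (index < lookup.length) {
            // Remember the previous value
            Object oldValue = lookup[index];
            lookup[index] = value;
            // true only if the slot was previously UNSET, i.e. a new thread-local variable was created
            return oldValue == UNSET;
        } else {
            // index is beyond the array: expand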
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }

    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
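        // Bitwise OR with unsigned right shifts: copies the highest set bit into every lower bit
        newCapacity |= newCapacity >>>  1;
        newCapacity |= newCapacity >>>  2;
        newCapacity |= newCapacity >>>  4;
        newCapacity |= newCapacity >>>  8;
        newCapacity |= newCapacity >>> 16;
        // Add one, which yields the smallest power of two greater than index
        newCapacity ++;
        // Copy the old array into the larger one
        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        // Fill the newly added slots, from the old capacity to the new end, with UNSET
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        // Store the value
        newArray[index] = value;
        // Replace the table with the expanded array
        indexedVariables = newArray;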
        indexedVariables = newArray;
    }
}
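The shift-and-OR sequence in expandIndexedVariableTableAndSet can be checked in isolation. The sketch below copies just that arithmetic into a hypothetical helper, `CapacitySketch.nextCapacity`, to show which capacities it produces.

```java
final class CapacitySketch {
    // Same arithmetic as the expansion code above:
    // smear the highest set bit downwards, then add one.
    static int nextCapacity(int index) {
        int c = index;
        c |= c >>> 1;
        c |= c >>> 2;
        c |= c >>> 4;
        c |= c >>> 8;
        c |= c >>> 16;
        return c + 1;
    }

    public static void main(String[] args) {
        System.out.println(nextCapacity(32));  // 64
        System.out.println(nextCapacity(33));  // 64
        System.out.println(nextCapacity(100)); // 128
    }
}
```

The result is always a power of two strictly greater than the index, so the new array is guaranteed to contain the slot that triggered the expansion.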

    // Find the InternalThreadLocalMap that belongs to the current thread
    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        // If the current thread implements FastThreadLocalAccess
        if (thread instanceof FastThreadLocalAccess) {
            // take the fast path
            return fastGet((FastThreadLocalAccess) thread);
        } else {
            // otherwise take the slow path
            return slowGet();
        }
    }

    public static InternalThreadLocalMap getIfSet() {
        Thread thread = Thread.currentThread();
        InternalThreadLocalMap threadLocalMap;
        if (thread instanceof FastThreadLocalAccess) {
            // Read the map straight from the thread's own field
            threadLocalMap = ((FastThreadLocalAccess) thread).threadLocalMap();
        } else {
            // Here a JDK ThreadLocal is used to hold the InternalThreadLocalMap
            ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
            if (slowThreadLocalMap == null) {
                threadLocalMap = null;
            } else {
                threadLocalMap = slowThreadLocalMap.get();
            }
        }
        return threadLocalMap;
    }

    // Fast path
    private static InternalThreadLocalMap fastGet(FastThreadLocalAccess thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

    // Slow path
    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        if (slowThreadLocalMap == null) {
            UnpaddedInternalThreadLocalMap.slowThreadLocalMap =
                    slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
        }

        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }
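The fast/slow split above can be modelled without Netty. In the sketch below every name (`FastSlowPathSketch`, `MapAccess`, `FastThread`, `currentMap`, `pathOn`) is hypothetical and an `Object[]` stands in for InternalThreadLocalMap; it only illustrates the `instanceof` dispatch with a JDK ThreadLocal as fallback.

```java
final class FastSlowPathSketch {
    // Plays the role of FastThreadLocalAccess.
    interface MapAccess {
        Object[] map();
        void setMap(Object[] map);
    }

    // Plays the role of FastThreadLocalThread: the map is a plain field.
    static final class FastThread extends Thread implements MapAccess {
        private Object[] map;
        FastThread(Runnable r) { super(r); }
        @Override public Object[] map() { return map; }
        @Override public void setMap(Object[] map) { this.map = map; }
    }

    // Fallback storage for ordinary threads.
    private static final ThreadLocal<Object[]> SLOW = new ThreadLocal<>();

    static Object[] currentMap() {
        Thread t = Thread.currentThread();
        if (t instanceof MapAccess) {
            // Fast path: a field read on the thread object itself.
            MapAccess access = (MapAccess) t;
            Object[] map = access.map();
            if (map == null) {
                access.setMap(map = new Object[32]);
            }
            return map;
        }
        // Slow path: one JDK ThreadLocal lookup to reach the map.
        Object[] map = SLOW.get();
        if (map == null) {
            SLOW.set(map = new Object[32]);
        }
        return map;
    }

    // Reports which path a freshly started thread of the given kind takes.
    static String pathOn(boolean fast) {
        String[] out = new String[1];
        Runnable r = () -> {
            currentMap();
            out[0] = Thread.currentThread() instanceof MapAccess ? "fast" : "slow";
        };
        Thread t = fast ? new FastThread(r) : new Thread(r);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out[0];
    }

    public static void main(String[] args) {
        System.out.println(pathOn(true));  // fast
        System.out.println(pathOn(false)); // slow
    }
}
```

Even on the slow path only the map itself is fetched through a JDK ThreadLocal; the individual variables are still read by index.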

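That covers the main flow of FastThreadLocal's set() and get() methods. One detail deserves attention: UnpaddedInternalThreadLocalMap#nextIndex is static final, so the counter is shared by every thread. If 1000 FastThreadLocal instances have been created, nextIndex is 1000; any FastThreadLocal created afterwards continues counting from there, and a thread that uses one of these high-index variables has to expand its own array far enough to cover that index. One possible optimization is to give each thread a thread-local variable of its own and to trigger the expansion only once nextIndex exceeds a certain threshold.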
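The effect of the shared counter can be illustrated with a small model. It is a sketch under assumptions: `SparseTableSketch` and its methods are hypothetical, the initial size of 32 mirrors the listing above, and `Integer.highestOneBit` replaces the bit-shift sequence. It shows the table a thread ends up with after storing a single value at index 999, the index of the 1000th variable.

```java
import java.util.Arrays;

final class SparseTableSketch {
    static final Object UNSET = new Object();

    // The table a thread holds after storing one value at `index`,
    // starting from the initial 32-slot table.
    static Object[] tableAfterSet(int index, Object value) {
        Object[] table = new Object[32];
        Arrays.fill(table, UNSET);
        if (index >= table.length) {
            int oldLength = table.length;
            // Smallest power of two greater than index.
            table = Arrays.copyOf(table, Integer.highestOneBit(index) << 1);
            Arrays.fill(table, oldLength, table.length, UNSET);
        }
        table[index] = value;
        return table;
    }

    // Number of slots that actually hold a value.
    static long usedSlots(Object[] table) {
        return Arrays.stream(table).filter(v -> v != UNSET).count();
    }

    public static void main(String[] args) {
        Object[] table = tableAfterSet(999, "value");
        System.out.println(table.length);     // 1024
        System.out.println(usedSlots(table)); // 1
    }
}
```

A thread that touches only one variable still pays for every index below it, which is the space side of the space-for-time trade.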

Summary

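Compared with the linear probing used by ThreadLocal, FastThreadLocal's index-based lookup is much more efficient.
Within the FastThreadLocal family, Netty provides FastThreadLocalThread, a wrapper around Thread that holds the InternalThreadLocalMap in which the thread keeps its FastThreadLocal values. InternalThreadLocalMap, through its parent class, uses an AtomicInteger to hand out atomically increasing index values, one per FastThreadLocal. This avoids collisions altogether and allows each value to be read and updated directly by index.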

Advantages

FastThreadLocal has no hash collisions: it trades space for time to make lookups faster.

Disadvantages

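Each InternalThreadLocalMap keeps an array pre-filled with UNSET, and its capacity has to cover the highest index the thread uses, so FastThreadLocal can occupy a lot of memory.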
