NIOnetty

jdk ThreadLocal 和 netty FastThre

2020-03-10  本文已影响0人  田才

本文基于 jdk1.8 和 netty 4.1.46 , jdk 这些年版本迭代的比较快,每个版本中部分 api 都有优化,netty 同样也是非常活跃,小版本迭非常快。所以讨论 api 不指定版本是没有意义的。

一、jdk1.8 ThreadLocal 原理

每个Thread实例都包含一个ThreadLocalMap的引用,ThreadLocalMap 中有Entry数组,Entry 的key是若引用,类的关系入下图

clipboard.png

考虑如下问题:
1、因为 Entry 的 key 是弱引用,所以没有强引用指向,只能生存到下次垃圾回收之前,那么如何处理 key 被回收了的Entry呢?
2、既然是Entry数组,初始大小是多少?,set的时候超过长度如何扩容?,get的时候如何定位数组的index? 。
带着这几个问题我们分析一下代码。

1、ThreadLocal 的初始化:

用全局静态变量 nextHashCode 生成 ThreadLocal 的hash值。
threadLocalHashCode 就是要用来定位 Entry 数组index的。 后面会讲到

private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode =  new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

2、ThreadLocal.set(T value) 方法

获取当前 Thread 实例,并且 Thread 实例中拿到 ThreadLocalMap 实例。

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

如果 map 为空则调用 createMap 创建 ThreadLocalMap 实例

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
   //初始化Entry数组
    table = new Entry[INITIAL_CAPACITY];
   //定位数组 index
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    //设置扩容阀值 数组长度的 2/3
    setThreshold(INITIAL_CAPACITY);
}

如果map非空,调用set方法

private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    //定位数组 index
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        //如果查询的,结果 key 相等,那么直接替换 valeu
        if (k == key) {
            e.value = value;
            return;
        }
        //如果查询的,结果 key 已经被回收了,那么调用一个重要的方法 replaceStaleEntry
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    tab[i] = new Entry(key, value);
    int sz = ++size;
    //扩容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

注意 int i = key.threadLocalHashCode & (len-1); 这个方法。因为len的值是2的n次方(扩容也在原数组长度上乘2)。经过 & 运算后i的值肯定在len之内,确保不会越界。但是有个问题,可能会出现定位的位置冲突。所以需要for循环依次向后查找,直到找到一个为空的,然后 tab[i] = new Entry(key, value);。这里就给get的时候定位index造成了问题。后边讲

在向后查找的过程中,如果 key 相等 那么直接替换 valeu 然后返回

       if (k == key) {
            e.value = value;
            return;
        }

如果 key 已经被回收了,那么调用一个重要的方法 replaceStaleEntry

        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }

replaceStaleEntry 方法就是替换已经被回收了的 ThreadLocal 变量。

private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    int slotToExpunge = staleSlot;
    //向前找到第一个被回收了的 ThreadLocal
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;
    //向后查找
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
       //1.1 向后搜索过程中发现key相同的entry
        if (k == key) {
            //覆盖并且和脏entry进行交换
            e.value = value;
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;
            //staleSlot 之前没有被回收的 ThreadLocal
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            //搜索脏entry并进行清理
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }
       //如果向前未搜索到脏entry,并且在向后查找过程遇到脏entry的话
     //就以i为起点清理entiy
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);
    //如果不只staleSlot一个节点为脏的
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

再来看一下进行清理的方法,while循环次数为log2(table.length)-1}。如果遇到脏Entry那么,循环将重新开始,并且范围为数组长度,这块我也不明白为啥。

private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        i = nextIndex(i, len);
        Entry e = tab[i];
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}

expungeStaleEntry 方法,

步骤
1、清除当前脏entry
2、往后环形继续查找,直到遇到table[i]==null时结束
3、如果在向后搜索过程中再次遇到脏entry,同样将其清理掉
4、如果有hash冲突的情况,重新整理数组

private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
   //1、清除当前脏entry
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    Entry e;
    int i;
     //2.往后环形继续查找,直到遇到table[i]==null时结束
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
         //3. 如果在向后搜索过程中再次遇到脏entry,同样将其清理掉
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            //4、如果有位置冲突的情况,重新整理数组
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) {
                tab[i] = null;
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i;
}

3、ThreadLocal.get(T value) 方法,如果了解了 set 方法,那么get 方法相对就简单了

1、拿到当前线程,的 ThreadLocalMap 对象。
2、ThreadLocalMap 对象获取 Entry
3、 如果 ThreadLocalMap 为空那么调用 setInitialValue 方法,此方法内部 调用了ThreadLocal.set方法

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;
    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        //遇到脏key需要释放,清理过程上文已经将到
        if (k == null)
            expungeStaleEntry(i);
        //解决位置冲突问题,所以需要向下查找
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

总结:

private static final int INITIAL_CAPACITY = 16;
private void rehash() {
    //清除ThreadLocal已经被回收的 Entry
    expungeStaleEntries();
    //经过清理后  size >= threshold - threshold / 4 那么resize();扩容
    if (size >= threshold - threshold / 4)
        resize();
}

resize() 的过程就不难了,
1、首先将table 大小扩大一倍。
2、发现 ThreadLocal 被回收了那么,value 复制为null 帮助gc
3、copy数组,重新计算数组下表
4、设置下次扩容的阀值

private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;
    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }
    setThreshold(newLen);
    size = count;
    table = newTab;
}
以上可见,大师 Doug Lea,为了解决弱引用被回收问题,和位置冲突问题,耍了一套独孤九剑。

二、netty4.1.46 FastThreadLocal 原理

Netty为了在某些场景下提高性能,改进了jdk ThreadLocal,Netty实现的FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。避免了检测弱引用带来的 value 回收难问题,和数组位置冲突带来的线性查找问题,解决这些问题并不是没有代价,
Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。同时FastThreadLocal 也实现了清理对象的方法,下面会对这些内容进行分别介绍。
为了叙述方便,下文使用FTL指代Netty的FastThreadLocal,使用TL指代Java原生ThreadLocal。

1、类的结构

1.1 FastThreadLocal 类的uml图,
clipboard.png
1.2 FastThreadLocalThread

cleanupFastThreadLocals 字段在 4.1.46 的版本中已经没有在用到了

/**
 * true,表示FTL会在线程结束时被主动清理 见  FastThreadLocalRunnable 类
 * false,需要将FTL放入后台清理线程的队列中
 */
// This will be set to true if we have a chance to wrap the Runnable.
//这个字段则用于标识该线程在结束时是否会主动清理FTL
private final boolean cleanupFastThreadLocals;
//次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 时候创建
private InternalThreadLocalMap threadLocalMap;

public FastThreadLocalThread(Runnable target) {
    super(FastThreadLocalRunnable.wrap(target));
    cleanupFastThreadLocals = true;
}
1.3 InternalThreadLocalMap

FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:

        private InternalThreadLocalMap() {
            //为了简便,InternalThreadLocalMap父类
            //UnpaddedInternalThreadLocalMap不展开介绍
            super(newIndexedVariableTable());
        }
        //默认的数组大小为32,且使用UNSET对象填充数组
        //如果下标处数据为UNSET,则表示没有数据
        private static Object[] newIndexedVariableTable() {
            Object[] array = new Object[32];
            Arrays.fill(array, UNSET);
            return array;
        }

为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充,具体可以参考https://blog.csdn.net/qq_27428109/article/details/74781774,在Java8中则可以使用@sun.misc.Contended注解避免伪共享问题。

//InternalThreadLocalMap
// Cache line padding (must be public) 
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes. 
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

上面我们说到FTL保存了数组下标,FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:

static final AtomicInteger nextIndex = new AtomicInteger();
public static int nextVariableIndex() {
    //Netty中所有FTL数组下标都是通过递增这个静态变量实现的
    //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题,
    //会造成InternalThreadLocalMap中数组不必要的自动扩容
    int index = nextIndex.getAndIncrement();
    if (index < 0) {
        nextIndex.decrementAndGet();
        throw new IllegalStateException("too many thread-local indexed variables");
    }
    return index;
}

InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:

private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数, 
    //这段代码在早期的 jdk HashMap 中见过
    int newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;

    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    indexedVariables = newArray;
}
1.4 FastThreadLocal
       //如果在该FTL中放入了数据,也就实际调用了其set或get函数,会在
       //该FastThreadLocalThread.threadLocalMap数组的
       // variablesToRemoveIndex下标处放置一个IdentityHashMap,
       //并将该FTL放入IdentityHashMap中,在后续清理时会取出
       //variablesToRemoveIndex下标处的IdentityHashMap进行清理
        private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
       //在threadLocalMap数组中存放实际数据的下标
        private final int index;
        public FastThreadLocal() {
            index = InternalThreadLocalMap.nextVariableIndex();
        }
//初始化 value 函数
protected V initialValue() throws Exception {
    return null;
}

//让使用者在该FTL被移除时可以有机会做些操作。
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
1.5 FastThreadLocalThread

cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了

/**
 * true,表示FTL会在线程结束时被主动清理 见  FastThreadLocalRunnable 类
 * false,需要将FTL放入后台清理线程的队列中
 */
// This will be set to true if we have a chance to wrap the Runnable.
//这个字段则用于标识该线程在结束时是否会主动清理FTL
private final boolean cleanupFastThreadLocals;
//次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 时候创建
private InternalThreadLocalMap threadLocalMap;

public FastThreadLocalThread(Runnable target) {
    super(FastThreadLocalRunnable.wrap(target));
    cleanupFastThreadLocals = true;
}

2 数据 set 和 get

2.1、数据的设置 set 方法
public final void set(V value) {
    //判断设置的 value 值是否是缺省值
    if (value != InternalThreadLocalMap.UNSET) {
        //获取当前线程的 InternalThreadLocalMap , 如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取
        //否则通过 jdk 原生的 threadLocal 获取
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        //FastThreadLocal 对应的 index 下标的 value 替换成新的 value
        setKnownNotUnset(threadLocalMap, value);
    } else {
        //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象
        remove();
    }
}

这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    //在数组下标index处放置实际对象,如果index大于数组length,会进行数组扩容.
    if (threadLocalMap.setIndexedVariable(index, value)) {
        //放置成功之后,将该FTL加入到 variablesToRemoveIndex 下标的
        //IdentityHashMap,等待后续清理
        addToVariablesToRemove(threadLocalMap, this);
    }
}

/**
 * 该FTL加入到variablesToRemoveIndex下标的IdentityHashMap
 * IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置
 */
@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    //获取 variablesToRemoveIndex下标处的 IdentityHashMap
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    //如果是第一次获取,则 variablesToRemoveIndex下标处的值为 UNSET
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        //新建一个新的 IdentityHashMap 并
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        //放入到下标variablesToRemoveIndex处
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }
    //将该FTL放入该IdentityHashMap中
    variablesToRemove.add(variable);
}

下面看InternalThreadLocalMap.get()实现:

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    //首先看当前 thread 是否为FastThreadLocalThread实例
    //如果是的话,可以快速通过引用,获取到其 threadLocalMap
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        //如果不是,则 jdk 原生慢速获取到其 threadLocalMap
        return slowGet();
    }
}
2.2、数据的设置 get 方法

了解数据的设置set方法,获取就比较点单,代码就不贴了。

3、 清理 FastThreadLocal 对象

3.1 主动清理

清理 FastThreadLocal 对象相关的代码是在 FastThreadLocalThread 类,和 FastThreadLocalRunnable 类中。
FastThreadLocalThread 的代码见上文
FastThreadLocalRunnable.wrap 方法修饰的 Runnable,表示 FTL 会在线程结束时被主动清理,wrap 方法会把原 Runnable.run 方法放在 try 里,然后在 finally 中调用 FastThreadLocal.removeAll() 方法,该方法会对 FTL 进行清理,具体可看下面列出的源码。

final class FastThreadLocalRunnable implements Runnable {
    private final Runnable runnable;

    private FastThreadLocalRunnable(Runnable runnable) {
        this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
    }

    @Override
    public void run() {
        //run方法放在try里,然后在finally中调用FastThreadLocal.removeAll()方法
        try {
            runnable.run();
        } finally {
            FastThreadLocal.removeAll();
        }
    }
    //被wrap的 Runable会变成F astThreadLocalRunnable对象
    //FastThreadLocalRunnable在run方法的finally会调用
    //FastThreadLocal.removeAll();在线程结束时对FTL
    //进行主动清理
    static Runnable wrap(Runnable runnable) {
        return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
    }
}
3.2 被动清理在4.1.46版本中已经被去掉了

// TODO: We need to find a better way to handle this.
我们需要找到更好的方法来处理这个问题。
但是我目前在源码中没有找到,新的处理办法,或者有知道的不吝赐教,或者这个也作为一个开放性问题吧。

clipboard.png
上一篇下一篇

猜你喜欢

热点阅读