ThreadLocal源码深入分析

2018-02-04  本文已影响85人  wewarriors

ThreadLocal源码深入分析

ThreadLocal :线程本地存储区(Thread Local Storage,简称为TLS),每个线程都有自己的私有的本地存储区域,不同线程之间彼此不能访问对方的TLS区域。它是一个线程内部的数据类,通过它可以在指定线程中存储数据,数据存储以后,只有在指定线程中可以获取到存储的数据,对于其他线程来说则无法获取到数据。一般来说,当某些数据是以线程为作用域并且不同的线程具有不同的数据副本的时候,就可以采用ThreadLocal。比如对于Handler来说,他需要获取当前线程的Looper,很显然Looper的作用域就是线程,并且不同线程具有不同的Looper,这个时候通过ThreadLocal就可以轻松实现Looper在线程中的获取。下面通过实际的例子来演示ThreadLocal的真正含义。

1. 实例

public class TestThreadLocal {
    private static final ThreadLocal mThreadLocal = new ThreadLocal();

    public static void main(String[] args) {
        mThreadLocal.set(true);
        System.out.println("ThreadMain mThreadLocal = "+mThreadLocal.get());
        new Thread("Thread1"){
            @Override
            public void run() {
                mThreadLocal.set(false);
                System.out.println("Thread1 mThreadLocal = "+mThreadLocal.get());
            }
        }.start();

        new Thread("Thread2"){
            @Override
            public void run() {
                System.out.println("Thread2 mThreadLocal = "+mThreadLocal.get());
            }
        }.start();


    }
}

在上面的代码中,在主线程中设置 mThreadLocal 的值为 true,在子线程1中设置 mThreadLocal 的值为 false,然后在子线程2中不设置 mThreadLocal 的值。然后分别在3个子线程中通过 get 方法获取 mThreadLocal 的值,根据前面对 ThreadLocal 的描述,这个时候,主线程应该是 true,子线程1中应该是 false,而子线程 2 中由于没有设置值,所以应该是 null。

运行结果为:

ThreadMain mThreadLocal = true
Thread1 mThreadLocal = false
Thread2 mThreadLocal = null

从结果可以看出, 虽然在不同线程中访问的是同一个 ThreadLocal 对象,但是它们通过 ThreadLocal 获取到的值却不同。下面从源码的角度来分析它的工作原理。

2. 类图

假如让我们来实现一个变量与线程相绑定的功能,我们可以很容易地想到用HashMap来实现,Thread作为key,变量作为value。事实上,JDK中确实使用了类似Map的结构存储变量,但不是像我们想的那样。下面我们来探究OpenJDK 1.8中ThreadLocal的实现。

diagram.png

3. ThreadLocal的成员变量

我们从 ThreadLocal 的几个成员变量入手:

/**
 * ThreadLocals rely on per-thread linear-probe hash maps attached
 * to each thread (Thread.threadLocals and
 * inheritableThreadLocals).  The ThreadLocal objects act as keys,
 * searched via threadLocalHashCode.  This is a custom hash code
 * (useful only within ThreadLocalMaps) that eliminates collisions
 * in the common case where consecutively constructed ThreadLocals
 * are used by the same threads, while remaining well-behaved in
 * less common cases.
 */
private final int threadLocalHashCode = nextHashCode();

/**
 * The next hash code to be given out. Updated atomically. Starts at
 * zero.
 */
private static AtomicInteger nextHashCode =
    new AtomicInteger();

/**
 * The difference between successively generated hash codes - turns
 * implicit sequential thread-local IDs into near-optimally spread
 * multiplicative hash values for power-of-two-sized tables.
 */
private static final int HASH_INCREMENT = 0x61c88647;

/**
 * Returns the next hash code.
 */
private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

ThreadLocal通过threadLocalHashCode 来标识每一个ThreadLocal 的唯一性。threadLocalHashCode 通过 CAS操作 进行更新,每次 hash 操作的增量为 0x61c88647

4. ThreadLocal的重要方法

4.1 ThreadLocal#set方法

/**
 * Sets the current thread's copy of this thread-local variable
 * to the specified value.  Most subclasses will have no need to
 * override this method, relying solely on the {@link #initialValue}
 * method to set the values of thread-locals.
 *
 * @param value the value to be stored in the current thread's copy of
 *        this thread-local.
 */
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

可以看到通过Thread.currentThread()方法获取了当前的线程引用,并传给了getMap(Thread)方法获取一个ThreadLocalMap的实例。我们继续跟进getMap(Thread)方法:

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

可以看到getMap(Thread)方法直接返回Thread实例的成员变量threadLocals。它的定义在Thread内部,访问级别为package级别:

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

到了这里,我们可以看出,每个Thread里面都有一个ThreadLocal.ThreadLocalMap成员变量,也就是说每个线程通过ThreadLocal.ThreadLocalMap与ThreadLocal相绑定,这样可以确保每个线程访问到的thread-local variable都是本线程的。

我们往下继续分析。获取了ThreadLocalMap实例以后,如果它不为空则调用ThreadLocalMap.ThreadLocalMap#set方法设值;若为空则调用ThreadLocal#createMap方法new一个ThreadLocalMap实例并赋给Thread.threadLocals

ThreadLocal#createMap方法的源码如下:

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

4.2 ThreadLocal#get方法

ThreadLocal的get方法就是调用了ThreadLocalMap的getEntry方法:

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

可以看到 ThreadLocal 的 getset 方法调用了 ThreadLocalMap 中的 setgetEntry 方法。下面我们探究一下ThreadLocalMap的实现

5. ThreadLocalMap

5.1 ThreadLocal的成员变量

在上面的类图中,我们可以看到 ThreadLocalMap 有一个常量和三个成员变量:

/**
 * The initial capacity -- MUST be a power of two.
 */
private static final int INITIAL_CAPACITY = 16;

/**
 * The table, resized as necessary.
 * table.length MUST always be a power of two.
 */
private Entry[] table;

/**
 * The number of entries in the table.
 */
private int size = 0;

/**
 * The next size value at which to resize.
 */
private int threshold; // Default to 0

其中INITIAL_CAPACITY代表这个Map的初始容量;1是一个Entry类型的数组,用于存储数据;size代表表中的存储数目;threshold代表需要扩容时对应size的阈值。

Entry类是ThreadLocalMap的静态内部类,用于存储数据。它的源码如下:

/**
 * The entries in this hash map extend WeakReference, using
 * its main ref field as the key (which is always a
 * ThreadLocal object).  Note that null keys (i.e. entry.get()
 * == null) mean that the key is no longer referenced, so the
 * entry can be expunged from table.  Such entries are referred to
 * as "stale entries" in the code that follows.
 */
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

Entry类继承了WeakReference<ThreadLocal<?>>,即每个Entry对象都有一个ThreadLocal的弱引用(作为key),这是为了防止内存泄露。一旦线程结束,key变为一个不可达的对象,这个Entry就可以被GC了。

5.2 ThreadLocalMap的构造函数

ThreadLocalMap类有两个构造函数,其中常用的是ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue)

/**
 * Construct a new map initially containing (firstKey, firstValue).
 * ThreadLocalMaps are constructed lazily, so we only create
 * one when we have at least one entry to put in it.
 */
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}

构造函数的第一个参数就是本ThreadLocal实例(this),第二个参数就是要保存的线程本地变量。构造函数首先创建一个长度为16的Entry数组,然后计算出firstKey对应的哈希值,然后存储到table中,并设置size和threshold。

注意一个细节,计算hash的时候里面采用了hashCode & (size - 1)的算法,这相当于取模运算hashCode % size的一个更高效的实现(和HashMap中的思路相同)。正是因为这种算法,我们要求size必须是2的指数,因为这可以使得hash发生冲突的次数减小。

5.3 ThreadLocalMap#set方法

接下来我们来看ThreadLocalMap#set方法的实现:

/**
 * Set the value associated with key.
 *
 * @param key the thread local object
 * @param value the value to be set
 */
private void set(ThreadLocal<?> key, Object value) {

    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.

    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    //线性探测
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        //找到对应的key
        if (k == key) {
            e.value = value;
            return;
        }
        // 替换失效的key
        if (k == null) {
            //如果entry里对应的key为null的话,表明此entry为staled entry (staled 旧的),就将其替换为             //当前的key和value。详情可见replaceStaleEntry 方法。
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    //若是经历了上面步骤没有命中hash,也没有发现无用的Entry,set方法就会创建一个新的Entry,并会进行启发     //式的垃圾清理,用于清理无用的Entry。如果size大于阈值,还会进行rehash()算法。
    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

如果冲突了,就会通过nextIndex方法再次计算哈希值:

/**
 * Increment i modulo len.
 */
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

到这里,我们看到ThreadLocalMap解决冲突的方法是线性探测法(不断加1),而不是HashMap的链地址法,这一点也能从Entry的结构上推断出来。

5.4 ThreadLocalMap#replaceStaleEntry方法

/**
 * Replace a stale entry encountered during a set operation
 * with an entry for the specified key.  The value passed in
 * the value parameter is stored in the entry, whether or not
 * an entry already exists for the specified key.
 *
 * As a side effect, this method expunges all stale entries in the
 * "run" containing the stale entry.  (A run is a sequence of entries
 * between two null slots.)
 *
 * @param  key the key
 * @param  value the value to be associated with key
 * @param  staleSlot index of the first stale entry encountered while
 *         searching for key.
 */
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // Back up to check for prior stale entry in current run.
    // We clean out whole runs at a time to avoid continual
    // incremental rehashing due to garbage collector freeing
    // up refs in bunches (i.e., whenever the collector runs).
    //向前扫描,查找最前的一个无效slot
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;

    // Find either the key or trailing null slot of run, whichever
    // occurs first
    //向后遍历table
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();

        // If we find key, then we need to swap it
        // with the stale entry to maintain hash table order.
        // The newly stale slot, or any other stale slot
        // encountered above it, can then be sent to expungeStaleEntry
        // to remove or rehash all of the other entries in run.
        //// 找到了key,将其与无效的slot交换
        if (k == key) {
            e.value = value;

            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            // Start expunge at preceding stale entry if it exists
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }

        // If we didn't find stale entry on backward scan, the
        // first stale entry seen while scanning for key is the
        // first still present in the run.
        //// 如果当前的slot已经无效,并且向前扫描过程中没有无效slot,则更新slotToExpunge为当前位置
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }

    // If key not found, put new entry in stale slot
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // If there are any other stale entries in run, expunge them
    // 在探测过程中如果发现任何无效slot,则做一次清理(连续段清理+启发式清理)
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

替换过程里面也进行了不少的垃圾清理动作以防止引用关系存在而导致的内存泄露。主要是cleanSomeSlots

5.5 ThreadLocalMap的清理算法#cleanSomeSlots和#expungeStaleEntry算法

/**
 * 启发式地清理slot,
 * i对应entry是非无效(指向的ThreadLocal没被回收,或者entry本身为空)
 * n是用于控制控制扫描次数的
 * 正常情况下如果log n次扫描没有发现无效slot,函数就结束了
 * 但是如果发现了无效的slot,将n置为table的长度len,做一次连续段的清理
 * 再从下一个空的slot开始继续扫描
 * 
 * 这个函数有两处地方会被调用,一处是插入的时候可能会被调用,另外个是在替换无效slot的时候可能会被调用,
 * 区别是前者传入的n为元素个数,后者为table的容量
 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        // i在任何情况下自己都不会是一个无效slot,所以从下一个开始判断
        i = nextIndex(i, len);
        Entry e = tab[i];
        if (e != null && e.get() == null) {
            // 扩大扫描控制因子
            n = len;
            removed = true;
            //  清理一个连续段
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}

/**
 * 这个函数是ThreadLocal中核心清理函数,它做的事情很简单:
 * 就是从staleSlot开始遍历,将无效(弱引用指向对象被回收)清理,即对应entry中的value置为null,将指向这   entry的table[i]置为null,直到扫到空entry。
 * 另外,在过程中还会对非空的entry作rehash。
 * 可以说这个函数的作用就是从staleSlot开始清理连续段中的slot(断开强引用,rehash slot等)
 */
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 因为entry对应的ThreadLocal已经被回收,value设为null,显式断开强引用
    tab[staleSlot].value = null;
    // 显式设置该entry为null,以便垃圾回收
    tab[staleSlot] = null;
    size--;

    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        // 清理对应ThreadLocal已经被回收的entry
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            /*
             * 对于还没有被回收的情况,需要做一次rehash。
             * 
             * 如果对应的ThreadLocal的ID对len取模出来的索引h不为当前位置i,
             * 则从h向后线性探测到第一个空的slot,把当前的entry给挪过去。
             */
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) {
                tab[i] = null;
                
                /*
                 * 在原代码的这里有句注释值得一提,原注释如下:
                 *
                 * Unlike Knuth 6.4 Algorithm R, we must scan until
                 * null because multiple entries could have been stale.
                 *
                 * 这段话提及了Knuth高德纳的著作TAOCP(《计算机程序设计艺术》)的6.4章节(散列)
                 * 中的R算法。R算法描述了如何从使用线性探测的散列表中删除一个元素。
                 * R算法维护了一个上次删除元素的index,当在非空连续段中扫到某个entry的哈希值取模后的索                  引
                 * 还没有遍历到时,会将该entry挪到index那个位置,并更新当前位置为新的index,
                 * 继续向后扫描直到遇到空的entry。
                 *
                 * ThreadLocalMap因为使用了弱引用,所以其实每个slot的状态有三种也即
                 * 有效(value未回收),无效(value已回收),空(entry==null)。
                 * 正是因为ThreadLocalMap的entry有三种状态,所以不能完全套高德纳原书的R算法。
                 *
                 * 因为expungeStaleEntry函数在扫描过程中还会对无效slot清理将之转为空slot,
                 * 如果直接套用R算法,可能会出现具有相同哈希值的entry之间断开(中间有空entry)。
                 */
                while (tab[h] != null) {
                    h = nextIndex(h, len);
                }
                tab[h] = e;
            }
        }
    }
    // 返回staleSlot之后第一个空的slot索引
    return i;
}

5.6 ThreadLocalMap#rehash()方法

/**
 * Re-pack and/or re-size the table. First scan the entire
 * table removing stale entries. If this doesn't sufficiently
 * shrink the size of the table, double the table size.
 */
private void rehash() {
    // 做一次全量清理
    expungeStaleEntries();

    // Use lower threshold for doubling to avoid hysteresis
    /*
     * 因为做了一次清理,所以size很可能会变小。
     * ThreadLocalMap这里的实现是调低阈值来判断是否需要扩容,
     * threshold默认为len*2/3,所以这里的threshold - threshold / 4相当于len/2
     */
    if (size >= threshold - threshold / 4)
        resize();
}

/**
 * Double the capacity of the table.
 */
/**
 * 扩容,因为需要保证table的容量len为2的幂,所以扩容即扩大2倍
 */
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }

    setThreshold(newLen);
    size = count;
    table = newTab;
}

/**
 * Expunge all stale entries in the table.
 */
 /*
 * 做一次全量清理
 */
private void expungeStaleEntries() {
    Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {
        Entry e = tab[j];
        if (e != null && e.get() == null)
            expungeStaleEntry(j);
    }
}

我们来回顾一下 ThreadLocalset 方法可能会有的情况

5.7 ThreadLocalMap#getEntry方法

/**
 * Get the entry associated with key.  This method
 * itself handles only the fast path: a direct hit of existing
 * key. It otherwise relays to getEntryAfterMiss.  This is
 * designed to maximize performance for direct hits, in part
 * by making this method readily inlinable.
 *
 * @param  key the thread local object
 * @return the entry associated with key, or null if no such
 */
private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

/**
 * Version of getEntry method for use when key is not found in
 * its direct hash slot.
 *
 * @param  key the thread local object
 * @param  i the table index for key's hash code
 * @param  e the entry at table[i]
 * @return the entry associated with key, or null if no such
 */
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        if (k == null)
            expungeStaleEntry(i);
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

从ThreadLocal读一个值可能遇到的情况:
根据入参threadLocal的threadLocalHashCode对表容量取模得到index

5.8 ThreadLocalMap#remove方法

/**
 * 从map中删除ThreadLocal
 */
private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len - 1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            // 显式断开弱引用
            e.clear();
            // 进行段清理
            expungeStaleEntry(i);
            return;
        }
    }
}

remove方法相对于getEntry和set方法比较简单,直接在table中找key,如果找到了,把弱引用断了做一次段清理。

6. ThreadLocal与内存泄漏

关于ThreadLocal是否会引起内存泄漏也是一个比较有争议性的问题,其实就是要看对内存泄漏的准确定义是什么。
认为ThreadLocal会引起内存泄漏的说法是因为如果一个ThreadLocal对象被回收了,我们往里面放的value对于【当前线程->当前线程的threadLocals(ThreadLocal.ThreadLocalMap对象)->Entry数组->某个entry.value】这样一条强引用链是可达的,因此value不会被回收。
认为ThreadLocal不会引起内存泄漏的说法是因为ThreadLocal.ThreadLocalMap源码实现中自带一套自我清理的机制。

之所以有关于内存泄露的讨论是因为在有线程复用如线程池的场景中,一个线程的寿命很长,大对象长期不被回收影响系统运行效率与安全。如果线程不会复用,用完即销毁了也不会有ThreadLocal引发内存泄露的问题。《Effective Java》一书中的第6条对这种内存泄露称为unintentional object retention(无意识的对象保留)。

当我们仔细读过ThreadLocalMap的源码,我们可以推断,如果在使用的ThreadLocal的过程中,显式地进行remove是个很好的编码习惯,这样是不会引起内存泄漏。
那么如果没有显式地进行remove呢?只能说如果对应线程之后调用ThreadLocal的get和set方法都有很高的概率会顺便清理掉无效对象,断开value强引用,从而大对象被收集器回收。

但无论如何,我们应该考虑到何时调用ThreadLocal的remove方法。一个比较熟悉的场景就是对于一个请求一个线程的server如tomcat,在代码中对web api作一个切面,存放一些如用户名等用户信息,在连接点方法结束后,再显式调用remove。

7. 总结

每个Thread里都含有一个ThreadLocalMap的成员变量,这种机制将ThreadLocal和线程巧妙地绑定在了一起,即可以保证无用的ThreadLocal被及时回收,不会造成内存泄露,又可以提升性能。假如我们把ThreadLocalMap做成一个Map<t extends Thread, ?>类型的Map,那么它存储的东西将会非常多(相当于一张全局线程本地变量表),这样的情况下用线性探测法解决哈希冲突的问题效率会非常差。而JDK里的这种利用ThreadLocal作为key,再将ThreadLocalMap与线程相绑定的实现,完美地解决了这个问题。

本文参考了以下两篇文章。特别是文章1,里面对ThreadLocal中的清理算法做了详细的注释。感谢两篇文章作者的分享。

[1]http://www.cnblogs.com/micrari/p/6790229.html

[2]http://www.sczyh30.com/posts/Java/java-concurrent-threadlocal/

上一篇下一篇

猜你喜欢

热点阅读