面试技术应用&框架

ThreadLocal类原理简析——线程封闭的常规实现

2018-03-11  本文已影响852人  邱simple

一、简介

并发编程中,当访问共享数据时,通常需要使用同步技术。但如果数据不发布(逸出)到线程以外,仅仅在单线程中被访问和操作,就无需同步。实现这一目的的技术被称作线程封闭(Thread Confinement)。ThreadLocal类正是实现线程封闭性的一种规范途径。好比现在人手一张公交卡,乘坐公交地铁时从口袋里掏出卡刷下就能上下车。公家卡都是保存在每个人自己身上的,而不是统一存在一个寄存柜里。否则每次坐公交都要先去寄存柜拿卡,用完后还要换回去,效率之慢可想而知。

从应用上讲,之前实习的时候给一个Web项目实现了一个“日志追踪”小功能,即在前台向后台发送的每一个Session都会带上一个UUID,该Session向后台发起的所有操作(读写数据库、转发等)都会携带上这样的一个UUID号,并且会输出到日志文件中。其实现原理用的是slf4j的MDC接口,底层使用的就是泛型为Map<String, String>的ThreadLocal类。

下面代码给出一个使用ThreadLocal类的示例。

package com.qyz.thread;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class ThreadLocalDemo {
    // ThreadLocal一般权限声明为private static
    private static final ThreadLocal<Person> aPerson = new ThreadLocal<Person>() {

        // 默认返回null,在此override初始化方法
        @Override
        protected Person initialValue() {
            return new Person("A Baby", 0);
        }
    }; 
    
    Random rand = new Random();
    
    // 辅助演示的javabean
    static class Person {
        String name;
        
        int age;
        
        Person(String name, int age)
        {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {

            return this.name + "-" + this.age;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
        
    }
    
    // 辅助演示的Runnable实现类
    class MyTask implements Runnable {

        private boolean needSet;
        
        MyTask (boolean needSet) {
            this.needSet = needSet;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Start......");
            // 休眠3s
            try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {}
            
            if (needSet)
                // 向ThreadLocal设置一个值
                aPerson.set(new Person(Thread.currentThread().getName(), rand.nextInt(60)));
                
            System.out.println("Thread Name: " + Thread.currentThread().getName()
                    + "This thread's ThreadLocal: " + aPerson.get().toString());
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ThreadLocalDemo foo = new ThreadLocalDemo();
        Thread t1 = new Thread(foo.new MyTask(true), "Henry");
        Thread t2 = new Thread(foo.new MyTask(true), "Cevin");
        // 该线程不设置ThreadLocal,直接使用初始值
        Thread t3 = new Thread(foo.new MyTask(false), "Jessica");
        t1.start();
        t2.start();
        t3.start();
    }
} /* Output:
Jessica Start......
Henry Start......
Cevin Start......
This thread's ThreadLocal: A Baby-0
This thread's ThreadLocal: Cevin-24
This thread's ThreadLocal: Henry-12
*///:~

根据API文档建议,demo类中声明了一个ThreadLocal<Person>类型的静态成员,虽然只有一个全局对象,但t1,t2,t3线程分别操作了一个单独的Person类实例,在线程内部对该实例的操作都是线程隔离的,不会作用到其他的线程中。就好像新建线程时,给每个线程都封装了一份数据的副本。这也是ThreadLocal对象的目的:用于防止对可变的单实例变量或全局变量进行共享。但请记住,无论多少个线程,ThreadLocal<Person> aPerson对象只有一份。

二、实现原理简析

以上代码展示了ThreadLocal类最基本的使用方法:

  1. 初始化一个ThreadLocal类静态成员,根据需求override其初始化方法initialValue,设置初始值。
  2. 使用set方法设置当前线程的ThreadLocal新值。
  3. 使用get获得当前线程的该ThreadLocal的值。
    除此之外,ThreadLocal类还有remove方法用做清除变量功能,以及一个来自Java1.8的withInitial静态方法。
    ThreadLocal所有的API如下表:
ThreadLocal的API

既然是每个线程都具备一个数据副本,那么使用Thread对象作为key,数据为value,将二者作为键值对存入一个Map集合中,不就可以实现线程封闭的功能了吗?我确实看过有人这么实现过,功能上确实和ThreadLocal如出一辙。但如果深入思考一下,你就会发现此时Map一直持有一个Thread实例的引用,Thread对象的生命周期就受到Map的影响,如果Map不及时清理运行完成的Thread对象,就会造成内存泄露。

当然,硬要使用上述方式实现一个类似ThreadLocal的类也不是不可能,但是既然JDK已经提供了这样一个质优价廉的ThreadLocal类,何不欣然接受,深入学习之呢?

2.1 ThreadLocal的get、set和remove

/**
 * Returns the value in the current thread's copy of this
 * thread-local variable.  If the variable has no value for the
 * current thread, it is first initialized to the value returned
 * by an invocation of the {@link #initialValue} method.
 *
 * @return the current thread's value of this thread-local
 */
public T get() {
    Thread t = Thread.currentThread();
    // 获取当前线程的ThreadLocalMap对象
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        // 传入ThreadLocal对象,获得一个Entry节点
        ThreadLocalMap.Entry e = map.getEntry(this);
        // 非空判断
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 进行初始化并返回初始值
    return setInitialValue();
}

相比于之前所述使用Thread对象作为key值获取线程隔离数据的方式,ThreadLocal反其道而行,以其自身为key查找存储在ThreadLocalMap数据结构中的数据。

    // Thread源码180~182行
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

每个Thread对象都声明了一个ThreadLocalMap对象引用,这个类似于Map数据结构的对象是专门为ThreadLocal类定制的。


/**
 * Variant of set() to establish initialValue. Used instead
 * of set() in case user has overridden the set() method.
 *
 * @return the initial value
 */
private T setInitialValue() {
    // 获取初始值
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 以当前ThreadLocal对象为key,初始值为value存入ThreadLocalMap中
        map.set(this, value);
    else
        // 若当前线程中的ThreadLocalMap为空,则初始化。
        createMap(t, value);
    return value;
}

/**
 * Create the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param t the current thread
 * @param firstValue value for the initial entry of the map
 */
void createMap(Thread t, T firstValue) {
    // 初始化ThreadLocalMap成员
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

ThreadLocal的get方法首先获取当前线程中的ThreadLocalMap成员对象,判空为真时,则调用setInitialValue初始化ThreadLocalMap,这种懒汉初始化的方式提高了内存效率。

以上源码中的注释提及了InheritableThreadLocal,说有些方法在InheritableThreadLocal中被重写了,看看就知道,只是把t.threadLocals改成了t.InheritableThreadLocals。这是个可实现“线程继承”的ThreadLocal类,后面还会介绍。

/**
 * Sets the current thread's copy of this thread-local variable
 * to the specified value.  Most subclasses will have no need to
 * override this method, relying solely on the {@link #initialValue}
 * method to set the values of thread-locals.
 *
 * @param value the value to be stored in the current thread's copy of
 *        this thread-local.
 */
public void set(T value) {
    // 获得当前线程的ThreadLocalMap
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    // 调用ThreadLocalMap的方法进行set。
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

set方法与setInitialValue类似,参考以上代码。

/**
 * Removes the current thread's value for this thread-local
 * variable.  If this thread-local variable is subsequently
 * {@linkplain #get read} by the current thread, its value will be
 * reinitialized by invoking its {@link #initialValue} method,
 * unless its value is {@linkplain #set set} by the current thread
 * in the interim.  This may result in multiple invocations of the
 * {@code initialValue} method in the current thread.
 *
 * @since 1.5
 */
 public void remove() {
     // 删除方法,调用ThreadLocalMap的接口
     ThreadLocalMap m = getMap(Thread.currentThread());
     if (m != null)
         m.remove(this);
 }

remove方法也少的可怜,一个判空后就调用ThreadLocalMap的remove方法了。

ThreadLocal的三个最常用的接口寥寥几行也就写完了,但不难看出其内部实现的真正关键在于ThreadLocalMap类。

三、ThreadLocalMap—线性探测法hash表的最佳实践。

图3.1 ThreadLocal的数据存储结构

3.1 ThreadLocalMap简析

ThreadLocalMap是ThreadLocal的静态内部类,其内部维护着一个Entry数组做散列表来存储数据(后面会根据源码详细讲解)。此Entry非Map.Entry,而是一个WeakReference的子类。WeakReference是Java中弱引用的实现类,一个对象如果仅仅被弱引用指向,则在最近一次gc时就会被回收掉。(有关“强,软,弱,虚”引用的更多知识可以查阅《深入理解Java虚拟机》第65页)

以目前我对ThreadLocal的理解来看,之所以使用弱引用,主要是为了以下两点:

  1. 无强引用指向ThreadLocal实例时,则该实例自觉被垃圾回收器回收。此时也说明任何线程都不会再使用到该ThreadLocal所存储的数据,配合第二点实现ThreadLocalMap自身的垃圾回收机制。
  2. ThreadLocalMap的expungeStaleEntry方法会清除key为null的Entry数据,而导致Key为null的原因要不就是ThreadLocalMap的remove方法调用了WeakReference的clear方法,断开了弱引用(可以想象成断开上图中的虚线),要不就是因为第一点所述的无强引用指向并发生GC。

请注意:根据API文档中对ThreadLocal类建议的用法,一般将其声明称类成员。因此,只要包含ThreadLocal的外部类被被JVM卸载,该ThreadLocal对象将一直被一个强引用指向,因此也不会被GC。ThreadLocalMap的清理机制主要是清理线程隔离的数据,并“腾出”散列表的空位。

由此可见,使用WeakReference是为了配合实现ThreadLocalMap自己的垃圾清理机制,防止内存泄露。

3.2 源码走读

3.2.1 散列表节点——Entry

/**
 * The entries in this hash map extend WeakReference, using
 * its main ref field as the key (which is always a
 * ThreadLocal object).  Note that null keys (i.e. entry.get()
 * == null) mean that the key is no longer referenced, so the
 * entry can be expunged from table.  Such entries are referred to
 * as "stale entries" in the code that follows.
 * 
 * 散列表中的节点类。继承自WeakReference,并扩展了一个Object对象用来
 * 保存线程隔离的数据。若一个entry的指向ThreadLocal实例的引用为空,
 * 说明变成了stale entry,该entry需要被清理。
 */
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

Entry类是内部散列表的节点,继承自WeakReference类,并扩展了一个Object成员保存线程隔离的数据,而弱引用则指向ThreadLocal对象。

3.2.2 线性探测hash表

/**
 * The initial capacity -- MUST be a power of two.
 * 
 * 初始化容量
 */
private static final int INITIAL_CAPACITY = 16;

/**
 * The table, resized as necessary.
 * table.length MUST always be a power of two.
 * 
 * 散列表,表的长度必须为2的幂
 */
private Entry[] table;

/**
 * The number of entries in the table.
 * 
 * 散列表中的Entry个数
 */
private int size = 0;

/**
 * The next size value at which to resize.
 */
private int threshold; // Default to 0

/**
 * Set the resize threshold to maintain at worst a 2/3 load factor.
 * 
 * 扩容阈值为散列表长的三分之二
 */
private void setThreshold(int len) {
    threshold = len * 2 / 3;
}

/**
 * Increment i modulo len.
 * 环形后移
 */
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

/**
 * Decrement i modulo len.
 * 
 * 环形前移
 */
private static int prevIndex(int i, int len) {
    return ((i - 1 >= 0) ? i - 1 : len - 1);
}

源码中提及散列表的长度必须是2的幂。这样设置是为了在散列ThreadLocal对象时能够尽量均匀。这里需要回顾下ThreadLocal的一段有关独有hashcode计算的源码。

public class ThreadLocal<T> {
    /**
     * ThreadLocals rely on per-thread linear-probe hash maps attached
     * to each thread (Thread.threadLocals and
     * inheritableThreadLocals).  The ThreadLocal objects act as keys,
     * searched via threadLocalHashCode.  This is a custom hash code
     * (useful only within ThreadLocalMaps) that eliminates collisions
     * in the common case where consecutively constructed ThreadLocals
     * are used by the same threads, while remaining well-behaved in
     * less common cases.
     * 
     * 以ThreadLocal为key,将线程隔离的数据存储在每个线程私有的线性探测散列表中。
     * 这个threadLocalHashCode仅仅用于Thread.ThreadLocalMaps中
     */
    private final int threadLocalHashCode = nextHashCode();

    /**
     * The next hash code to be given out. Updated atomically. Starts at
     * zero.
     * 
     * 从0开始,保存下一个ThreadLocal对象的hashcode
     */
    private static AtomicInteger nextHashCode =
        new AtomicInteger();

    /**
     * The difference between successively generated hash codes - turns
     * implicit sequential thread-local IDs into near-optimally spread
     * multiplicative hash values for power-of-two-sized tables.
     * 
     * ThreadLocal对象hashcode计算的增量,使用该值能够在表长为2的幂的散列表中
     * 均匀分布节点。
     */
    private static final int HASH_INCREMENT = 0x61c88647;

    /**
     * Returns the next hash code.
     * 
     * 计算下一个对象的hashcode
     */
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

//===============ThreadLocalMap构造方法============================
/**
 * Construct a new map initially containing (firstKey, firstValue).
 * ThreadLocalMaps are constructed lazily, so we only create
 * one when we have at least one entry to put in it.
 * 
 * ThreadLocalMap的构造是懒汉式,直到有一个ThreadLocal被使用,才会初始化。
 */
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    // 初始化各个成员
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}

以上代码说明了ThreadLocal对象在ThreadLocalMap中的散列方式。新建的ThreadLocal对象都以前一个新建ThreadLocal对象的threadLocalHashCode + 0x61c88647作为为自己的hashcode,初始值为0。实践证明,用0x61c88647的累加值与2的整数幂取模,可以得到一个范围内近似均匀分布的序列。由于使用了线性探测法,散列表在逻辑上是一个环形结构(从nextIndex和prevIndex方法也可以看出)。均匀分布的好处在于当发生hash冲突是,很快就能探测到下一个临近的可用位置。而对2的n次幂取模,则可以用& (2^n - 1)代替% (2^n)。因此,才有了散列表的容量必须是2的整数幂的限制。

有关下文中用到的一些名词,在这里做些约定。


图3.2 环形散列表结构图
  1. slot:散列表中的一个位置(节点)。
  2. full entry和full slot:表示full slot是散列表中一个索引位置。该位置上的Entry称为full entry,其所含的弱引用指向不为null。
  3. stale entry和stale slot:stale entry是散列表中stale slot位置的Entry对象,该Entry所含弱引用指向为null。下文中的清理stale entry或者清理stale slot都是意为断开该Entry对象的所有引用,辅助GC,并腾空stale slot位置。
  4. null slot:散列表的null slot位置为null,可用于设置新的Entry。
  5. run:散列表中任意两个null slot之间的一段,不包括两端。

3.2.3 ThreadLocalMap的get和set

/**
 * Get the entry associated with key.  This method
 * itself handles only the fast path: a direct hit of existing
 * key. It otherwise relays to getEntryAfterMiss.  This is
 * designed to maximize performance for direct hits, in part
 * by making this method readily inlinable.
 *  
 * 先用hashcode对表长取模计算索引,此为直接命中。
 * 如果正好命中了,则返回,否则调用getEntryAfterMiss.
 *
 * @param  key the thread local object
 * @return the entry associated with key, or null if no such
 */
private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

/**
 * Version of getEntry method for use when key is not found in
 * its direct hash slot.
 * 
 * 直接命中失败后调用此方法
 * 
 * @param  key the thread local object
 * @param  i the table index for key's hash code
 * @param  e the entry at table[i]
 * @return the entry associated with key, or null if no such
 */
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;
    // 使用线性探测法尝试命中
    while (e != null) {
        ThreadLocal<?> k = e.get();
        // 命中,则返回
        if (k == key)
            return e;
        // 找到个stale entry,进行一次连续段清理
        if (k == null)
            expungeStaleEntry(i);
        else
            // 向后移动
            i = nextIndex(i, len);
        e = tab[i];
    }
    // 走到这一步,说明不存在该ThreadLocal。
    return null;
}

以上代码使用线性探测法尝试在散列表中命中Entry。在getEntryAfterMiss方法中,当判断当前位置上的Entry的弱引用指向为空时,则调用expungeStaleEntry方法进行垃圾清理。expungeStaleEntry是ThreadLocalMap的核心算法。

    private void set(ThreadLocal<?> key, Object value) {
        // We don't use a fast path as with get() because it is at
        // least as common to use set() to create new entries as
        // it is to replace existing ones, in which case, a fast
        // path would fail more often than not.
        // 以上注释大致含义:
        /*
         * 以上注释含义:set方法不像get方法这样使用直接命中方式设置Entry,
         * 因为首次尝试命中在线性探测散列表中经常失败。
         * set尝试使用一种“替换已经存在”策略来设置Entry,而不是直接new一个。
         */
        Entry[] tab = table;
        int len = tab.length;
        // 计算散列表索引
        int i = key.threadLocalHashCode & (len-1);
        // 线性探测查找,不为null slot 则一直向下找。
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();
            if (k == key) {
                // 命中,将当前Entry的value进行设置
                e.value = value;
                return;
            }
            if (k == null) {
                // 如果当前位置Entry的弱引用指向为空,则替换该Entry
                replaceStaleEntry(key, value, i);
                return;
            }
        }
        // 找到个null slot,则以输入参数创建一个新的Entry,并放置进散列表中
        tab[i] = new Entry(key, value);
        int sz = ++size;
        // 尝试一次启发式清理,如果没有清理任何Entry并且个数超出阈值,
        // 则rehash.
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }
    /**
     * Replace a stale entry encountered during a set operation
     * with an entry for the specified key.  The value passed in
     * the value parameter is stored in the entry, whether or not
     * an entry already exists for the specified key.
     *
     * As a side effect, this method expunges all stale entries in the
     * "run" containing the stale entry.  (A run is a sequence of entries
     * between two null slots.)
     *
     * @param  key the key
     * @param  value the value to be associated with key
     * @param  staleSlot index of the first stale entry encountered while
     *         searching for key.
     */
    // 替换策略,进入这个方法,staleSlot位置上肯定会变成full slot
    private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                   int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        Entry e;

        // Back up to check for prior stale entry in current run.
        // We clean out whole runs at a time to avoid continual
        // incremental rehashing due to garbage collector freeing
        // up refs in bunches (i.e., whenever the collector runs).
                    
        // 往前遍历,找到到该slot所在run的首个stale slot位置,设置为清理起点
        int slotToExpunge = staleSlot; // 清理起点
        for (int i = prevIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = prevIndex(i, len))
            if (e.get() == null)
                slotToExpunge = i;

        // Find either the key or trailing null slot of run, whichever
        // occurs first
        // 向后遍历
        for (int i = nextIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();

            // If we find key, then we need to swap it
            // with the stale entry to maintain hash table order.
            // The newly stale slot, or any other stale slot
            // encountered above it, can then be sent to expungeStaleEntry
            // to remove or rehash all of the other entries in run.
            /*
             * 进入到了replaceStaleEntry方法,说明命中了一个stale slot,
             * 则命中的Entry弱引用为空,首次命中的索引为入参staleSlot。
             * 因此,如果在当前段run中命中了key,则将对应的Entry替换到
             * 散列表的staleSlot位置。其实就是将stale Entry往run的后边
             * 移动,方便清理。之后进行一次连续段清理和一次启发式清理。
             */
            if (k == key) {
                e.value = value;

                tab[i] = tab[staleSlot];
                tab[staleSlot] = e;

                // Start expunge at preceding stale entry if it exists
                if (slotToExpunge == staleSlot)
                    slotToExpunge = i;
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                return;
            }

            // If we didn't find stale entry on backward scan, the
            // first stale entry seen while scanning for key is the
            // first still present in the run.
            /*
             * stale entry表示弱引用为空的Entry,如果该位置为stale Entry,
             * 且之前的前向遍历没有找到一个stale Entry,则将当前位置设置为
             * 清理起始位置
             */
            if (k == null && slotToExpunge == staleSlot)
                slotToExpunge = i;
        }

        // If key not found, put new entry in stale slot
                    // 没用找到可以实施替换策略的slot,只能新建一个。
        tab[staleSlot].value = null;
        tab[staleSlot] = new Entry(key, value);

        // If there are any other stale entries in run, expunge them
        /*
         * 执行到此,如果slotToExpunge==staleSlot,说明没有stale entry在run中,
         * 否则,从slotToExpunge执行一次连续段清理和启发式清理。
         */
        if (slotToExpunge != staleSlot)
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }

replaceStaleEntry实现了一种策略:

  1. 将full entry向前移动,将stale slot向后移动,最小化连续段清理方法expungeStaleEntry的清理范围
  2. 以入参staleSlot位置为中心,先向前,再向后遍历找到expungeStaleEntry方法的执行起点位置slotToExpunge。从该位置(包括该位置)往后到遇到一个null slot之前这段,一定有一堆stale slot等待清理。

3.2.4 清理方法

3.2.4.1 连续段清理——expungeStaleEntry

/**
 * Expunge a stale entry by rehashing any possibly colliding entries
 * lying between staleSlot and the next null slot.  This also expunges
 * any other stale entries encountered before the trailing null.  See
 * Knuth, Section 6.4
 *
 * @param staleSlot index of slot known to have null key
 * @return the index of the next null slot after staleSlot
 * (all between staleSlot and this slot will have been checked
 * for expunging).
 * 
 * 在staleSlot和下一个null slot之间的slot都会被检查一遍是否需要清理,
 * 即连续段清理。该方法在清理的同时还会重新“整理”散列表中的Entry,即rehash。
 */
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // expunge entry at staleSlot
    // 手动将value置null,辅助GC
    tab[staleSlot].value = null;
    // 将散列表的staleSlot位置置空,辅助GC
    tab[staleSlot] = null;
    size--;

    // Rehash until we encounter null
    Entry e;
    int i;
    // 从staleSlot向后遍历,直到遇到个null slot
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        // 找到一个stale slot,置null,辅助GC
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            // 根据hashcode计算k在不冲突情况下散列表的索引
            int h = k.threadLocalHashCode & (len - 1);
            /*
             * h!=i说明h位置上发生了hash冲突,导致k的索引值从h向后移动到了i。
             * 此时,查找h位置往后第一个null slot,并将i位置的Entry移动到该位置。
             * 这是一个rehash操作。
             */
            if (h != i) {
                tab[i] = null;

                // Unlike Knuth 6.4 Algorithm R, we must scan until
                // null because multiple entries could have been stale.
                // 查找h位置往后第一个null slot
                while (tab[h] != null)
                    h = nextIndex(h, len);
                // 移动
                tab[h] = e;
            }
        }
    }
    // 返回遇到的第一个null slot索引
    return i;
}

expungeStaleEntry方法边清理,边进行散列表的整理,尽量让full entries连在一起。源码注释中提及了Donald E. Knuth所著的《计算机程序设计艺术》6.4节散列的R算法,该算法是关于如何在一个线性探测hash表中删除一个元素的算法。我天赋太差,一时领悟不了,现截图如下,想广大网友请教。另外还可参考 ThreadLocal源码解读一文。

图3.3 计算机程序设计艺术6.4散列-R算法

3.2.4.2 全表清理expungeStaleEntries

/**
 * Expunge all stale entries in the table.
 * 
 * 全表清理方法
 */
private void expungeStaleEntries() {
    Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {
        Entry e = tab[j];
        // 全表遍历,找到一个stale slot,则做一次连续段清理
        if (e != null && e.get() == null)
            expungeStaleEntry(j);
    }
}

3.2.4.3 启发式清理——cleanSomeSlots

/**
 * Heuristically scan some cells looking for stale entries.
 * This is invoked when either a new element is added, or
 * another stale one has been expunged. It performs a
 * logarithmic number of scans, as a balance between no
 * scanning (fast but retains garbage) and a number of scans
 * proportional to number of elements, that would find all
 * garbage but would cause some insertions to take O(n) time.
 * 
 * 我的理解:
 * 启发式清理stale entris,当插入一个新entry或者调用
 * expungeStaleEntry方法清理entries时会调用此方法。
 * 本方法使用了对数扫描,以权衡"不扫描,留存垃圾"和
 * "全量扫描,工作量线性增长"这两种方式。后一种方式虽然可以找到
 * 所有的垃圾,但是会当时插入动作花费O(n)的时间复杂度。
 * 
 * @param i a position known NOT to hold a stale entry. The
 * scan starts at the element after i.
 * 
 * 散列表在入参i位置上肯定不是stale slot,所以从i后面扫描
 * 
 * @param n scan control: {@code log2(n)} cells are scanned,
 * unless a stale entry is found, in which case
 * {@code log2(table.length)-1} additional cells are scanned.
 * When called from insertions, this parameter is the number
 * of elements, but when from replaceStaleEntry, it is the
 * table length. (Note: all this could be changed to be either
 * more or less aggressive by weighting n instead of just
 * using straight log n. But this version is simple, fast, and
 * seems to work well.)
 *
 * 入参n用来计算本次扫描slot的最小数量——log2(n),插入(set)动作传入size,
 * replaceStaleEntry中的调用传入散列表长度。
 * 
 * 每找到一个stale entries清理后,还会进行额外的log2(表长度)-1数量的slot扫描。
 * 
 * Note:扫描次数根据权重n可以增加或者减少,但该版本的比较简单,快速,使用起来还不错。
 *
 * @return true if any stale entries have been removed.
 * 有slot被清理了就返回true
 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        // 向后遍历
        i = nextIndex(i, len);
        Entry e = tab[i];
        // 找到一个stale slot,执行一次连续段清理
        if (e != null && e.get() == null) {
            /*
             * 重新复制n,影响循环条件,
             * 进行额外的log2(表长度)-1数量的slot扫描
             */
            n = len;
            removed = true;
            // 执行一次连续段清理
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}

3.2.4.4. ThreadLocalMap的remove

/**
 * Remove the entry for key.
 * 
 * 删除,该方法被ThreadLocal的remove接口调用
 */
private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        // 找到入参ThreadLocal对应的Entry,将其变成stale slot
        if (e.get() == key) {
            // WeakReference实例调用clear会断开指向对象的引用
            // 但其他指向该对象的引用仍然有效。
            e.clear();
            // 此时该Entry变成了stale entry,进行一次连续段清理
            expungeStaleEntry(i);
            return;
        }
    }
}

有关WeakReference的clear方法,给出一段测试代码。

    /*
     * 测试WeakReference
     */
    Person p = new Person("Qiu", 24);
    WeakReference<Person> test = new WeakReference<Person>(p);
    System.out.println("test.get() == p: " + (test.get() == p));
    test.clear();
        // 增加删除下行输出结果不变
    // System.gc();
    TimeUnit.SECONDS.sleep(2);
    System.out.println("test.get(): " + test.get());
    System.out.println("p: " + p);
    /* output:
     * test.get() == p: true
     * test.get(): null
     * p: Qiu-24
     */

输出结果说明,该WeakReference对象调用clear方法只是断开了指向Person对象的引用(可以想象成图3.1中虚线断了。),因此get方法返回null,但是该Person一直被p所引用,因此无论是否显式调用GC,都不会导致Person对象被回收。

3.2.5 扩容——rehash与resize

/**
 * Re-pack and/or re-size the table. First scan the entire
 * table removing stale entries. If this doesn't sufficiently
 * shrink the size of the table, double the table size.
 * 
 * 散列表再散列。首先调用一次全表清理方法,此方法边清理,边重新散列各个Entry。
 * 
 * set操作会调用该方法。
 */
private void rehash() {
    expungeStaleEntries();

    // Use lower threshold for doubling to avoid hysteresis
    /*
     * 全表清理之后,如果size没有减小很多(判断阈值为(2/3-2/3/4)len=0.5len),
     * 则进行一次扩容。这样的操作是为了避免延迟。
     */
    if (size >= threshold - threshold / 4)
        resize();
}

/**
 * Double the capacity of the table.
 * 
 * 扩容方法,直接double当前容量,并再散列
 */
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        // 遍历原表中所有的非null entry
        if (e != null) {
            ThreadLocal<?> k = e.get();
            // 找到一个stale entry,则手动清理value
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                // 走到这里,说明是一个full entry,
                // 使用线性探测法方式进新表中
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }
    // 更新变量。
    setThreshold(newLen);
    size = count;
    table = newTab;
}

四、ThreadLocal的内存泄露

我对Java中内存泄露的理解:你永远不会再使用这个对象了,但这个对象却无法被GC。

ThreadLocal有了内存泄露的可能吗?或者具体的说,线程隔离的数据(对象)value会有内存泄露的可能吗?


ps:不敢说没有,这样太嚣张了。

value由一条强引用链抵达:threadLocals(ThreadLocal.ThreadLocalMap对象)->Entry数组的一个slot->entry.value。这要这条链连续,那么value就不会被回收。
在实际开发中,一般使用线程池执行并发任务,不会显示创建销毁Thread对象。而线程池中会维护一定数量的可复用Thread对象(线程池相关参考Java线程池基础知识)。因此,只要这些对象不消亡,上述引用链中的第一节——threadLocals就不会被回收。

至于引用链中的第二节——Entry数组中的一个slot。这个slot中存放着一个Entry,只有将这个slot变成null slot,该位置的Entry才会被回收。ThreadLocalMap的核心清理方法expungeStaleEntry会在getEntry,set,remove方法中机会性调用,而每次调用只能说有很大概率清理掉stale entries。而只有当slot变成stale slot时,才能够被expungeStaleEntry方法清理成null slot。因此,如3.1节开头所述,需要显式地调用ThreadLocal.remove方法,才能断开弱引用,这时slot变成stale slot,并且随后就会进行一次以该slot为起点的expungeStaleEntry清理。

在看看ThreadLocalMap类定义处的一段注释。

  /**
     * ThreadLocalMap is a customized hash map suitable only for
     * maintaining thread local values. No operations are exported
     * outside of the ThreadLocal class. The class is package private to
     * allow declaration of fields in class Thread.  To help deal with
     * very large and long-lived usages, the hash table entries use
     * WeakReferences for keys. However, since reference queues are not
     * used, stale entries are guaranteed to be removed only when
     * the table starts running out of space.
     * 
     * ThreadLocalMap类是一个为ThreadLocal专门定制的实现。使用开放寻址法的散列表存储
     * <ThreadLocal key, Object value>键值对。并自带基于弱引用的高效地垃圾清理机制。
     * stale entries只有在散列表空间快用完时,才会保证一定被清理。
     */
    static class ThreadLocalMap {

stale entries只有在散列表空间快用完时,才会保证一定被清理。散列表空间快用完了会发生什么?发生rehash。ThreadLocalMap的rehash方法中进来就会执行一次全表清理,此时是遍历全表找stale slot,因此才有了“保证”这一动词。注意不是调用resize,因为调用栈从顶向下是resize -> rehash -> set。

综上,为了防止ThreadLocal内存泄露,最好是在使用完毕后,调用remove进行手动清理。否则就靠最后一次set操作会不会触发rehash。

五、可继承的ThreadLocal——InheritableThreadLocal

在ThreadLocal的源码中,可以看到有些注释说:此方法在InheritableThreadLocal中被重写。由此知道ThreadLocal有一个子类InheritableThreadLocal,故名思意,这个子类具备“继承”能力。具体的说,InheritableThreadLocal所关联的数据,能够从双亲线程中继承到子线程中。而该“继承”机制的实现原理可以从Thread类的init方法中一窥究竟。

//==================Thread的init方法代码片段=============================================
// 双亲线程是否有InheritableThreadLocal
if (parent.inheritableThreadLocals != null)
    // 有的话给子线程拷贝一份。
    this.inheritableThreadLocals =
        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

inheritableThreadLocals是Thread的一个类成员,也是ThreadLocalMap类型的。

创建一个线程A的线程B,就是线程A的双亲线程。A线程在创建过程中会调用init方法,中间会判断B线程中的inheritableThreadLocals是否有内容,如果有,则为线程A的inheritableThreadLocals拷贝一份,以实现"继承"。

但是,在实际开发中,一般不会显式创建Thread,而是使用线程池。因此InheritableThreadLocal就没有用武之地了。但在很多场景下,即使在使用线程池的情况下,我们仍然需要一种“ThreadLocal线程传递的功能”以实现任务追踪。这个时候该如何是好呢?

由于公司保密策略,这里就不提了,有兴趣的朋友可以参考transmittable-thread-local项目,实现原理大同小异吧。

另外,InheritableThreadLocal曾经在Logback1.1.5版本之前(不包括)的LogMDCAdapter类中有过实现(1.1.4中的LogMDCAdapter第56行),因此之前版本的MDC(Mapping Diagnosis Context)有“线程继承”的功能,但是文档中也特别说明了,对于线程池中的线程,需要差别对待。然而自1.1.5及之后的版本中(1.1.5中的LogMDCAdapter,51行和55行),Logback取消了MDC的“线程继承”功能,原因是为了解决这个bug——LOGBACK-624 or LOGBACK-422。大致意思就是:“你们这个“线程继承”功能既然在线程池中不稳定,那不如不要,我这给你提供个好方法,也能实现"线程继承"的需求。”于是1.1.5版本开始,Logback确实取消了该功能,并且到现在也没有考虑重新实现。详情还可以参考MDC Adapter with configurable InheritableThreadLocal以及Logback的commit aa7d584ec。之前,踩过坑,望广大同行能够顺利跨过。

六、总结

ThreadLocal的源码包括注释也才722行,但其中却实现了一套定制的线性探测hash表以及高效的垃圾清理机制。看ThreadLocal关键看ThreadLocalMap,有如下特点:

  1. 以ThreadLocal为key,与之关联的线程隔离数据为value,构成Entry放置在散列表中。
  2. Entry类是ThreadLocalMap定制的,继承自WeakReference,以配合ThreadLocalMap自身的垃圾清理方法expungeStaleEntry实现无效Entry清理。
  3. replaceStaleEntry和expungeStaleEntry都有rehash功能,尽量让有效的entries连续分布,无效的entries连续分布。
  4. get、set和remove方法都能够机会性(remove一定)调用expungeStaleEntry进行清理。该方法边清理,边整理,但是并不保证能够清理完所有的无效Entries,只有扩容的时候残能够全表清理。
  5. 为防止内存泄露,并发任务结束后请手动调用ThreadLocal.remove。

此外,文章末尾也提及了有关InheritableThreadLocal的原理及应用,虽然这个类有些鸡肋,但Inheritable的功能聊胜于无。

ps: 将源码连同注释一起贴在文章中是因为自己英语不是很好怕对类的工作原理的理解有些误差每段源码注释中必要的地方我写上了自己的理解如果有误,希望同行指出(其实我就是为了凑字数呵呵)。

七、参考文献

  1. 《Java并发编程实战》
  2. ThreadLocal源码解读
  3. 对ThreadLocal实现原理的一点思考
  4. 《深入理解Java虚拟机(第2版) : JVM高级特性与最佳实践》
上一篇下一篇

猜你喜欢

热点阅读