exam

彻底理解ThreadLocal

2018-03-13  本文已影响0人  炮灰向前冲啦

ThreadLocal的作用是提供一个线程的局部变量,比如context、session。是直接把某个对象在各自线程中实例化一份,每个线程都有属于自己的该对象。ThreadLocal实例通常来说都是private static类型

ThreadLocal首先不是解决线程安全问题。ThreadLocal内部保存的局部变量在多线程之间不能共享,也就是必须new出来。如果ThreadLocal内部保存的变量本身就是一个多线程共享的对象,那么还是会有线程安全的问题,此时用同步锁Synchronized来处理

Synchronized处理线程间的数据共享,ThreadLocal处理线程间的数据独占

使用姿势

public class DateUtils {
    private static ThreadLocal<SimpleDateFormat> local = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        }
    };

    public static String format(Date date) {
        return local.get().format(date);
    }
}

SimpleDateFormat是线程不安全的,每次使用时必须new。用ThreadLocal来实现线程缓存,每个线程都有一个独享对象,避免了频繁创建对象,也避免了多线程的竞争

FastDateFormat是apache commons包的实现

private final ConcurrentMap<MultipartKey, F> cInstanceCache
        = new ConcurrentHashMap<>(7);
        
public F getInstance(final String pattern, TimeZone timeZone, Locale locale) {
    Validate.notNull(pattern, "pattern must not be null");
    if (timeZone == null) {
        timeZone = TimeZone.getDefault();
    }
    if (locale == null) {
        locale = Locale.getDefault();
    }
    final MultipartKey key = new MultipartKey(pattern, timeZone, locale);
    F format = cInstanceCache.get(key);
    if (format == null) {
        format = createInstance(pattern, timeZone, locale);
        final F previousValue= cInstanceCache.putIfAbsent(key, format);
        if (previousValue != null) {
            // another thread snuck in and did the same work
            // we should return the instance that is in ConcurrentMap
            format= previousValue;
        }
    }
    return format;
}

FastDateFormat通过ConcurrentHashMap,避免所有相关操作线程都存放SimpleDateFormat对象,内存浪费。Map的key是MultipartKey对象(必须实现equals、hashcode方法),value是SimpleDateFormat

序列化框架Kryo对象也是线程不安全的。通常使用ThreadLocal或者KryoPool实现操作安全

get、set、remove实现原理

先看ThreadLocal.get()方法实现

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

// 初始化Entry[] table数组,长度16
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    
    // 设置resize值: 16*2/3
    setThreshold(INITIAL_CAPACITY);
}

每个Thread线程都有一个ThreadLocalMap数据结构,该Map的key是用户new的ThreadLocal对象,value是需要线程存储的object值

在ThreadLocalMap中,初始化一个大小16的Entry数组,Entry对象用来保存每一个key-value键值对,只不过这里的key永远都是ThreadLocal对象。通过ThreadLocal对象的set方法,结果把ThreadLocal对象自己当做key,放进了ThreadLocalMap中

ThreadLocal-Structure

为什么key不是Thread ID,或者所有Thread共享一个全局的ConcurrentHashMap?

既然每个Thread都有自己的局部变量存储空间,那Map肯定是属于Thread类的成员变量,全局Map反而会引入并发控制。至于Map的key,若是Thread id,那一个Thread就只能存储一个value值。试想,从Controller->Service->Dao,每个类都有ThreadLocal成员变量的话,同一个线程经历不同阶段,当然是以类的ThreadLocal对象为key,才能同一个线程,经历不同类时,获取到当前类设置的线程存储值

分析ThreadLocalMap.Entry e = map.getEntry(this);

// Entry类继承了WeakReference<ThreadLocal<?>>,即每个Entry对象都有一个ThreadLocal的弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

key.threadLocalHashCode & (table.length - 1),hash取模的实现。hashcode & (size-1) 比 hashcode % size实现更高效。

private final int threadLocalHashCode = nextHashCode();

private static AtomicInteger nextHashCode = new AtomicInteger();

private static final int HASH_INCREMENT = 0x61c88647;

private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

每个ThreadLocal对象都有一个hash值threadLocalHashCode,每初始化一个ThreadLocal对象,hash值就增加一个固定大小0x61c88647。nextHashCode是静态的AtomicInteger,所有ThreadLocal对象共享getAndAdd(HASH_INCREMENT)

魔数0x61c88647,斐波那契散列

public class ThreadLocalDemo {

    private static final int size = 16;
    private static final int threshold = size - 1;

    private static final AtomicInteger atomic = new AtomicInteger();
    private static final int HASH_INCREMENT = 0x61c88647;

    public static void main(String[] args) {
        List<Integer> hashCode = Lists.newArrayList();
        List<Integer> fiboHash = Lists.newArrayList();
        List<Integer> murmurHash = Lists.newArrayList();
        List<Integer> consistentHash = Lists.newArrayList();

        for (int i = 0; i < size; i++) {
            Object a = new Object();
            hashCode.add(a.hashCode() & threshold);
            fiboHash.add(atomic.getAndAdd(HASH_INCREMENT) & threshold);
            murmurHash.add(Hashing.murmur3_32().hashInt(i).asInt() & threshold);
            consistentHash.add(Hashing.consistentHash(i, size) & threshold);
        }

        System.out.println(StringUtils.join(hashCode, ", "));
        System.out.println(StringUtils.join(fiboHash, ", "));
        System.out.println(StringUtils.join(murmurHash, ", "));
        System.out.println(StringUtils.join(consistentHash, ", "));
    }
}

结果为

11, 5, 6, 3, 13, 4, 12, 10, 5, 7, 0, 2, 13, 4, 6, 7
0, 7, 14, 5, 12, 3, 10, 1, 8, 15, 6, 13, 4, 11, 2, 9
14, 10, 15, 1, 7, 14, 14, 1, 1, 3, 13, 10, 2, 1, 11, 6
0, 6, 15, 8, 12, 10, 9, 13, 4, 14, 14, 12, 10, 0, 13, 14

可以看出斐波散列分布很均匀,没有冲突。其他hashcode、murmurHash、consistentHash都会有或多或少的冲突。不过斐波散列需要AtomicInteger共享变量

那是否可以直接用AtomicInteger递增取模,而不用递增0x61c88647以及Hash取模?

主要原因:当插入新的Entry且出现Entry冲突,而进行线性探测时,后续的Entry坑也极大可能被占了(因为之前是连续存储),使得线性探测性能差。而斐波散列的nextIndex()很大可能是有坑且可以插入的。Netty的FastThreadLocal是AtomicInteger递增的

进入getEntryAfterMiss(key, i, e)方法

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    // 当hash取模的散列值所对应的entry有值但不是当前的ThreadLocal对象时,将对象顺延存放到下一个index,不断检测,直到entry有坑可以存放
    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        if (k == null)
            // 当Entry的key,也就是ThreadLocal对象为空,清理value值
            // value = null; entry = null;
            expungeStaleEntry(i);
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

// 开放定址法-线性探测
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

Hash散列冲突时,可以分离链表法开放定址法

分离链表法:使用链表解决冲突,将散列值相同的元素都保存到一个链表中。当查询的时候,首先找到元素所在的链表,然后遍历链表查找对应的元素

modulo

开放定址法:当散列到的数组slot被占用时,就会尝试在数组中寻找其他空slot。探测数组空slot的方式有很多,这里介绍一种最简单的 –- 线性探测法。线性探测法就是从冲突的数组slot开始,依次往后探测空slot,如果到数组尾部,再从头开始探测(环形查找)

nextIndex

分析ThreadLocal.set()方法实现

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

private void set(ThreadLocal<?> key, Object value) {
    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.
    Entry[] tab = table;
    int len = tab.length;
    // 根据ThreadLocal的散列值,查找对应元素在数组中的位置
    int i = key.threadLocalHashCode & (len-1);

    // 使用线性探测法查找元素
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        
        // ThreadLocal存在时,直接覆盖之前的值
        if (k == key) {
            e.value = value;
            return;
        }
        
        // key为null,但是值不为null,说明之前的ThreadLocal对象已经被回收了,当前数组中的Entry是一个陈旧(stale)的元素
        if (k == null) {
            // 用新元素代替陈旧的元素,并cleanSomeSlots()
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    
    // 如果没有可清理的陈旧Entry并且数组中的元素大于阈值,则进行rehash
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

分析ThreadLocal.remove()方法实现

 public void remove() {
     ThreadLocalMap m = getMap(Thread.currentThread());
     if (m != null)
         m.remove(this);
 }
 
 private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            // 引用设置为null
            e.clear();
            // 清理陈旧的 Entry
            expungeStaleEntry(i);
            return;
        }
    }
}

内存泄漏问题

Entry本身是一个只接受ThreadLocal对象的弱引用类,也就是说Entry存储的key是弱引用,但是它所存储的value是强引用。弱引用不用担心,当ThreadLocal在没有外部强引用时,下次GC就会被回收,那么持有强引用的value会不会引起内存泄漏?

ThreadLocalMap本身对帮助垃圾回收做了很多处理,每次调用get、set方法的时候都会对无效元素进行清理,表内空间不足的时候更是会进行一次彻底的rehash。其中调用expungeStaleEntry()方法,都会在key已经失效时,对Entry的value以及Entry本身置null,释放这些强引用

但是有些情况仍然会造成内存泄漏,那就是使用线程池。因为线程会永远存在线程池中,线程无法结束后回收,线程中的value引用会一直得不到释放,导致内存泄漏

所以最稳妥的方法,还是在使用完ThreadLocal后及时调用remove方法。该方法会查找以当前ThreadLocal对象为key的Entry,及时清理这个元素的key、value和Entry本身

父子线程值传递--InheritableThreadLocal

private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();

public static void main(String[] args) throws InterruptedException {
    threadLocal.set("hello world");
    System.out.println("main: " + threadLocal.get());

    Thread thread = new Thread(() -> {
        System.out.println("thread: " + threadLocal.get());
        threadLocal.set("hi");
        System.out.println("thread: " + threadLocal.get());
    });

    thread.start();
    thread.join();

    System.out.println("main: " + threadLocal.get());
}

打印结果:

main: hello world
thread: null
thread: hi
main: hello world

因为ThreadLocal的值跟Thread绑定,子线程get时获取值为null,父子线程之间set的值互不影响

private static final ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();

修改ThreadLocal为InheritableThreadLocal后

main: hello world
thread: hello world
thread: hi
main: hello world

子线程获取到父线程set的值,而子线程自定义的值不会影响父线程。子继承父线程值,而不会影响父线程值

InheritableThreadLocal实现:

其实Thread类有两个ThreadLocalMap变量。一个用于保存ThreadLocal对象和其value值;一个用于保存InheritableThreadLocal对象和其value值

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

/*
 * InheritableThreadLocal values pertaining to this thread. This map is
 * maintained by the InheritableThreadLocal class.
 */
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

当new Thread()时,调用Thread的init方法。ThreadLocal.createInheritedMap()创建一个新的ThreadLocalMap,并copy父Thread的Map数据

Thread parent = currentThread();

if (inheritThreadLocals && parent.inheritableThreadLocals != null)
    this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

注意:当使用线程池时,多个任务之间会因为线程共用,而导致ThreadLocal对象get、set操作相互污染

可以参考阿里的实现方案: TransmittableThreadLocal

扩展ThreadLocal

Netty的FastThreadLocal<V>

private final int index;
public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}
public static int nextVariableIndex() {
    int index = nextIndex.getAndIncrement();
    return index;
}
static final AtomicInteger nextIndex = new AtomicInteger();

public static final Object UNSET = new Object();
Object[] array = new Object[32];
Arrays.fill(array, UNSET);

每个FastThreadLocal对象都有一个index,该index是全局自增的AtomicInteger.getAndIncrement()。UnpaddedInternalThreadLocalMap维护一个初始32长度的Object[]数组,数组存放value值

set()时,根据FastThreadLocal的index,将value插入到Object[index]下,因为index是全局自增的,不会出现slot槽有值的情况;get()时,根据index,直接数组Object[index]读取value即可;remove()时,设置Object[index] = new Object();

FastThreadLocal相对于ThreadLocal,不需要hash,不需要线性探测(O(1)->O(n)),不需要在数组中存放ThreadLocal对象本身。缺点是有多少FastThreadLocal对象,就得至少多长数组,也无法利用回收后的数组槽(nextIndex自增导致)

Spring的NamedThreadLocal<T>

public class NamedThreadLocal<T> extends ThreadLocal<T> {
    private final String name;

    /**
     * Create a new NamedThreadLocal with the given name.
     * @param name a descriptive name for this ThreadLocal
     */
    public NamedThreadLocal(String name) {
        Assert.hasText(name, "Name must not be empty");
        this.name = name;
    }

    @Override
    public String toString() {
        return this.name;
    }
}

引申

ThreadLocal源码详细解析

Java弱引用

理解Java-Reference

TransmittableThreadLocal

FastThreadLocal测试

上一篇下一篇

猜你喜欢

热点阅读