TransmittableThreadLocal源码分析

2020-09-07  本文已影响0人  捞月亮的阿汤哥

1. 类继承关系

类继承关系

2. 时序图

时序图直接用作者画的,非常详细


TransmittableThreadLocal-sequence-diagram.png

3. 流程说明

1. createTtl 这一步最重要的就是初始化了holder

private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
        new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
            @Override
            protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
            }
            @Override
            protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
            }
        };

holder的两个重写方法说明:

2. setTtlValue 这一步主要做了两件事

(1)先在ttl的父类ThreadLocal设置该线程的值

@Override
public final void set(T value) {
​    // 先调用ThreadLocal的set方法
    super.set(value);
    if (null == value) { // may set null to remove value
        removeValue();
    } else {
        addValue();
    }
}
​
//ThreadLocal 的set方法
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}​​​

(2) 往holder中添加当前的ttl

private void addValue() {
    if (!holder.get().containsKey(this)) {
        holder.get().put(this, null); // WeakHashMap supports null value.
        // System.out.println(Thread.currentThread().getName()+" "+holder.get()); //可以加下这个输出下加深理解
    }
}

(3) 调用ThreadLocal的get方法

public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        //调用ttl的holder重写的initialValue方法
        return setInitialValue();
    }

所以有了如下的映射关系,通过当前thread获取获取holder中的keySet,通过遍历keySet获取ttl,通过ttl委托ThreadLocal维护变量的值
holder运行时的结构:

main {com.alibaba.Utils$newTtlInstanceAndPut$ttl$1@1fc2b765(parent-create-unmodified-in-child)=null}

3. createBizTaskRunnable

实现runnable接口, 自己的业务逻辑

4. createTtlRunnableWrapper(Runnable runnable)

这一步最重要的是把当前线程的threadLocal的值给拷贝到了capturedRef ,为啥用AtomicRef,需要原子更新操作TtlRunnable line47

4.1 TtlRunnable get(Runnable runnable)

//runnable 提交的业务runnable
//releaseTtlValueReferenceAfterRun 是否在value返回后释放,避免内存泄漏
//idempotent 是否需要幂等
public static TtlRunnable get(Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
        if (null == runnable) {
            return null;
        }

        if (runnable instanceof TtlRunnable) {
            if (idempotent) {
                // avoid redundant decoration, and ensure idempotency
                return (TtlRunnable) runnable;
            } else {
                throw new IllegalStateException("Already TtlRunnable!");
            }
        }
        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
    }

4.2 创建新的TtlRunnable包装runnable

private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        //该线程包含的所有ttl的变量
        //类型是 Map<TransmittableThreadLocal<?>, Object>
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        //是否需要在使用后释放
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

4.3 获取当前线程所有的ttl的实例和对应的threadLocal的值

重点: capture的是父线程的ttl, 也就是创建TtlRunnable的线程,这个很重要很容易误解。

public static Object capture() {
            Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
            //holder.get().keySet()是当前线程使用的ttl
            //在set这一步 holder添加了ttl的关系,同时ttl中set了当前线程的value
            for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }

5. submitTtlRunnableToThreadPool

提交任务给线程池

6. 线程池执行run方法

步骤6之后的逻辑已经和业务代码没有关系了

(1) TtlRunnable重写了Runnable的run方法

@Override
public void run() {
    //获取任务提交时,即包装runnable时的ttl
    Object captured = capturedRef.get();
    if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after run!");
    }
    //执行前 获取backup
    Object backup = replay(captured);
    try {
        //执行run方法
        runnable.run();
    } finally {
        //通过backup恢复当前线程的ttl,避免run方法中修改了ttl
        //避免线程复用造成父线程的ttl丢失
        restore(backup);
    }
}

(2) replay(captured)方法

public static Object replay(Object captured) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();

            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // backup
                backup.put(threadLocal, threadLocal.get());

                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // set values to captured TTL
            setTtlValuesTo(capturedMap);

            // call beforeExecute callback
            doExecuteCallback(true);

            return backup;
        }

将capturedMap的值拷贝到当前线程的threadLocal,并更新holder,这一步的精髓就是通过这个Map<TransmittableThreadLocal<?>, Object>类型的值将ttl从父线程往子线程中塞数据。类似的用法其实还有打破java双亲委托往threadContext中塞数据,典型应用就是jdbc的驱动的实现

private static void setTtlValuesTo(Map<TransmittableThreadLocal<?>, Object> ttlValues) {
            for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                @SuppressWarnings("unchecked")
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }

doExecuteCallback 自定义扩展逻辑,可以做前置拦截

(3) 执行run方法

(4) 执行后 restore(backup)

将当前线程的ttl恢复到任务提交时

4. 重点

5. 学习的地方

上一篇 下一篇

猜你喜欢

热点阅读