GitHub 中文社区

TransmittableThreadLocal 源码解析

2018-09-30  本文已影响0人  _晓__

1、介绍

TransmittableThreadLocal(TTL) 是 Alibaba 开源的组件,用于在使用线程池等会池化复用线程的组件时提供 ThreadLocal 值的传递功能,解决异步执行时上下文传递的问题。TransmittableThreadLocal 需要配合 TTL 提供的 TtlExecutors、TtlRunnable 和 TtlCallable 使用,也可以使用 Java Agent 无侵入式地实现线程池场景下的上下文传递。

2、使用场景

3、TTL 执行流程

TTL 执行流程.png

4、源码解析

TransmittableThreadLocal 继承于 InheritableThreadLocal,因此天然拥有 InheritableThreadLocal 向子线程传递上下文的特性,只需再解决线程池场景下的上下文传递问题。

4.1、项目结构

TTL 项目结构.png

4.2、核心部分源码

在 TransmittableThreadLocal 中,定义了一个全局静态变量 holder,用于存储使用 TransmittableThreadLocal set 的上下文。

// Static, inheritable thread-local registry whose map keys are TransmittableThreadLocal
// instances. The value type is a wildcard; the code shown in this article only iterates
// the keys/entries of this map (see Transmitter.capture/replay/restore).
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
            new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
                // Initial value: a fresh, empty WeakHashMap.
                protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                }

                // Value derived for a child thread: a new WeakHashMap populated from the
                // parent's map, so the child gets its own map instance rather than sharing
                // the parent's.
                protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                }
            };
// Abridged excerpt of java.lang.Thread: only the code that copies the parent's
// inheritable thread-locals into a newly created thread is shown.
public class Thread implements Runnable {

    public Thread() {
        // Calls a four-argument init overload that is not shown in this excerpt.
        init(null, null, "Thread-" + nextThreadNum(), 0);
    }

    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {

        // The thread executing this code (the creator of the new Thread) is the parent.
        Thread parent = currentThread();
        // TODO: other source code omitted
        // When inheritance is requested and the parent has an inheritable map,
        // build this thread's map from the parent's via ThreadLocal.createInheritedMap.
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

        // TODO: other source code omitted
    }
}

// Abridged excerpt of java.lang.ThreadLocal showing how an inherited map is built.
// NOTE(review): the ThreadLocalMap constructor appears directly inside ThreadLocal here;
// presumably the enclosing nested ThreadLocalMap class declaration was omitted — confirm
// against the JDK source.
public class ThreadLocal<T> {
    // Factory used by Thread.init: creates a new map from the parent's map.
    static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
    }

    private ThreadLocalMap(ThreadLocalMap parentMap) {
        // TODO: part of the code omitted
        // (the declarations of `len` and `e` used below are in the omitted code)

        for (int j = 0; j < len; j++) {
            if (e != null) {
                ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                if (key != null) {
                    // The child's value is derived through the key's childValue hook;
                    // this is the call that reaches holder's childValue override.
                    Object value = key.childValue(e.value);
                    // TODO: part of the code omitted
                }
            }
        }
    }
}

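可能有人有疑问:为什么使用 WeakHashMap?对 WeakHashMap 不了解的读者可以自行查阅资料,这里只阐述 TTL 使用 WeakHashMap 的原因:WeakHashMap 对 key 持有的是弱引用,当某个 TransmittableThreadLocal 实例在别处不再被强引用时,holder 中对应的条目可以被垃圾回收,从而避免 holder 长期持有这些实例而造成内存泄漏。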

在使用线程池时,需要使用 TTL 提供的 TtlExecutors 包装,如:

TtlExecutors.getTtlExecutor(Executors.newCachedThreadPool());

让我们继续跟进 TtlExecutors.getTtlExecutor 方法中,探究下这个方法里面究竟做了什么?

/**
 * Factory helpers that decorate executors with TTL-aware wrappers.
 */
public final class TtlExecutors {
    /**
     * Returns a TTL-aware view of the given executor.
     *
     * @param executor the executor to decorate; may be {@code null}
     * @return {@code executor} itself when it is {@code null} or already an
     *         {@code ExecutorTtlWrapper}; otherwise a new {@code ExecutorTtlWrapper}
     *         around it
     */
    public static Executor getTtlExecutor(Executor executor) {
        // Wrap only real executors that have not been wrapped yet.
        final boolean needsWrapping =
                executor != null && !(executor instanceof ExecutorTtlWrapper);
        return needsWrapping ? new ExecutorTtlWrapper(executor) : executor;
    }
}

使用 ExecutorTtlWrapper 包装有什么用呢?那么就继续看看 ExecutorTtlWrapper 里面的实现:

// Executor decorator: every submitted Runnable is passed through TtlRunnable.get
// before being handed to the wrapped executor.
class ExecutorTtlWrapper implements Executor {
    // The underlying executor that actually runs the tasks.
    private final Executor executor;

    ExecutorTtlWrapper(Executor executor) {
        this.executor = executor;
    }

    public void execute(Runnable command) {
        // Uses a one-argument TtlRunnable.get overload, which is not shown in this article.
        executor.execute(TtlRunnable.get(command));
    }

    // TODO: part of the code omitted
}

现在重点介绍一下 TtlRunnable 里面做了什么处理:

// Runnable decorator (abridged): captures TTL values when constructed, replays them
// around the wrapped task's run(), and restores the previous values afterwards.
public final class TtlRunnable implements Runnable {
    // Holds the snapshot returned by Transmitter.capture() at construction time.
    private final AtomicReference<Object> capturedRef;
    // The wrapped task; its assignment is in the omitted constructor code.
    private final Runnable runnable;
    // Assigned in the omitted constructor code; its use is not shown in this excerpt.
    private final boolean releaseTtlValueReferenceAfterRun;

    private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        // The snapshot is taken here, in whichever thread constructs the TtlRunnable
        // (presumably the thread submitting the task — confirm against callers).
        this.capturedRef = new AtomicReference<Object>(TransmittableThreadLocal.Transmitter.capture());
        // TODO: part of the code omitted
    }

    public void run() {
        Object captured = capturedRef.get();
        // TODO: part of the code omitted

        // Install the captured values in the running thread; keep what was there before.
        Object backup = TransmittableThreadLocal.Transmitter.replay(captured);
        try {
            runnable.run();
        } finally {
            // Always put the running thread's previous values back, even if the task throws.
            TransmittableThreadLocal.Transmitter.restore(backup);
        }
    }

     // Factory method. The `idempotent` parameter is consumed only by the omitted code.
     public static TtlRunnable get(Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
        // TODO: part of the code omitted
        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
    }
}
// Static helpers implementing the capture / replay / restore cycle over the TTL
// instances registered in `holder`. Snapshots are passed around as plain Object and
// cast back to Map inside replay/restore (unchecked casts).
public static class Transmitter {
        
        // Snapshot: for every TTL currently registered in holder, record the result of
        // its copyValue() in a new HashMap and return that map.
        public static Object capture() {
            Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
            for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }

        // Installs a snapshot produced by capture() into the current thread and returns
        // a backup of the values the thread held beforehand (to be passed to restore()).
        public static Object replay(Object captured) {
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();

            // Explicit iterator so entries can be removed from holder while iterating.
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // backup
                backup.put(threadLocal, threadLocal.get());

                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // set values to captured TTL
            setTtlValuesTo(capturedMap);

            // call beforeExecute callback
            doExecuteCallback(true);

            return backup;
        }

        // Reverts the current thread to the state recorded in the backup returned by replay().
        public static void restore(Object backup) {
            Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
            // call afterExecute callback
            doExecuteCallback(false);

            // Explicit iterator so entries can be removed from holder while iterating.
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // clear the TTL values that is not in backup
                // avoid the extra TTL values after restore
                if (!backupMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // restore TTL values
            setTtlValuesTo(backupMap);
        }

        // Calls set(value) on every TTL key of the given map with its mapped value.
        private static void setTtlValuesTo(Map<TransmittableThreadLocal<?>, Object> ttlValues) {
            for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }
}

PS:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>log4j2-ttl-thread-context-map</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>com.ofpay</groupId>
    <artifactId>logback-mdc-ttl</artifactId>
    <version>1.0.2</version>
</dependency>
上一篇 下一篇

猜你喜欢

热点阅读