从ThreadLocal到TransmittableThread

2020-03-18  本文已影响0人  但时间也偷换概念

ThreadLocal

/*
 * This class provides thread-local variables.  These variables differ from
 * their normal counterparts in that each thread that accesses one (via its
 * {@code get} or {@code set} method) has its own, independently initialized
 * copy of the variable.  {@code ThreadLocal} instances are typically private
 * static fields in classes that wish to associate state with a thread (e.g.,
 * a user ID or Transaction ID).
 *
 * <p>For example, the class below generates unique identifiers local to each
 * thread.
 * A thread's id is assigned the first time it invokes {@code ThreadId.get()}
 * and remains unchanged on subsequent calls.
 * <pre>
 * @author  Josh Bloch and Doug Lea
 * @since   1.2
 */
public class ThreadLocal<T> 

我们可以得知ThreadLocal的作用是:ThreadLocal的作用是提供线程内的局部变量,不同的线程之间不会相互干扰,这种变量在线程的生命周期内起作用,减少同一个线程内多个函数或组件之间一些公共变量的传递的复杂度。

以一个国际化场景为例,如果在所有hsf服务上下文都需要定义一个语言环境。

/**
 * @author zhengyu.nie
 * 2020/3/18.
 */
public class LanguageThreadLocal {

    private static ThreadLocal<LanguageContext> languageContextThreadLocal = new ThreadLocal<>();

    public static void setLanguageContext(LanguageContext context) {
        languageContextThreadLocal.set(context);
    }

    public static LanguageContext getLanguageContext() {
        return languageContextThreadLocal.get();
    }

    public static void remove() {
        languageContextThreadLocal.remove();
    }

    @Builder
    @Getter
    @ToString
    public static class LanguageContext {
        private String language;
        private String code;
        private String locale;
    }
}

/**
 * @author zhengyu.nie
 * 2020/3/18.
 */
public class LanguageThreadLocalTest {

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            int finalI = i;
            CompletableFuture.runAsync(() -> {
                LanguageThreadLocal.setLanguageContext(
                    LanguageContext.builder().code("EN").language("en-us").locale(finalI == 1 ? "en-hk" : "en-tw").build());

                System.out.println(LanguageThreadLocal.getLanguageContext());

            }).whenComplete((aVoid, throwable) -> System.out.println("run over"));
        }
    }
}

console:
LanguageThreadLocal.LanguageContext(language=en-us, code=EN, locale=en-tw)
LanguageThreadLocal.LanguageContext(language=en-us, code=EN, locale=en-hk)
run over
run over

ThreadLocal是线程独立的上下文,而fork子线程的场景,子线程将读不到父线程的上下文。如果处理这种情况,会增加业务代码。如果不处理,将影响程序逻辑。

关于ThreadLocal实现原理、内存泄漏、remove、WeakReference相关细节,不在这篇文章中讨论。

InheritableThreadLocal

/**
 * This class extends <tt>ThreadLocal</tt> to provide inheritance of values
 * from parent thread to child thread: when a child thread is created, the
 * child receives initial values for all inheritable thread-local variables
 * for which the parent has values.  Normally the child's values will be
 * identical to the parent's; however, the child's value can be made an
 * arbitrary function of the parent's by overriding the <tt>childValue</tt>
 * method in this class.
 *
 * <p>Inheritable thread-local variables are used in preference to
 * ordinary thread-local variables when the per-thread-attribute being
 * maintained in the variable (e.g., User ID, Transaction ID) must be
 * automatically transmitted to any child threads that are created.
 *
 * @author  Josh Bloch and Doug Lea
 * @see     ThreadLocal
 * @since   1.2
 */

public class InheritableThreadLocal<T> extends ThreadLocal<T> 

public class InheritableLanguageThreadLocalTest {

    public static void main(String[] args) {
        LanguageThreadLocal.setLanguageContext(
            LanguageContext.builder().code("EN").language("en-us").locale("en-hk").build());

        CompletableFuture.runAsync(()->{
            System.out.println(LanguageThreadLocal.getLanguageContext());
        }).whenComplete((aVoid, throwable) -> System.out.println("success callback"));

    }
}

console:
LanguageThreadLocal.LanguageContext(language=en-us, code=EN, locale=en-hk)
success callback

无论是Runnable、Callable、CompleableFuture、ListenableFuture、new Thread哪种方式创建线程,最终还是会走到Thread类的构造,譬如new Thread(Runnable)。

首先看一下Thread构造函数与成员变量中的两个变量

public class Thread implements Runnable {

    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    /**
     * Allocates a new {@code Thread} object. This constructor has the same
     * effect as {@linkplain #Thread(ThreadGroup,Runnable,String) Thread}
     * {@code (null, null, gname)}, where {@code gname} is a newly generated
     * name. Automatically generated names are of the form
     * {@code "Thread-"+}<i>n</i>, where <i>n</i> is an integer.
     */
    public Thread() {
        init(null, null, "Thread-" + nextThreadNum(), 0);
    }

 /**
     * Initializes a Thread.
     *
     * @param g the Thread group
     * @param target the object whose run() method gets called
     * @param name the name of the new Thread
     * @param stackSize the desired stack size for the new thread, or
     *        zero to indicate that this parameter is to be ignored.
     * @param acc the AccessControlContext to inherit, or
     *            AccessController.getContext() if null
     * @param inheritThreadLocals if {@code true}, inherit initial values for
     *            inheritable thread-locals from the constructing thread
     */
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
        if (name == null) {
            throw new NullPointerException("name cannot be null");
        }

        this.name = name;

        Thread parent = currentThread();
        SecurityManager security = System.getSecurityManager();
        if (g == null) {
            /* Determine if it's an applet or not */

            /* If there is a security manager, ask the security manager
               what to do. */
            if (security != null) {
                g = security.getThreadGroup();
            }

            /* If the security doesn't have a strong opinion of the matter
               use the parent thread group. */
            if (g == null) {
                g = parent.getThreadGroup();
            }
        }

        /* checkAccess regardless of whether or not threadgroup is
           explicitly passed in. */
        g.checkAccess();

        /*
         * Do we have the required permissions?
         */
        if (security != null) {
            if (isCCLOverridden(getClass())) {
                security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
            }
        }

        g.addUnstarted();

        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        tid = nextThreadID();
    }

所以在init方法时候会进行上下文拷贝,如果判断inheritableThreadLocals不为空。

ThreadLocal在线程池场景下,会因为线程池核心线程的复用性,带来“脏数据”

于是又有了ttl。

TransmittableThreadLocal

TransmittableThreadLocal是阿里开源的项目。

github地址:
TransmittableThreadLocal

两种使用方式

首先定义一个线程池ThreadPoolExecutor对象,然后使用ttl进行wrap。
如下,在Spring环境中注入一个自定义的线程池。

   @Bean
    ExecutorService initialDataPool() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 9, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000), r -> {

            Thread thread = new Thread(r);
            thread.setDaemon(false);
            thread.setName("initialDataPoolTask-" + THREAD_COUNT.getAndIncrement());
            return thread;
        }, new AbortPolicy());
        return TtlExecutors.getTtlExecutorService(threadPoolExecutor);
    }

CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().getName().startsWith("initialDataPoolTask-"));
        return s.toUpperCase();
    }, initialDataPool);

如果嵌套比较多的CompleableFuture(如下),记得都加上线程池参数。

static void thenAcceptBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            (s1, s2) -> result.append(s1 + s2));
    assertEquals("MESSAGEmessage", result.toString());
}

具体参考:
使用java-agent来修饰jdk线程池实现类

上一篇 下一篇

猜你喜欢

热点阅读