transmittable-thread-local源码分析

2019-02-24  本文已影响52人  土豆肉丝盖浇饭

前言

ThreadLocal解决了在多个线程针对一个变量维护不同值的功能,如果你想在同一个线程内传递一些值,那么就可以用到这个类,它的好处是无侵入性,这样我们就不需要再每个方法内透传这个参数,比如Dubbo的RpcContext。另外我们也可以利用这个类来解决在多线程情况下使用线程不安全的类的问题,比如SimpleDateFormat。ThreadLocal的子类InheritableThreadLocal在ThreadLocal的基础上,解决了和线程相关的副本从父线程向子线程传递的问题。如果不使用InheritableThreadLocal,这个变量在父线程和子线程是两个副本。
但是还有另外一种特殊情况,就是我们比较常用的线程池,线程池中的线程会被复用,线程在创建的时候会把父线程当前的inheritableThreadLocals拷贝过去(如果存在,浅拷贝),之后我们再在代码中设置了InternalThreadLocal后,在线程池中的线程就再也获取不到这个新的InheritableThreadLocal了。影响最大的问题就是,我们调用链跟踪系统的traceid等信息,会在线程池中的线程丢失,我们也会丢失一部分调用信息。阿里开源的transmittable-thread-local框架就正是解决这个问题。

我们先来看下InheritableThreadLocal是怎么实现让子线程能访问到父线程的InheritableThreadLocal变量,并且通过这部分源码,也能看出来为什么线程池中的线程一旦创建完成之后被复用时为什么会丢失InheritableThreadLocal。

首先我们在Thread类的构造函数能发现下面这段代码

if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

意思就是说父线程的inheritableThreadLocals存在时,子线程的inheritableThreadLocals会浅拷贝父线程的inheritableThreadLocals

然后我们看InheritableThreadLocal的重载方法

ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

getMap中的返回从threadLocals变为了inheritableThreadLocals。

因为线程的复用,所以这个inheritableThreadLocals只能维持在这个线程创建时候的状态。

下面是测试这个问题的测试用例

    @Data
    @AllArgsConstructor
    static class Pet {
        private String name;
    }

    @Test
    public void testThreadLocalInPool() throws InterruptedException {
        final ThreadLocal<Pet> tl1 = new InheritableThreadLocal<>();
        final ThreadLocal<Pet> tl2 = new InheritableThreadLocal<>();


        Pet pet = new Pet("xiaomao");
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        tl1.set(pet);
        for(int i =0 ;i<2;i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " " + tl1.get());
            });
        }

        Thread.sleep(2000L);

        //inheritableThreadLocal是浅拷贝
        pet.setName("xiaogou");
        for(int i =0 ;i<2;i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " " + tl1.get());
            });
        }


        //线程池中线程一旦创建完成,InheritableThreadLocal就再也传不进去
        pet.setName("xiaoji");
        tl2.set(pet);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+" "+tl2.get());
            }
        });
        
    }

原理

我们先贴一个不使用ttl框架应该怎么解决线程池传递threadlocal变量的解决方案。

       private static ThreadLocal<Map<String,String>> holder = new InheritableThreadLocal<Map<String,String>>(){
        @Override
        protected Map<String,String> initialValue() {
            return new HashMap<>();
        }
    };


    @Data
    public static class WrapedRunnable implements Runnable{

        private Map<String,String> attachment;

        private Runnable runnable;

        public WrapedRunnable(Runnable runnable) {
            this.runnable=runnable;
            this.attachment = new HashMap<>(holder.get());
        }

        @Override
        public void run() {
            holder.set(this.attachment);
            runnable.run();
        }
    }

    @Test
    public void testMandatoryTTL(){

        Executor executor = Executors.newFixedThreadPool(1);
        executor.execute(()->{
            System.out.println("init");
        });

        HashMap<String,String> attachment = new HashMap<>();
        attachment.put("123","456");
        holder.set(attachment);

        //普通方式
        executor.execute(()->{
            System.out.println(holder.get().containsKey("123"));
        });

        //处理后的方式
        executor.execute(new WrapedRunnable(()->{
            System.out.println(holder.get().containsKey("123"));
        }));



    }

上面的这种方式和ttl的设计思想差不多,但是ttl肯定更加优雅,通用性更高。

下面这张图是ttl核心设计逻辑的时序图。我分析过源码后,大家就能很容易看懂它的设计思想了。


源码讲解

后面出现的 tl=ThreadLocal itl=InheritableThreadLocal ttl=TransmittableThreadLocal

就如上面我自己写的那个传递方式一样,ttl也会把需要传递的threadlocal缓存起来,然后在包装类的run方法内重放,设置到子线程。这个缓存的逻辑封装在TransmittableThreadLocal类中。

TransmittableThreadLocal

TransmittableThreadLocal继承了InheritableThreadLocal,重载了get和set方法

@Override
    public final T get() {
        T value = super.get();
        if (null != value) addValue();
        return value;
    }

    @Override
    public final void set(T value) {
        super.set(value);
        // may set null to remove value
        if (null == value) removeValue();
        else addValue();
    }

可以看到在调用父类的逻辑上,新增了addValue和removeValue的逻辑,这个就是缓存的逻辑

private void addValue() {
        if (!holder.get().containsKey(this)) {
            holder.get().put(this, null); // WeakHashMap supports null value.
        }
    }

    private void removeValue() {
        holder.get().remove(this);
    }

会把当前这个threadlocal缓存到holder上面。

下面介绍下这个很关键的holder

holder

private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
            new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
                @Override
                protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                }

                @Override
                protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                }
            };

首先这个holder本身是InheritableThreadLocal类型的,所以它也是和线程相关联的。可以在父子线程间传递,但是对于线程池内已经创建的线程肯定是传递不进去的。所以在初始化wrapper类的时候,那个时候还是父线程,在wrapper类构造的时候,要把这些threadlocal捕获出来,这个捕获相关逻辑见下一个Transmitter的分析。其次这个holder内保存的是一个WeakHashMap<TransmittableThreadLocal<?>, Object>,所以这个WeakHashMap的key是在没被强引用的情况下可以被回收的。另外需要注意的是,这个WeakHashMap设计者是为了利用到它的key可以被回收的特性,就是当做set在使用。

Transmitter

Transmitter内有3个核心方法

方法 作用
capture 捕获父线程的ttl
replay 重放父线程ttl
restore 恢复之前子线程的ttl

capture用于捕获父线程的ttl,捕获操作要在父线程执行

public static Object capture() {
            Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
            for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }

replay传入capture方法捕获的ttl,然后在子线程重放,也就是调用ttl的set方法,会设置到当前的线程中去,最后会把子线程之前存在的ttl返回

public static Object replay(@Nonnull Object captured) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();

            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // backup
                backup.put(threadLocal, threadLocal.get());

                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
                //清除之前上下文,不在capturedMap中,都清除
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // set values to captured TTL
            //这边是在子线程设置ttl的逻辑
            setTtlValuesTo(capturedMap);

            // call beforeExecute callback
            doExecuteCallback(true);

            return backup;
        }

setTtlValuesTo用于在子线程设置ttl,逻辑如下

private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
            for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                @SuppressWarnings("unchecked")
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                //这边是设置到当前线程
                threadLocal.set(entry.getValue());
            }
        }

其实就是调用ttl的set方法,看过ThreadLocal源码的你应该懂。

最后就是执行结束,restore之前的上下文,用到replay返回的back。

public static void restore(@Nonnull Object backup) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
            // call afterExecute callback
            doExecuteCallback(false);

            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // clear the TTL values that is not in backup
                // avoid the extra TTL values after restore
                // 清除之前的上下文,不在backupMap中的都清除了
                if (!backupMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // restore TTL values
            // 恢复到运行之前的状态
            setTtlValuesTo(backupMap);
        }

要把capture,repaly和restore的逻辑串起来,那么就需要看下面的TtlRunnable类,这个就是我一直说的包装类。

TtlRunnable

我们先看TtlRunnable的构造函数

 private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        //捕获父线程ttl
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

在构造函数,也就是父线程,会通过capture捕获父线程的ttl,然后保存在capturedRef中。

在run方法中,replay,restore逻辑一目了然,不多解释。

public void run() {
        Object captured = capturedRef.get();
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }

        Object backup = replay(captured);
        try {
            runnable.run();
        } finally {
            restore(backup);
        }
    }

所以我们在项目中想用到ttl的时候,可以这么使用

@Data
    @AllArgsConstructor
    static class Pet {
        private String name;
    }


    @Test
    public void compareTLAndTTL() throws InterruptedException {
        Executor executor = Executors.newFixedThreadPool(1);
        executor.execute(()->{
            System.out.println("init");
        });

        ThreadLocal<Pet> tl = new ThreadLocal<>();
        tl.set(new Pet("xiaogou"));

        executor.execute(()->{
            //这边根本拿不到父线程的tl
            System.out.println(tl.get());
        });


        TransmittableThreadLocal<Pet> ttl = new TransmittableThreadLocal<>();
        ttl.set(new Pet("xiaomao"));

        executor.execute(TtlRunnable.get(()->{
            System.out.println(ttl.get());
            //证明ttl是浅拷贝
            ttl.get().setName("xiaogou");
        }));

        Thread.sleep(1000L);

        System.out.println(ttl.get());

    }

输出如下

init
null
SPITest.Pet(name=xiaomao)
SPITest.Pet(name=xiaogou)

但是这样使用起来也太麻烦了,我们需要修改我们的使用方式,有没有无侵入的使用方式?我们可以把上面包装Runnable的逻辑封装到线程池中去。因此用到了ExecutorTtlWrapper。

ExecutorTtlWrapper

class ExecutorTtlWrapper implements Executor, TtlEnhanced {
    private final Executor executor;

    ExecutorTtlWrapper(@Nonnull Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(@Nonnull Runnable command) {
        executor.execute(TtlRunnable.get(command));
    }

    @Nonnull
    public Executor unwrap() {
        return executor;
    }
}

代码很简单,不多解释了。
我们可以通过TtlExecutors这个工具类来快捷获取这些包装TtlRunbale逻辑的线程池。但是这样还是比较麻烦的,因此用到下面这个TtlAgent类,它利用了jvm的Instrument机制,可以在编译的时候修改字节码,在jdk的线程池源码中加入TtlRunnable封装的逻辑。

TtlAgent

instrument的原理以及如何配置不是本文重点,大家知道它干了什么就好了,可以在参考贴的链接学习,这个技术在很多中间件用到

public static void premain(String agentArgs, @Nonnull Instrumentation inst) {
        //解析key-value配置
        kvs = splitCommaColonStringToKV(agentArgs);
        //根据kv配置 设置日志打印方式
        Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));
        final Logger logger = Logger.getLogger(TtlAgent.class);

        try {
            logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst);
            //获取kv中关于是否禁止向子线程传递ttl的配置
            final boolean disableInheritable = isDisableInheritableForThreadPool();

            final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
            //修改java.util.concurrent.ThreadPoolExecutor,java.util.concurrent.ScheduledThreadPoolExecutor的代码
            transformletList.add(new TtlExecutorTransformlet(disableInheritable));
            //修改另外一个线程池
            transformletList.add(new TtlForkJoinTransformlet(disableInheritable));
            //根据配置决定是否修改TimeTask源码,阿里规范不建议使用这个类做定时任务
            if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet());
            //把我们的转换器设置到inst中去
            final ClassFileTransformer transformer = new TtlTransformer(transformletList);
            inst.addTransformer(transformer, true);
            logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success");

            logger.info("[TtlAgent.premain] end");

            ttlAgentLoaded = true;
        } catch (Exception e) {
            String msg = "Fail to load TtlAgent , cause: " + e.toString();
            logger.log(Level.SEVERE, msg, e);
            throw new IllegalStateException(msg, e);
        }
    }

premain用于我们向jvm注册我们的转换器,根据转换器内的逻辑,我们可以修改对应的class文件源码。

我们直接来看下TtlExecutorTransformlet中是怎么修改源码的,核心代码如下

PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.lang.Runnable", "com.alibaba.ttl.TtlRunnable");
        PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.util.concurrent.Callable", "com.alibaba.ttl.TtlCallable");

CtClass[] parameterTypes = method.getParameterTypes();
        StringBuilder insertCode = new StringBuilder();
        for (int i = 0; i < parameterTypes.length; i++) {
            final String paramTypeName = parameterTypes[i].getName();
            if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
                String code = String.format("$%d = %s.get($%d, false, true);", i + 1, PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName), i + 1);
                logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ": " + code);
                insertCode.append(code);
            }
        }

大概意思就是,在ThreadPoolExecutor类中,如果方法参数含有Runnable或者Callable,会在方法体第一行,加上一段代码

runnable = com.alibaba.ttl.TtlRunnable.get(runnable,false,true) 
callable   = com.alibaba.ttl.TtlCallable(runnable,false,true) 

这样就实现了无感知包装Runnable的逻辑。

具体如何使用这个agent 我们需要增加如下的jvm启动参数

-javaagent:/path/to/transmittable-thread-local-2.x.x.jar=ttl.agent.logger:STDOUT,ttl.agent.disable.inheritable.for.thread.pool:true

等于号后面的是额外配置参数,具体如何配置可以看TtlAgent类的注释

最后

通过agent相当于无侵入引入了ttl,但是ttl的创建这一步还是需要我们手动的,不可能去改写tl或者itl的字节码,tl,itl,ttl三者在jvm内共存
ttl框架主要还是用于中间件,但是我们还是需要了解的,学习一个知识点需要深入,万一以后遇到这种坑了呢。

参考

ThreadLocal原理及内存泄露预防
transmittable-thread-local github
TransmittableThreadLocal详解
agent官方文档

上一篇下一篇

猜你喜欢

热点阅读