线程池分布式追踪方案实现

2020-12-31  本文已影响0人  java_飞

场景:在生产环境种,查询发现错误日志时,需要定位跟踪到最终问题,但是项目种用到了很多的异步线程池,导致定位问题时根据traceId,只能找到一部分日志内容,无法最终定位到问题所在;

原因:这是由于MDC的实现是通过ThreadLocal实现的,然后线程池种的线程和用户线程不是同一个线程,所以在异步调用的时候,用户线程的内容没有同步到线程池线程中,导致最终没有打印出关联的tracId,从而导致最终无法关联相关日志,无法定位到具体问题;

解决方案:因为实现异步的方式有很多种,例如线程池和ThreadPoolExecutor和ThreadPoolTaskExecutor,spring的@Aync等,但是根本原因都是因为两个线程不是同一个线程引起的,所以我这里只例举线程池ThreadPoolExecutor的实现,其他实现相差无几;
ThreadMdcUtil.class

import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * className:ThreadMdcUtil
 * description: mdcUtil
 *
 * @author: david
 * date: 2020/12/31 10:42 上午
 */
public class ThreadMdcUtil {


    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

ThreadPoolExecutorMdcWrapper.class

import org.slf4j.MDC;
import java.util.concurrent.*;

/**
 * className:ThreadPoolExecutorMdcWrapper
 * description: 线程池包装类
 *
 * @author: david
 * date: 2020/12/31 10:41 上午
 */
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

具体使用方式如下:
在需要使用线程池的时候,使用ThreadPoolExecutorMdcWrapper类创建线程池去执行相关逻辑;

上一篇 下一篇

猜你喜欢

热点阅读