架构&系统设计gateway0.面试技能

分布式traceId

2023-05-22  本文已影响0人  雪飘千里

1、常见场景

场景一:工作中根据日志排查问题时我们经常想看到某个请求下的所有日志,可是由于生产环境并发很大,服务被调过于频繁,日志刷新太快每个请求之间的日志并不连贯,互相穿插,如果在打印日志时没有为日志增加一个唯一标识,是没法分辨出哪些日志是哪个请求打印,会影响到测试联调、线上问题排查的效率;

场景二:我们想知道一个请求中所有和该请求相关的链路日志,尤其是涉及到多个微服务时,此时也需要为日志增加一个唯一标识。通常可以使用UUID或者其它雪花算法等作为唯一标识;

基于以上场景,我们可以为每个请求设置一个traceId,这个请求整个链路公用同一个traceId,然后将日志收集到统一日志平台通过日志关键字找出traceId,再根据traceId,找出整个链路的请求过程,甚至还可以与分布式链路框架skywalking结合,分析链路的性能。

2、实现原理

2.1 MDC

MDC是(Mapped Diagnostic Context,映射调试上下文)是日志框架 log4j 和 logback 支持的一种方便在多线程条件下记录追踪日志的功能。

2.2 MDC原理

MDC 可以看成是一个与当前线程绑定的哈希表,可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容。当需要记录日志时,只需要从 MDC 中获取所需的信息即可。MDC 的内容则由程序在适当的时候保存进去。

MDC 底层最终使用的是 ThreadLocal 实现。

2.3 MDC使用

API 说明:

importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;
importorg.slf4j.MDC;importjava.util.UUID;

/*** MDC快速入门示例*/

public classSimpleMDC {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMDC.class);
    public static final String REQ_ID = "REQ_ID";

    public static voidmain(String[] args) {

        MDC.put(REQ_ID, UUID.randomUUID().toString());

        logger.info("开始调用服务A,进行业务处理");

        logger.info("业务处理完毕,可以释放空间了,避免内存泄露");

        MDC.remove(REQ_ID);

        logger.info("REQ_ID 还有吗?{}", MDC.get(REQ_ID) != null);
    }
}

2.4 MDC使用场景

3、分布式traceId实现方案

1、 引入log4j的同时需要注意去除冲突包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <!-- 排除自带的logback依赖 -->
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.21</version>
</dependency>

2、resources 文件夹下添加[log4j.xml]配置文件

<appender name="trace" class="org.apache.log4j.DailyRollingFileAppender">
    <param name="File" value="D://logs//trace.log"/>
    <param name="DatePattern" value="'.'yyyy-MM-dd"/>
    <param name="threshold" value="info"/>
    <param name="append" value="true"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] - [%X{traceId}] %-6p%c:%L - %m%n"/>
    </layout>
</appender>
 
<root>
    <level value="info"/>
    <appender-ref ref="trace"/>
</root>

3、添加[MDCUtils]自定义工具类

public class MDCUtils {
 
    /**
     * [获取 traceId]
     * @return java.lang.String
     **/
    public static String mdc(){
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = (HttpServletRequest) requestAttributes
                .resolveReference(RequestAttributes.REFERENCE_REQUEST);
 
        String traceId;
        String traceIdKey = "traceId";
        if (request.getHeader(traceIdKey) == null) {
            traceId = UUID.randomUUID().toString();
        } else {
            traceId = request.getHeader(traceIdKey);
        }
        return traceId;
    }
 
}

4、添加拦截器[LogInterceptor]为每一个请求头添加[traceId]

public class LogInterceptor implements HandlerInterceptor {
 
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String traceIdKey = "traceId";
        String traceId = MDCUtils.mdc();
        request.setAttribute(traceIdKey, traceId);
        MDC.clear();
        MDC.put(traceIdKey, traceId);
        return true;
    }
}

5、注册拦截器

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
 
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LogInterceptor()).addPathPatterns("/test/*");
    }
}

6、在每次调用外部接口的httpclient地方添加请求头[traceId]

public class HttpClientHelper {
 
    public static String sendPost(String url, LinkedHashMap<String, Object> paramMap, Map<String, Object> headMap) {
        // 获取连接客户端工具
        CloseableHttpClient httpClient = HttpClients.createDefault();
        String entityStr = null;
        CloseableHttpResponse response = null;
 
        try {
            // 创建POST请求对象
            HttpPost httpPost = new HttpPost(url);
            UrlEncodedFormEntity entityParam = null;
 
            /*
             * 添加请求参数
             */
            // 创建请求参数
            if (!CollectionUtils.isEmpty(paramMap)) {
                List<NameValuePair> paramertersList = new LinkedList<>();
 
                Set<String> keys = paramMap.keySet();
                for (String s : keys) {
                    String key = String.valueOf(s);
                    String value = ObjectUtils.isEmpty(paramMap.get(key)) ? null : paramMap.get(key).toString();
                    if (ObjectUtils.isNotEmpty(value)) {
                        BasicNameValuePair param1 = new BasicNameValuePair(key, value);
                        paramertersList.add(param1);
                    }
                }
                // 使用URL实体转换工具
                entityParam = new UrlEncodedFormEntity(paramertersList, "UTF-8");
            }
 
            httpPost.setEntity(entityParam);
 
            if (headMap != null) {
                for (Map.Entry<String, Object> entry : headMap.entrySet()) {
                    if (entry.getValue() != null) {
                        httpPost.addHeader(entry.getKey(), entry.getValue().toString());
                    }
                }
            }
 
            String traceId = MDC.get("traceId");
            httpPost.addHeader("traceId", traceId);
            httpPost.addHeader("Accept", "*/*");
            httpPost.addHeader("Connection", "Keep-Alive");
            httpPost.addHeader("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.7.6)");
            // 传输的类型
            httpPost.addHeader("Content-Type", "application/x-www-form-urlencoded");
 
            // 执行请求
            response = httpClient.execute(httpPost);
            // 获得响应的实体对象
            HttpEntity entity = response.getEntity();
            // 使用Apache提供的工具类进行转换成字符串
            entityStr = EntityUtils.toString(entity, "UTF-8");
 
            // 此处获取所有的响应头信息并进行打印
            System.out.println(Arrays.toString(response.getAllHeaders()));
        } catch (ClientProtocolException e) {
            System.err.println("Http协议出现问题");
            e.printStackTrace();
        } catch (ParseException e) {
            System.err.println("解析错误");
            e.printStackTrace();
        } catch (IOException e) {
            System.err.println("IO异常");
            e.printStackTrace();
        } finally {
            // 释放连接
            if (null != response) {
                try {
                    response.close();
                    httpClient.close();
                } catch (IOException e) {
                    System.err.println("释放连接出错");
                    e.printStackTrace();
                }
            }
        }
        return entityStr;
    }
 
}

4、多线程间使用

MDC异步线程间传递:
MDC的put时,子线程在创建的时候会把父线程中的inheritableThreadLocals变量设置到子线程的inheritableThreadLocals中,而MDC内部是用InheritableThreadLocal实现的,所以会把父线程中的上下文带到子线程中
但在线程池中,由于线程会被重用,但是线程本身只会初始化一次,所以之后重用线程的时候,就不会进行初始化操作了,也就不会有父线程inheritableThreadLocals拷贝到子线程中的过程了,这个时候如果还想传递父线程的上下文的话,就要使用getCopyOfContextMap方法

4.1 MDC工具类

定义MDC工具类,支持RunnableCallable两种,目的就是为了把父线程的traceId设置给子线程

import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

import java.util.Map;
import java.util.concurrent.Callable;

/**
 * @Description 封装MDC用于向线程池传递
 */
public class MDCUtil {
    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {//清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
                MDC.clear();
            }
        };
    }

 public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }

    public static void setMDCContextMap(final Map<String, String> context) {
        if (CollectionUtils.isEmpty(context)) {
            MDC.clear();
        } else {
            MDC.setContextMap(context);
        }
    }

}

4.2 Spring ThreadPoolTaskExecutor线程池使用

配置线程池

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolMdcWrapper();
    //核心线程数,默认为1
    taskExecutor.setCorePoolSize(1);
    //最大线程数,默认为Integer.MAX_VALUE
    taskExecutor.setMaxPoolSize(200);
    //队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE
    taskExecutor.setQueueCapacity(2000);
    //线程池维护线程所允许的空闲时间,默认为60s
    taskExecutor.setKeepAliveSeconds(60);
    //线程池对拒绝任务(无线程可用)的处理策略
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    // 初始化线程池
    taskExecutor.initialize();
    return  taskExecutor;
}

继承ThreadPoolTaskExecutor

public class ThreadPoolMdcWrapper extends ThreadPoolTaskExecutor {

    public ThreadPoolMdcWrapper() {

    }

    @Override
    public void execute(Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

4.2 使用ExecutorCompletionService方式

当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。

ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。

ExecutorCompletionService的设计目的在于提供一个可获取线程池执行结果的功能,这个类采用了装饰器模式,需要用户提供一个自定义的线程池,在ExecutorCompletionService内部持有该线程池进行线程执行,在原有的线程池功能基础上装饰额外的功能。

ExecutorCompletionService 相比之前 Future 相比 ,提供了一个通知机制,将结果统一到一个队列,当前提交任务不会阻塞获取,从另一个队列中阻塞获取。

/**
 * 使用MDC传递traceId
 */
public class Demo {

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    public void demo() {
        ExecutorCompletionService ecs = new ExecutorCompletionService(threadPoolExecutor);
        ecs.submit(MDCUtil.wrap(new TestMDC(), MDC.getCopyOfContextMap()));
    }
    
    class TestMDC implements Callable {
        @Override
        public Object call() throws Exception {
            // TODO 代码逻辑
            return null;
        }
    }
}

2.3.3 使用CompletableFuture方式

/**
 * 使用MDC传递traceId
 */
public class Demo {

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    private CompletableFuture<Result> test() {
    
        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
        
        return CompletableFuture.supplyAsync(() -> {
        
            // 必须在打印日志前设置
            MDCUtil.setMDCContextMap(copyOfContextMap);
            //MDC.put("subTraceId",''); //如果需要对子线程进行加线程跟踪号,可在此处设定
            // TODO 业务逻辑
            return new Result();
            
        }, threadPoolExecutor).exceptionally(new Function<Throwable, Result>() {
            /**捕捉异常,不会导致整个流程中断**/
            @Override
            public Result apply(Throwable throwable) {
                log.error("线程[{}]发生了异常[{}], 继续执行其他线程", Thread.currentThread().getName(), throwable.getMessage());
                return null;
            }
        });
    }
}
上一篇下一篇

猜你喜欢

热点阅读