日志分布式

分布式日志跟踪组件-Sleuth

2024-04-23  本文已影响0人  农夫_三拳
  1. Spring Cloud Sleuth介绍
    1.1 是什么
    Spring Cloud Sleuth提供了Spring Cloud分布式追踪解决方案的API,集成了OpenZipKin Brave(推特的开源框架)
    ,可以跟踪服务的请求和消息;Spring Cloud Sleuth的分布式跟踪支持SpringBoot自动配置。

1.2 特点
(1)搭配Slf4j MDC 和 logback,添加跟踪数据(traceId和spanId),可以在日志文件中从给定的跟踪或者跨度提取所有的日志,其中跨系统用traceId串起来,内部线程之间是spanId。
(2)支持常见的Spring应用程序,例如servlet filter,restTemplate,feignClient,scheduled actions和message channels等
(3)可以搭配组件ZipKin,使用 spring-cloud-sleuth-zipkin 组件,ZipKin可以将日志收集,进行可视化展示和全文搜索,并根据日志信息数据进行性能分析、数据分析和链路优化等功能。
(4)Sleuth作为分布式追踪解决方案,支持多线程下日志链路追踪。
(5)Sleuth默认支持大部分服务访问做链路追踪,目前支持的有:rxjava、feign、quartz、RestTemplate、grpc、kafka、redis等。

1.3 常见名词
(1)Span:基本的工作单位,每个线程或者某次请求就是一个span;它是由一个64位的spanId和标记所属的Trace的TraceId组成,同时也会包含些额外的信息,比如说:进程ID,服务名称和tags标签信息等。
(2)Trace:形成树形结构的一组跨度,一条完整的链路追踪就是Trace;内部包含多个Span。
(3)TraceContext:传播上下文,它包含链路数据信息;主要有链路标识符(traceId、spanId和parentId等)、采样数据和采样状态等。
(4)traceId:整个链路跟踪的标识符。
(5)spanId:某个线程或者某次请求即span的标识符
(6)parentId:当前span父节点的spanId,例如A->B,此时A的spanId就是B的parentId。
各名词之间关系图


image.png
image.png

通过以上流程图可知,Sleuth底层逻辑调用链追踪有两个任务,一是标记出一次调用请求种所有的日志,即通过TraceId将所有服务日志串联起来,形成一个完整的调用链;二是梳理日志前后关系,使用的SpanId和ParentSpanId来标记,每个服务的调用都有一个唯一的SpanId,ParentSpanId代表上个服务即调用方的SpanId。其中traceId、spanId和parentId等组成TraceContext传播上下文。

1.4 实操
1.4.1 引入组件Sleuth
本次使用的Spring Cloud Sleuth版本是2.2.8.RELEASE,对应的Spring Boot、Spring Cloud和Spring Cloud Alibaba的版本是
Spring Cloud Alibaba Version
Spring Cloud Version
Spring Boot Version
2.2.7.RELEASE
Spring Cloud Hoxton.SR12
2.3.12.RELEASE
在Springboot项目中加入sleuth组件依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

加入依赖后,在logback-spring.xml的pattern处加上配置[trace=%X{X-B3-TraceId:-},span=%X{X-B3-SpanId:-}],示例如下

<pattern>%d %-5p [%F:%L] [trace=%X{X-B3-TraceId:-},span=%X{X-B3-SpanId:-}] %markMsg%n</pattern>

启动服务,日志打印如下:

2024-02-22 17:23:56,695 INFO  [DynamicServerListLoadBalancer.java:222] [trace=57a46058a9ca15fc,span=57a46058a9ca15fc] Using serverListUpdater PollingServerListUpdater
2024-02-22 17:23:56,721 INFO  [DynamicServerListLoadBalancer.java:150] [trace=57a46058a9ca15fc,span=57a46058a9ca15fc] DynamicServerListLoadBalancer for client paygateway initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=paygateway,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:com.alibaba.cloud.nacos.ribbon.NacosServerList@61a8c9e7

1.4.2 扩展应用
在跟踪链路中添加requestId
在实际使用中,通常我们想将服务端某条日志记录和客户端的请求关联起来,那么我们可以将客户端的requestId增加到数据链路中,并在日志中打印出来。
要实现这个功能,就要使用到Sleuth中往传播上下文添加额外信息的功能点,传播上下文除了traceId和spanId等这些必要字段,其他都称为额外字段,这些额外字段的简单名称是“Baggage”。
a.定义额外数据包装类

import brave.baggage.BaggageField;
import brave.baggage.CorrelationScopeConfig;
import brave.context.slf4j.MDCScopeDecorator;
import brave.propagation.CurrentTraceContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TracerBaggageConfig {


    @Bean
    public BaggageField baggageField(){
   
        return BaggageField.create("requestId");
    }

    
    @Bean
    public CurrentTraceContext.ScopeDecorator mdcScopeDecorator(){
        return MDCScopeDecorator.newBuilder()
                .clear()
                .add(CorrelationScopeConfig.SingleCorrelationField.newBuilder(baggageField())
                        //实时刷新
                        .flushOnUpdate()
                        .build())
                .build();

    }
}

b.在过滤器中将requestId的值加到传播上下文TraceContext

import brave.Span;
import brave.Tracer;
import brave.baggage.BaggageField;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.sleuth.instrument.web.TraceWebServletAutoConfiguration;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;


@Component
@Slf4j
public class CustomSpanFilter implements GlobalFilter, Ordered {


    @Autowired
    private Tracer tracer;
    @Autowired
    private BaggageField baggageField;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Span currentSpan = this.tracer.currentSpan();
        if(currentSpan == null){
            return chain.filter(exchange);
        }
        //目前是从header中获取requestId,在实际使用中请自定义
        HttpHeaders headers = exchange.getRequest().getHeaders();
        String requestId = headers.getFirst("requestId");
        if(requestId == null){
            return chain.filter(exchange);
        }
        //设置requestId的值
        baggageField.updateValue(currentSpan.context(),requestId);
        return chain.filter(exchange);
    }

  /**
   * 在TraceContext中的跟踪数据添加完后执行
   * @return
   */
    @Override
    public int getOrder() {
        return TraceWebServletAutoConfiguration.TRACING_FILTER_ORDER + 1;
    }
}

除了代码还需要配置两个配置项

spring:
  sleuth:
    baggage:
      # 要添加到MDC的上下文字段列表
      correlation-fields:
        - requestId
      # 设置接收并传播到远程服务的字段列表
      remoteFields:
        - requestId

logback-spring.xml中加上requestId配置

<pattern>%d %-5p [%F:%L] [trace=%X{X-B3-TraceId:-},span=%X{X-B3-SpanId:-},requestId=%X{requestId:-}] %markMsg%n</pattern>

运行后日志

2024-02-23 11:56:29,578 INFO  [ServiceInfoHolder.java:184] [trace=5c40370d8161278a,span=5c40370d8161278a,requestId=12345] init new ips(0) service: DEFAULT_GROUP@@transaction -> []
2024-02-23 11:56:29,579 INFO  [ServiceInfoHolder.java:169] [trace=5c40370d8161278a,span=5c40370d8161278a,requestId=12345] current ips:(0) service: DEFAULT_GROUP@@transaction -> []
2024-02-23 11:56:29,581 INFO  [DynamicServerListLoadBalancer.java:150] [trace=5c40370d8161278a,span=5c40370d8161278a,requestId=12345] DynamicServerListLoadBalancer for client transaction initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=transaction,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:com.alibaba.cloud.nacos.ribbon.NacosServerList@4a4da60b

注意:向MDC中添加额外字段,会影响性能,可以加但不要加多。
跨线程日志跟踪功能
sleuth数据跟踪兼容线程,需要将Runnable/Callable包装在sleuth的包装类中,代码块示例如下
a.Runnable

/**
 * 定义线程
 */
Runnable runnable = new Runnable(){
   @Override
   public void run(){
      //do same work
   }
}

// Tracing tracing 该对象注入进来

//方法一 手动创建带有显式"calculateTax" Span名称的"TraceRunnable"
Runnable traceRunnable = new TraceRunnable(tracing,spanName,runnable,"calculateTax")

//方法二 用"Tracing"包装"Runnable",此方法要保证当前span可用
Runnable traceRunnableFromTracer = traceing.currentTraceContext().wrap(runnable);

b.Callable

/**
 * 定义线程
 */
Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
        return someLogic();
    }

    }
};

// Tracing tracing 该对象注入进来

// 方法一 手动创建带有显式"calculateTax" Span名称的"TraceRunnable"
Callable<String> traceCallable = new TraceCallable<>(tracing, spanNamer,
        callable, "calculateTax");
        
//方法二 用"Tracing"包装"Callable",此方法要保证当前span可用
Callable<String> traceCallableFromTracer = tracing.currentTraceContext()
        .wrap(callable);

sleuth的包装方式有两种,一种是使用TraceCallable显示包装,一种是使用Tracing,在当前span中设置去
多线程链路追踪
Sleuth支持对异步任务的链路追踪,在项目中使用@Async 注解开启一个异步任务后,Sleuth会为异步任务重新生成一个Span,但是如果使用了自定义的异步任务线程池,则会导致Sleuth无法创建一个Span,而是会重新生成Trace和Span。此时需要使用Sleuth退供的Executor类来包装异步任务线程池,才能在异步任务调用链路中重新创建Span。
使用@Async注解开启异步任务
(1)先在服务的ServiceImpl接口中定义一个asyncMethod()方法

@Async
@Override
public void asyncMethod(){
   log.info("执行异步任务。。")
}

(2)在Controller或者service中调用

@GetMapping(value = "/async/api")
public String asyncApi() {
    log.info("执行异步任务开始...");
    testService.asyncMethod();
    log.info("异步任务执行结束...");
    return "asyncApi";
}

自定义任务线程池
使用Sleuth提供的Executor类来定义异步任务线程池,Sleuth包含所有的Executor类,主要有LazyTraceExecutor、LazyTraceExecutor、TraceableExecutorService、LazyTraceThreadPoolTaskScheduler和LazyTraceThreadPoolTaskExecutor等,详情请参考源码

image.png

LazyTraceThreadPoolTaskExecutor是继承ThreadPoolTaskExecutor,用法相似。其他Sleuth的Executor类实现原理相同。
用LazyTraceThreadPoolTaskExecutor实现线程池,示例如下:
a.在config包下创建ThreadPoolTaskExecutorConfig类,用来自定义异步任务线程池

@Configuration
@EnableAsync
public class AsyncThreadPoolConfig{
    @Autowired
    private BeanFactory beanFactory;
    
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor getScorePoolTaskExecutor(){
         ThreadPoolTaskExecutor taskExecutor = new ThreadPollTaskExecutor();
         //核心线程数
         taskExecutor.setCorePoolSize(2);
         //线程池维护线程的最大数量
         taskExecutor.setMaxPoolSize(10);
         //缓存队列大小
         taskExecutor.setQueueCapacity(10);
         //允许的空闲时间,当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
         taskExecutor.setKeepAliveSeconds(20);
         //异步方法内部线程名称
         taskExecutor.setThreadNamePrefix("trace-thread-");
         return new LazyTraceThreadPoolTaskExecutor(beanFactory,taskExecutor);
    }
}

线程池定义好后,在服务中使用线程池来实现方法的异步调用。在实际使用过程中,可以把线程池中的配置数据改成可配的。
消息组件的链路追踪
Sleuth支持对RPC设置日志跟踪上下文,可以通过接收RPC请求中的跟踪信息,将跟踪信息设置到跟踪上下文中,从而实现对RPC日志数据跟踪的支持。以下拿RabbitMQ来举例,如何实现将外部的traceId设置当前的传播上下文中
(1)生产者侧
生产者发送消息时,需要把当前traceId设置到message中,这样做的目的是将上层的traceId发送到下层,代码示例如下

Message message = new Message(msgTopic, Msgtag, msg.getBytes());
String traceId = MDC.get("X-B3-TraceId");
message.putUserProperties("traceId",traceId);
message.setKey(key);
producer.sendOneway(message);

(2)消费者侧
消费者消费消息时,需要将消息中的traceId设置到当前传播上下文中
设置信息提取器

public class TraceRemoteGetter implements Propagation.RemoteGetter<Map<String,String>> {
    @Override
    public Span.Kind spanKind() {
        return Span.Kind.SERVER;
    }

    @Override
    public String get(Map<String, String> request, String fieldName) {
        return request.get(fieldName);
    }
}

将上层的traceId设置当前处理中

// Tracing tracing 该对象注入进来
    
    //配置从请求中提取跟踪上下文的函数
    TraceContext.Extractor<Map<String,String>> extractor = tracing.propagation()
            .extractor(new TraceRemoteGetter());
    Map<String,String> map = new HashMap<>();
    //traceId是从消费的消息中获取到的上层traceId 
    //这个traceId不能随便定义,需要通过采用系统规则定义,因为设置之后系统会转换成数据,
    //如果不合规则会重新生成
    map.put("X-B3-TraceId",traceId);
    map.put("X-B3-ParentSpanId",traceId);
    map.put("X-B3-Sampled","0");

    //将外部traceId设置到上下文
    TraceContextOrSamplingFlags extracted = extractor.extract(map);
    Span span = tracing.tracer().nextSpan(extracted);
    //当新的TraceContext设置为当前上下文
    CurrentTraceContext.Scope scope =  tracing.currentTraceContext().nextSpan(span.context());
    log.info("重新定义日志跟踪信息");
    //此方法一定要执行 不然会报错
    scope.close();

注意:从上层传过来的traceId的值不能随便定义,需要通过采用系统规则定义,因为设置之后系统会转换为数字,如果不合规则就会重新生成,那么traceId无效
还更多功能待开挖,感兴趣请看:https://docs.spring.io/spring-cloud-sleuth/docs/2.2.8.RELEASE/reference/html/

上一篇 下一篇

猜你喜欢

热点阅读