分布式技术

Spring Cloud Sleuth分布式链路跟踪

2019-09-26  本文已影响0人  DoubleFooker

Sleuth 的使用

Sleuth 是Spring Cloud技术栈官方提供的分布式链路跟踪组件。2.0.0开始使用brave作为跟踪类库。接入后日志信息中会添加[service-name,traceID,SpanId,是否采样]信息

Zipkin基本概念

Trace

对Span信息的集合,表示一次完整的跟踪信息。

Span

跟踪信息单元,包括请求时间、地址、服务、方法、耗时等基本信息

Annotation

跟踪环节信息,即客户端、服务端开始、结束环节

Propagation

分布式服务跟踪的桥梁,通过header传递TraceId,SpanId

   Client Span                                                Server Span
┌──────────────────┐                                       ┌──────────────────┐
│                  │                                       │                  │
│   TraceContext   │           Http Request Headers        │   TraceContext   │
│ ┌──────────────┐ │          ┌───────────────────┐        │ ┌──────────────┐ │
│ │ TraceId      │ │          │ X─B3─TraceId      │        │ │ TraceId      │ │
│ │              │ │          │                   │        │ │              │ │
│ │ ParentSpanId │ │ Extract  │ X─B3─ParentSpanId │ Inject │ │ ParentSpanId │ │
│ │              ├─┼─────────>│                   ├────────┼>│              │ │
│ │ SpanId       │ │          │ X─B3─SpanId       │        │ │ SpanId       │ │
│ │              │ │          │                   │        │ │              │ │
│ │ Sampled      │ │          │ X─B3─Sampled      │        │ │ Sampled      │ │
│ └──────────────┘ │          └───────────────────┘        │ └──────────────┘ │
│                  │                                       │                  │
└──────────────────┘                                       └──────────────────┘

一个完整的调用链路图


image

Sleuth整合zipkin的使用

引入依赖

<dependency> 
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

配置zipkin地址

# 默认地址http://localhost:9411
spring.zipkin.base-url=http://localhost:9411
# 采样率 默认是0.1
spring.sleuth.sampler.probability=1.0

启动zipkin服务器

 java -jar .\zipkin-server-2.12.9-exec.jar 

使用kafka异步收集信息

引入依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.8.RELEASE</version>
</dependency>

配置kafka地址

# 默认地址http://localhost:9092
spring.kafka.bootstrap-servers=localhost:9092
# 使用kafka发送消息
spring.zipkin.sender.type=kafka
# 默认topic为zipkin
spring.zipkin.kafka.topic=zipkin

zipkin服务端启动需要添加参数

java -jar .\zipkin-server-2.12.9-exec.jar --zipkin.collector.kafka.bootstrap-servers=localhost:9092

应用接入sleuth后,每次请求、服务调用都会产生Trace数据。Span信息会被发送到kafka,zipkin服务端通过消费kafka中topic为zipkin的消息收集数据。通过zipkinUI展示。

服务端数据持久化

使用es存储,添加启动参数

java -jar .\zipkin-server-2.12.9-exec.jar --zipkin.collector.kafka.bootstrap-servers=localhost:9092
--zipkin.storage.type=elasticsearch
--zipkin.storage.elasticsearch.hosts=esIP:PORT

其他参数项可通过github-zipkin了解

自定义跟踪信息

通过@NewSpan注解实现服务耗时的跟,在调用方法上加上注解即可。

这是没有annotation信息,需要手动调用customizer.annotate("Client start");添加。或者通过定义SpanCreator实现

@Slf4j
@Component
class MySpanCreator implements NewSpanParser {
    @Override
    public void parse(MethodInvocation pjp, NewSpan newSpan, SpanCustomizer span) {
        String name = StringUtils.isEmpty(newSpan.name()) ?
                pjp.getMethod().getName() : newSpan.name();
        String changedName = SpanNameUtil.toLowerHyphen(name);
        span.annotate("service-start");
        span.name(changedName);
        span.tag("key","val");
    }
}

过滤不需要的跟踪信息

通过配置

spring.sleuth.web.skip-pattern=/test/*

优点

缺点:

ReporterMetrics

监控消息发送

源码

异步手机消息AsyncReporter

 public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
      if (encoder == null) throw new NullPointerException("encoder == null");

      if (encoder.encoding() != sender.encoding()) {
        throw new IllegalArgumentException(String.format(
            "Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
      }

      final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

      if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop.
        final BufferNextMessage<S> consumer =
            BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
            //创建守护线程发送消息
        final Thread flushThread = new Thread("AsyncReporter{" + sender + "}") {
          @Override public void run() {
            try {
              while (!result.closed.get()) {
                result.flush(consumer);
              }
            } catch (RuntimeException | Error e) {
              BoundedAsyncReporter.logger.log(Level.WARNING, "Unexpected error flushing spans", e);
              throw e;
            } finally {
              int count = consumer.count();
              if (count > 0) {
                metrics.incrementSpansDropped(count);
                BoundedAsyncReporter.logger.warning("Dropped " + count + " spans due to AsyncReporter.close()");
              }
              result.close.countDown();
            }
          }
        };
        flushThread.setDaemon(true);
        flushThread.start();
      }
      return result;
    }

可以通过创建ReporterMetrics自定义bean实现先缓存本地,kafka连接失败重试。

上一篇下一篇

猜你喜欢

热点阅读