Spring Cloud Sleuth分布式链路跟踪
2019-09-26 本文已影响0人
DoubleFooker
Sleuth 的使用
Sleuth 是Spring Cloud技术栈官方提供的分布式链路跟踪组件。2.0.0开始使用brave作为跟踪类库。接入后日志信息中会添加[service-name,traceID,SpanId,是否采样]
信息
Zipkin基本概念
Trace
对Span信息的集合,表示一次完整的跟踪信息。
Span
跟踪信息单元,包括请求时间、地址、服务、方法、耗时等基本信息
Annotation
跟踪环节信息,即客户端、服务端开始、结束环节
Propagation
分布式服务跟踪的桥梁,通过header传递TraceId,SpanId
Client Span Server Span
┌──────────────────┐ ┌──────────────────┐
│ │ │ │
│ TraceContext │ Http Request Headers │ TraceContext │
│ ┌──────────────┐ │ ┌───────────────────┐ │ ┌──────────────┐ │
│ │ TraceId │ │ │ X─B3─TraceId │ │ │ TraceId │ │
│ │ │ │ │ │ │ │ │ │
│ │ ParentSpanId │ │ Extract │ X─B3─ParentSpanId │ Inject │ │ ParentSpanId │ │
│ │ ├─┼─────────>│ ├────────┼>│ │ │
│ │ SpanId │ │ │ X─B3─SpanId │ │ │ SpanId │ │
│ │ │ │ │ │ │ │ │ │
│ │ Sampled │ │ │ X─B3─Sampled │ │ │ Sampled │ │
│ └──────────────┘ │ └───────────────────┘ │ └──────────────┘ │
│ │ │ │
└──────────────────┘ └──────────────────┘
一个完整的调用链路图
image
Sleuth整合zipkin的使用
引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
配置zipkin地址
# 默认地址http://localhost:9411
spring.zipkin.base-url=http://localhost:9411
# 采样率 默认是0.1
spring.sleuth.sampler.probability=1.0
启动zipkin服务器
java -jar .\zipkin-server-2.12.9-exec.jar
使用kafka异步收集信息
引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
配置kafka地址
# 默认地址http://localhost:9092
spring.kafka.bootstrap-servers=localhost:9092
# 使用kafka发送消息
spring.zipkin.sender.type=kafka
# 默认topic为zipkin
spring.zipkin.kafka.topic=zipkin
zipkin服务端启动需要添加参数
java -jar .\zipkin-server-2.12.9-exec.jar --zipkin.collector.kafka.bootstrap-servers=localhost:9092
应用接入sleuth后,每次请求、服务调用都会产生Trace数据。Span信息会被发送到kafka,zipkin服务端通过消费kafka中topic为zipkin的消息收集数据。通过zipkinUI展示。
服务端数据持久化
使用es存储,添加启动参数
java -jar .\zipkin-server-2.12.9-exec.jar --zipkin.collector.kafka.bootstrap-servers=localhost:9092
--zipkin.storage.type=elasticsearch
--zipkin.storage.elasticsearch.hosts=esIP:PORT
其他参数项可通过github-zipkin了解
自定义跟踪信息
通过@NewSpan
注解实现服务耗时的跟,在调用方法上加上注解即可。
这是没有annotation信息,需要手动调用customizer.annotate("Client start");添加。或者通过定义SpanCreator实现
@Slf4j
@Component
class MySpanCreator implements NewSpanParser {
@Override
public void parse(MethodInvocation pjp, NewSpan newSpan, SpanCustomizer span) {
String name = StringUtils.isEmpty(newSpan.name()) ?
pjp.getMethod().getName() : newSpan.name();
String changedName = SpanNameUtil.toLowerHyphen(name);
span.annotate("service-start");
span.name(changedName);
span.tag("key","val");
}
}
过滤不需要的跟踪信息
通过配置
spring.sleuth.web.skip-pattern=/test/*
优点
- SpringCloud体系组件,整合方便
- 调用信息清晰,能快速定位链路环节信息
缺点:
- 使用MQ收集信息,需要引入MQ服务,增加运维压力、复杂度
- 跟踪信息较少,需要跟踪细点的信息时需要代码做处理,不友好
ReporterMetrics
监控消息发送
源码
异步手机消息AsyncReporter
public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
if (encoder == null) throw new NullPointerException("encoder == null");
if (encoder.encoding() != sender.encoding()) {
throw new IllegalArgumentException(String.format(
"Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
}
final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);
if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop.
final BufferNextMessage<S> consumer =
BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
//创建守护线程发送消息
final Thread flushThread = new Thread("AsyncReporter{" + sender + "}") {
@Override public void run() {
try {
while (!result.closed.get()) {
result.flush(consumer);
}
} catch (RuntimeException | Error e) {
BoundedAsyncReporter.logger.log(Level.WARNING, "Unexpected error flushing spans", e);
throw e;
} finally {
int count = consumer.count();
if (count > 0) {
metrics.incrementSpansDropped(count);
BoundedAsyncReporter.logger.warning("Dropped " + count + " spans due to AsyncReporter.close()");
}
result.close.countDown();
}
}
};
flushThread.setDaemon(true);
flushThread.start();
}
return result;
}
可以通过创建ReporterMetrics自定义bean实现先缓存本地,kafka连接失败重试。