基于Dubbo的分布式链路追踪
2018-04-10 本文已影响233人
ZMMWMY
源码cloud分支
假设已经了解了Dubbo的Filter,Spring Cloud Sleuth,ZipKin。
其实openZipKin已经提供了该功能,请切换到dubbo分支
但它是基于最新的spring cloud sleuth。spring cloud sleuth 2.0.0,我也没去了解最新的版本的代码,因为公司版本不是想升就升,所以造了一个轮子,其思想和源码差不多。
另外使用spring cloud sleuth来链路追踪而不使用其他库,比如Brave。那就是它集成了将链路信息记录到LogBack的功能。
基于Brave的Demo 请看源码master分支
本文直接贴代码,有空会进行讲解
- 定义Filter
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MyTracingFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(MyTracingFilter.class);
protected void addRequestTags(RpcContext context) {
this.httpTraceKeysInjector.addRequestTags(context.getUrl().getAddress(),
context.getUrl().getHost(),
context.getUrl().getPath(),
context.getMethodName(),
Collections.emptyMap());
}
HttpTraceKeysInjector httpTraceKeysInjector;
Tracer tracer;
DubboExtractor dubboExtractor;
DubboInject injector;
SpanReporter spanReporter;
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
tracer = ApplicationBeanHolder.getBean(Tracer.class);
if (tracer == null) {
return invoker.invoke(invocation);
} else {
injectBean();
}
RpcContext rpcContext = RpcContext.getContext();
Span span;
String service = invoker.getInterface().getSimpleName();
String method = RpcUtils.getMethodName(invocation);
String spanName = service + "/" + method;
if (rpcContext.isConsumerSide()) {
span = tracer.createSpan(spanName);
injector.inject(span, new DubboRequestTextMap(RpcContext.getContext()));
addRequestTags(RpcContext.getContext());
span.logEvent(Span.CLIENT_SEND);
Result result;
try {
result = invoker.invoke(invocation);
} finally {
closeSpan(span,true);
}
return result;
} else {
Span parentSpan = dubboExtractor.joinTrace(new DubboRequestTextMap(RpcContext.getContext()));
if (parentSpan != null) {
span = parentSpan;
tracer.continueSpan(span);
span.logEvent(Span.SERVER_RECV);
} else {
String header = RpcContext.getContext().getAttachment(Span.SPAN_FLAGS);
if (Span.SPAN_SAMPLED.equals(header)) {
span = tracer.createSpan(spanName, new AlwaysSampler());
} else {
span = tracer.createSpan(spanName);
}
span.logEvent(Span.SERVER_RECV);
}
Result result;
try {
result = invoker.invoke(invocation);
} finally {
recordParentSpan(span);
closeSpan(span,false);
}
return result;
}
}
private void recordParentSpan(Span parent) {
if (parent == null) {
return;
}
if (parent.isRemote()) {
tracer.getCurrentSpan().logEvent(Span.SERVER_SEND);
parent.stop();
this.spanReporter.report(parent);
}
}
private void injectBean() {
injector = ApplicationBeanHolder.getBean(DubboInject.class);
dubboExtractor = ApplicationBeanHolder.getBean(DubboExtractor.class);
httpTraceKeysInjector = ApplicationBeanHolder.getBean(HttpTraceKeysInjector.class);
spanReporter = ApplicationBeanHolder.getBean(SpanReporter.class);
}
private void closeSpan(Span span, Boolean type) {
if (type) {
tracer.getCurrentSpan().logEvent(Span.CLIENT_RECV);
}
if (span != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Closing Dubbo span " + span);
}
tracer.close(span);
}
}
}
- DubboInject
public class DubboInject implements SpanInjector<SpanTextMap> {
private static final DubboSpanMapper SPAN_CARRIER_MAPPER = new DubboSpanMapper();
@Override
public void inject(Span span, SpanTextMap map) {
Map<String, String> carrier = SPAN_CARRIER_MAPPER.convert(map);
setHeader(map, carrier, Span.TRACE_ID_NAME, span.traceIdString());
setIdHeader(map, carrier, Span.SPAN_ID_NAME, span.getSpanId());
setHeader(map, carrier, Span.SAMPLED_NAME, span.isExportable() ? Span.SPAN_SAMPLED : Span.SPAN_NOT_SAMPLED);
setHeader(map, carrier, Span.SPAN_NAME_NAME, span.getName());
setIdHeader(map, carrier, Span.PARENT_ID_NAME, getParentId(span));
setHeader(map, carrier, Span.PROCESS_ID_NAME, span.getProcessId());
for (Map.Entry<String, String> entry : span.baggageItems()) {
map.put(prefixedKey(entry.getKey()), entry.getValue());
}
}
private String prefixedKey(String key) {
if (key.startsWith(Span.SPAN_BAGGAGE_HEADER_PREFIX
+ DubboSpanMapper.HEADER_DELIMITER)) {
return key;
}
return Span.SPAN_BAGGAGE_HEADER_PREFIX + DubboSpanMapper.HEADER_DELIMITER
+ key;
}
private Long getParentId(Span span) {
return !span.getParents().isEmpty() ? span.getParents().get(0) : null;
}
private void setIdHeader(SpanTextMap map, Map<String, String> carrier, String name, Long value) {
if (value != null) {
setHeader(map, carrier, name, Span.idToHex(value));
}
}
private void setHeader(SpanTextMap map, Map<String, String> carrier, String name, String value) {
if (StringUtils.hasText(value) && !carrier.containsKey(name)) {
map.put(name, value);
}
}
}
- DubboExtractor
public class DubboExtractor implements SpanExtractor<SpanTextMap> {
private static final org.apache.commons.logging.Log log = LogFactory.getLog(
MethodHandles.lookup().lookupClass());
private static final String HTTP_COMPONENT = "http";
private static final DubboSpanMapper SPAN_CARRIER_MAPPER = new DubboSpanMapper();
private final Pattern skipPattern;
private final Random random;
public DubboExtractor(Pattern skipPattern) {
this.skipPattern = skipPattern;
this.random = new Random();
}
@Override
public Span joinTrace(SpanTextMap textMap) {
Map<String, String> carrier = SPAN_CARRIER_MAPPER.convert(textMap);
boolean debug = Span.SPAN_SAMPLED.equals(carrier.get(Span.SPAN_FLAGS));
boolean idToBeGenerated = debug && onlySpanIdIsPresent(carrier);
if (!idToBeGenerated && traceIdIsMissing(carrier)) {
return null;
}
try {
return buildParentSpan(carrier, idToBeGenerated);
} catch (Exception e) {
log.error("Exception occurred while trying to extract span from carrier", e);
return null;
}
}
private boolean onlySpanIdIsPresent(Map<String, String> carrier) {
return traceIdIsMissing(carrier) && spanIdIsPresent(carrier);
}
private boolean traceIdIsMissing(Map<String, String> carrier) {
return carrier.get(Span.TRACE_ID_NAME) == null;
}
private boolean spanIdIsPresent(Map<String, String> carrier) {
return carrier.get(Span.SPAN_ID_NAME) != null;
}
private String generateId() {
return Span.idToHex(this.random.nextLong());
}
private long spanId(String spanId, String traceId) {
if (spanId == null) {
if (log.isDebugEnabled()) {
log.debug("Request is missing a span id but it has a trace id. We'll assume that this is "
+ "a root span with span id equal to the lower 64-bits of the trace id");
}
return Span.hexToId(traceId);
} else {
return Span.hexToId(spanId);
}
}
private Span buildParentSpan(Map<String, String> carrier, boolean idToBeGenerated) {
String traceId = carrier.get(Span.TRACE_ID_NAME);
if (traceId == null) {
traceId = generateId();
}
Span.SpanBuilder span = Span.builder()
.traceIdHigh(traceId.length() == 32 ? Span.hexToId(traceId, 0) : 0)
.traceId(Span.hexToId(traceId))
.spanId(spanId(carrier.get(Span.SPAN_ID_NAME), traceId));
String parentName = carrier.get(Span.SPAN_NAME_NAME);
if (StringUtils.hasText(parentName)) {
span.name(parentName);
} else {
span.name(HTTP_COMPONENT + ":/parent"
+ carrier.get(DubboSpanMapper.URI_HEADER));
}
String processId = carrier.get(Span.PROCESS_ID_NAME);
if (StringUtils.hasText(processId)) {
span.processId(processId);
}
String parentId = carrier.get(Span.PARENT_ID_NAME);
if (parentId != null) {
span.parent(Span.hexToId(parentId));
}
span.remote(true);
// boolean skip = this.skipPattern
// .matcher(carrier.get(DubboSpanMapper.URI_HEADER)).matches()
// || Span.SPAN_NOT_SAMPLED.equals(carrier.get(Span.SAMPLED_NAME));
boolean skip = false;
// trace, span id were retrieved from the headers and span is sampled
span.exportable(!(skip || idToBeGenerated));
boolean debug = Span.SPAN_SAMPLED.equals(carrier.get(Span.SPAN_FLAGS));
if (debug) {
span.exportable(true);
} else if (skip) {
span.exportable(false);
}
for (Map.Entry<String, String> entry : carrier.entrySet()) {
if (entry.getKey().toLowerCase()
.startsWith(DubboSpanMapper.BAGGAGE_PREFIX)) {
span.baggage(unprefixedKey(entry.getKey()), entry.getValue());
}
}
return span.build();
}
private String unprefixedKey(String key) {
return key.substring(key.indexOf(DubboSpanMapper.HEADER_DELIMITER) + 1)
.toLowerCase();
}
}
- DubboRequestTextMap
public class DubboRequestTextMap implements SpanTextMap {
private final RpcContext delegate;
public DubboRequestTextMap(RpcContext delegate) {
this.delegate = delegate;
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
return this.delegate.getAttachments().entrySet().iterator();
}
@Override
public void put(String key, String value) {
if (!StringUtils.hasText(value)) {
return;
}
this.delegate.getAttachments().put(key, value);
}
}
- DubboSpanMapper
public class DubboSpanMapper {
static final String HEADER_DELIMITER = "-";
static final String BAGGAGE_PREFIX = Span.SPAN_BAGGAGE_HEADER_PREFIX
+ HEADER_DELIMITER;
static final String URI_HEADER = "X-Span-Uri";
private static Comparator<String> IGNORE_CASE_COMPARATOR = new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return o1.toLowerCase().compareTo(o2.toLowerCase());
}
};
/**
* Acceptable span fields
*/
private static final Set<String> SPAN_FIELDS;
static {
TreeSet<String> fields = new TreeSet<>(IGNORE_CASE_COMPARATOR);
Collections.addAll(fields, Span.SPAN_FLAGS, Span.TRACE_ID_NAME, Span.SPAN_ID_NAME,
Span.PROCESS_ID_NAME, Span.SPAN_NAME_NAME, Span.PARENT_ID_NAME,
Span.SAMPLED_NAME, URI_HEADER);
SPAN_FIELDS = Collections.unmodifiableSet(fields);
}
/**
* Create new Map of carrier values
*/
Map<String, String> convert(SpanTextMap textMap) {
Map<String, String> carrier = new TreeMap<>(IGNORE_CASE_COMPARATOR);
for (Map.Entry<String, String> entry : textMap) {
if (isAcceptable(entry.getKey())) {
carrier.put(entry.getKey(), entry.getValue());
}
}
return Collections.unmodifiableMap(carrier);
}
private boolean isAcceptable(String key) {
return SPAN_FIELDS.contains(key) || key.startsWith(BAGGAGE_PREFIX);
}
}