dubbo 分布式追踪系统我爱编程

基于Dubbo的分布式链路追踪

2018-04-10  本文已影响233人  ZMMWMY

源码cloud分支
假设已经了解了Dubbo的Filter,Spring Cloud Sleuth,ZipKin。
其实openZipKin已经提供了该功能,请切换到dubbo分支
但它是基于最新的spring cloud sleuth。spring cloud sleuth 2.0.0,我也没去了解最新的版本的代码,因为公司版本不是想升就升,所以造了一个轮子,其思想和源码差不多。
另外使用spring cloud sleuth来链路追踪而不使用其他库,比如Brave。那就是它集成了将链路信息记录到LogBack的功能。
基于Brave的Demo 请看源码master分支
本文直接贴代码,有空会进行讲解


@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MyTracingFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyTracingFilter.class);

    protected void addRequestTags(RpcContext context) {
        this.httpTraceKeysInjector.addRequestTags(context.getUrl().getAddress(),
                context.getUrl().getHost(),
                context.getUrl().getPath(),
                context.getMethodName(),
                Collections.emptyMap());
    }

    HttpTraceKeysInjector httpTraceKeysInjector;
    Tracer tracer;
    DubboExtractor dubboExtractor;
    DubboInject injector;
    SpanReporter spanReporter;

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        tracer = ApplicationBeanHolder.getBean(Tracer.class);
        if (tracer == null) {
            return invoker.invoke(invocation);
        } else {
            injectBean();
        }


        RpcContext rpcContext = RpcContext.getContext();
        Span span;
        String service = invoker.getInterface().getSimpleName();
        String method = RpcUtils.getMethodName(invocation);
        String spanName = service + "/" + method;
        if (rpcContext.isConsumerSide()) {

            span = tracer.createSpan(spanName);
            injector.inject(span, new DubboRequestTextMap(RpcContext.getContext()));
            addRequestTags(RpcContext.getContext());
            span.logEvent(Span.CLIENT_SEND);
            Result result;
            try {
                result =  invoker.invoke(invocation);
            } finally {
                closeSpan(span,true);
            }
            return result;
        } else {
            Span parentSpan = dubboExtractor.joinTrace(new DubboRequestTextMap(RpcContext.getContext()));
            if (parentSpan != null) {
                span = parentSpan;
                tracer.continueSpan(span);
                span.logEvent(Span.SERVER_RECV);
            } else {
                String header = RpcContext.getContext().getAttachment(Span.SPAN_FLAGS);
                if (Span.SPAN_SAMPLED.equals(header)) {
                    span = tracer.createSpan(spanName, new AlwaysSampler());
                } else {
                    span = tracer.createSpan(spanName);
                }
                span.logEvent(Span.SERVER_RECV);
            }
            Result result;
            try {
                result =  invoker.invoke(invocation);
            } finally {
                recordParentSpan(span);
                closeSpan(span,false);
            }
            return result;
        }

    }

    private void recordParentSpan(Span parent) {
        if (parent == null) {
            return;
        }
        if (parent.isRemote()) {
            tracer.getCurrentSpan().logEvent(Span.SERVER_SEND);
            parent.stop();
            this.spanReporter.report(parent);
        }
    }

    private void injectBean() {
        injector = ApplicationBeanHolder.getBean(DubboInject.class);
        dubboExtractor = ApplicationBeanHolder.getBean(DubboExtractor.class);
        httpTraceKeysInjector = ApplicationBeanHolder.getBean(HttpTraceKeysInjector.class);
        spanReporter = ApplicationBeanHolder.getBean(SpanReporter.class);
    }

    private void closeSpan(Span span, Boolean type) {
        if (type) {
            tracer.getCurrentSpan().logEvent(Span.CLIENT_RECV);
        }
        if (span != null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Closing Dubbo span " + span);
            }
            tracer.close(span);
        }
    }
}
public class DubboInject implements SpanInjector<SpanTextMap> {

    private static final DubboSpanMapper SPAN_CARRIER_MAPPER = new DubboSpanMapper();

    @Override
    public void inject(Span span, SpanTextMap map) {
        Map<String, String> carrier = SPAN_CARRIER_MAPPER.convert(map);
        setHeader(map, carrier, Span.TRACE_ID_NAME, span.traceIdString());
        setIdHeader(map, carrier, Span.SPAN_ID_NAME, span.getSpanId());
        setHeader(map, carrier, Span.SAMPLED_NAME, span.isExportable() ? Span.SPAN_SAMPLED : Span.SPAN_NOT_SAMPLED);
        setHeader(map, carrier, Span.SPAN_NAME_NAME, span.getName());
        setIdHeader(map, carrier, Span.PARENT_ID_NAME, getParentId(span));
        setHeader(map, carrier, Span.PROCESS_ID_NAME, span.getProcessId());
        for (Map.Entry<String, String> entry : span.baggageItems()) {
            map.put(prefixedKey(entry.getKey()), entry.getValue());
        }
    }

    private String prefixedKey(String key) {
        if (key.startsWith(Span.SPAN_BAGGAGE_HEADER_PREFIX
                + DubboSpanMapper.HEADER_DELIMITER)) {
            return key;
        }
        return Span.SPAN_BAGGAGE_HEADER_PREFIX + DubboSpanMapper.HEADER_DELIMITER
                + key;
    }

    private Long getParentId(Span span) {
        return !span.getParents().isEmpty() ? span.getParents().get(0) : null;
    }

    private void setIdHeader(SpanTextMap map, Map<String, String> carrier, String name, Long value) {
        if (value != null) {
            setHeader(map, carrier, name, Span.idToHex(value));
        }
    }

    private void setHeader(SpanTextMap map, Map<String, String> carrier, String name, String value) {
        if (StringUtils.hasText(value) && !carrier.containsKey(name)) {
            map.put(name, value);
        }
    }
}
public class DubboExtractor implements SpanExtractor<SpanTextMap> {

    private static final org.apache.commons.logging.Log log = LogFactory.getLog(
            MethodHandles.lookup().lookupClass());

    private static final String HTTP_COMPONENT = "http";

    private static final DubboSpanMapper SPAN_CARRIER_MAPPER = new DubboSpanMapper();

    private final Pattern skipPattern;
    private final Random random;

    public DubboExtractor(Pattern skipPattern) {
        this.skipPattern = skipPattern;
        this.random = new Random();
    }

    @Override
    public Span joinTrace(SpanTextMap textMap) {
        Map<String, String> carrier = SPAN_CARRIER_MAPPER.convert(textMap);
        boolean debug = Span.SPAN_SAMPLED.equals(carrier.get(Span.SPAN_FLAGS));
        boolean idToBeGenerated = debug && onlySpanIdIsPresent(carrier);

        if (!idToBeGenerated && traceIdIsMissing(carrier)) {
            return null;
        }
        try {
            return buildParentSpan(carrier, idToBeGenerated);
        } catch (Exception e) {
            log.error("Exception occurred while trying to extract span from carrier", e);
            return null;
        }
    }

    private boolean onlySpanIdIsPresent(Map<String, String> carrier) {
        return traceIdIsMissing(carrier) && spanIdIsPresent(carrier);
    }

    private boolean traceIdIsMissing(Map<String, String> carrier) {
        return carrier.get(Span.TRACE_ID_NAME) == null;
    }

    private boolean spanIdIsPresent(Map<String, String> carrier) {
        return carrier.get(Span.SPAN_ID_NAME) != null;
    }

    private String generateId() {
        return Span.idToHex(this.random.nextLong());
    }

    private long spanId(String spanId, String traceId) {
        if (spanId == null) {
            if (log.isDebugEnabled()) {
                log.debug("Request is missing a span id but it has a trace id. We'll assume that this is "
                        + "a root span with span id equal to the lower 64-bits of the trace id");
            }
            return Span.hexToId(traceId);
        } else {
            return Span.hexToId(spanId);
        }
    }

    private Span buildParentSpan(Map<String, String> carrier, boolean idToBeGenerated) {
        String traceId = carrier.get(Span.TRACE_ID_NAME);
        if (traceId == null) {
            traceId = generateId();
        }
        Span.SpanBuilder span = Span.builder()
                .traceIdHigh(traceId.length() == 32 ? Span.hexToId(traceId, 0) : 0)
                .traceId(Span.hexToId(traceId))
                .spanId(spanId(carrier.get(Span.SPAN_ID_NAME), traceId));
        String parentName = carrier.get(Span.SPAN_NAME_NAME);
        if (StringUtils.hasText(parentName)) {
            span.name(parentName);
        } else {
            span.name(HTTP_COMPONENT + ":/parent"
                    + carrier.get(DubboSpanMapper.URI_HEADER));
        }
        String processId = carrier.get(Span.PROCESS_ID_NAME);
        if (StringUtils.hasText(processId)) {
            span.processId(processId);
        }
        String parentId = carrier.get(Span.PARENT_ID_NAME);
        if (parentId != null) {
            span.parent(Span.hexToId(parentId));
        }
        span.remote(true);

//        boolean skip = this.skipPattern
//                .matcher(carrier.get(DubboSpanMapper.URI_HEADER)).matches()
//                || Span.SPAN_NOT_SAMPLED.equals(carrier.get(Span.SAMPLED_NAME));
        boolean skip = false;
        // trace, span id were retrieved from the headers and span is sampled
        span.exportable(!(skip || idToBeGenerated));
        boolean debug = Span.SPAN_SAMPLED.equals(carrier.get(Span.SPAN_FLAGS));
        if (debug) {
            span.exportable(true);
        } else if (skip) {
            span.exportable(false);
        }
        for (Map.Entry<String, String> entry : carrier.entrySet()) {
            if (entry.getKey().toLowerCase()
                    .startsWith(DubboSpanMapper.BAGGAGE_PREFIX)) {
                span.baggage(unprefixedKey(entry.getKey()), entry.getValue());
            }
        }
        return span.build();
    }

    private String unprefixedKey(String key) {
        return key.substring(key.indexOf(DubboSpanMapper.HEADER_DELIMITER) + 1)
                .toLowerCase();
    }
}

public class DubboRequestTextMap implements SpanTextMap {

    private final RpcContext delegate;

    public DubboRequestTextMap(RpcContext delegate) {
        this.delegate = delegate;
    }

    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        return this.delegate.getAttachments().entrySet().iterator();
    }

    @Override
    public void put(String key, String value) {
        if (!StringUtils.hasText(value)) {
            return;
        }
        this.delegate.getAttachments().put(key, value);
    }
}
public class DubboSpanMapper {

    static final String HEADER_DELIMITER = "-";
    static final String BAGGAGE_PREFIX = Span.SPAN_BAGGAGE_HEADER_PREFIX
            + HEADER_DELIMITER;
    static final String URI_HEADER = "X-Span-Uri";

    private static Comparator<String> IGNORE_CASE_COMPARATOR = new Comparator<String>() {
        @Override
        public int compare(String o1, String o2) {
            return o1.toLowerCase().compareTo(o2.toLowerCase());
        }
    };

    /**
     * Acceptable span fields
     */
    private static final Set<String> SPAN_FIELDS;

    static {
        TreeSet<String> fields = new TreeSet<>(IGNORE_CASE_COMPARATOR);
        Collections.addAll(fields, Span.SPAN_FLAGS, Span.TRACE_ID_NAME, Span.SPAN_ID_NAME,
                Span.PROCESS_ID_NAME, Span.SPAN_NAME_NAME, Span.PARENT_ID_NAME,
                Span.SAMPLED_NAME, URI_HEADER);
        SPAN_FIELDS = Collections.unmodifiableSet(fields);
    }

    /**
     * Create new Map of carrier values
     */
    Map<String, String> convert(SpanTextMap textMap) {
        Map<String, String> carrier = new TreeMap<>(IGNORE_CASE_COMPARATOR);
        for (Map.Entry<String, String> entry : textMap) {
            if (isAcceptable(entry.getKey())) {
                carrier.put(entry.getKey(), entry.getValue());
            }
        }
        return Collections.unmodifiableMap(carrier);
    }

    private boolean isAcceptable(String key) {
        return SPAN_FIELDS.contains(key) || key.startsWith(BAGGAGE_PREFIX);
    }
}
上一篇下一篇

猜你喜欢

热点阅读