pinpoint采样原理分析

2019-03-01  本文已影响0人  相远相连

使用pinpoint进行全链路监控时,支持对请求的采样,某条请求是否被采样,取决于整个链路开始的机器。该机器使用特定的采样算法。采样的标志会一直在链路中透传。比如在http里面,会在header里面增加Pinpoint-Sampled字段,使用不同的值表示是否采样。

s0:此条请求不采样
s1:此条请求采样

下面分为接收端和发送端,分别看下采样的处理。另外在记录span或者spanEvent时,需要注意对是否采样的判断。

接收端采样处理

接收端接收到请求之后,会去查看请求里面是否有Pinpoint-Sampled字段和Pinpoint-TraceID等字段。分为下面几种情况:

  1. Pinpoint-Sampled字段,并且值为s0,表示此条请求不采样。
  2. 没有Pinpoint-Sampled字段,但是有Pinpoint-TraceID等字段,表示此条请求被采样。
  3. 没有Pinpoint-Sampled字段,也没有Pinpoint-TraceID等字段,认为接收该请求的机器,是整条链路的第一个机器,或者链路信息在前面有丢失。

第一种情况,处理比较简单,调用下面的方法,创建DisableTrace,表示此Trace不被采样,并绑定到线程上下文中。

final Trace trace = this.traceContext.disableSampling();
  traceFactory.disableSampling();
    final Trace trace = this.baseTraceFactory.disableSampling();

注:使用缩进表示嵌套关系

第二种情况,调用this.baseTraceFactory.continueTraceObject里面创建DefaultTrace或者AsyncTrace,指定sampling字段为true,表示采样。

continueTrace(request, traceHeader)
  this.traceContext.continueTraceObject(traceId);
    this.baseTraceFactory.continueTraceObject(traceId);

第三种情况,在调用this.baseTraceFactory.newTraceObject()时,使用配置的sampler,进行采样判断:如果采样则创建DefaultTrace对象,并且sampling字段为true,如果不采样则创建DisableTrace对象。

final Trace trace = this.baseTraceFactory.newTraceObject(); 

public class DefaultBaseTraceFactory implements BaseTraceFactory {
    ...
    @Override
    public Trace newTraceObject() {
        // TODO need to modify how to inject a datasender
        final boolean sampling = sampler.isSampling();
        if (sampling) {
            final TraceRoot traceRoot = traceRootFactory.newTraceRoot();
            final Span span = spanFactory.newSpan(traceRoot);

            final Storage storage = storageFactory.createStorage(traceRoot);
            final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

            final TraceId traceId = traceRoot.getTraceId();
            final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), sampling);
            final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot);

            final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
            final DefaultTrace trace = new DefaultTrace(span, callStack, storage, sampling, spanRecorder, wrappedSpanEventRecorder, handle);

            return trace;
        } else {
            return newDisableTrace();
        }
    }
    ...
}

下面重点看下SamplingRateSampler类的代码,该类继承了Sampler接口,代码比较简单:使用累加的计数,当累加出来的数为配置的samplingRate的倍数时,表示该请求被采样。

public class SamplingRateSampler implements Sampler {

    private final AtomicInteger counter = new AtomicInteger(0);
    private final int samplingRate;

    public SamplingRateSampler(int samplingRate) {
        if (samplingRate <= 0) {
            throw new IllegalArgumentException("Invalid samplingRate " + samplingRate);
        }
        this.samplingRate = samplingRate;
    }

    @Override
    public boolean isSampling() {
        int samplingCount = MathUtils.fastAbs(counter.getAndIncrement());
        int isSampling = samplingCount % samplingRate;
        return isSampling == 0;
    }
}

发送端采样处理

请求发送到下游的机器之前,会从当前上下文里面获取Trace对象,并判断是否sampling。以httpclient4 plugin的 拦截器 HttpRequestExecutorExecuteMethodInterceptor举例:

public class HttpRequestExecutorExecuteMethodInterceptor implements AroundInterceptor {
    public void before(Object target, Object[] args) {
         ....
        final Trace trace = traceContext.currentRawTraceObject();
        if (trace == null) {
            return;
        }

        final HttpRequest httpRequest = getHttpRequest(args);
        final NameIntValuePair<String> host = getHost();
        final boolean sampling = trace.canSampled();
        if (!sampling) {
            if (httpRequest != null) {
                this.requestTraceWriter.write(httpRequest);
            }
            return;
        }

        final SpanEventRecorder recorder = trace.traceBlockBegin();
        TraceId nextId = trace.getTraceId().getNextTraceId();
        recorder.recordNextSpanId(nextId.getSpanId());
        recorder.recordServiceType(HttpClient4Constants.HTTP_CLIENT_4);
        if (httpRequest != null) {
            final String hostString = getHostString(host.getName(), host.getValue());
            this.requestTraceWriter.write(httpRequest, nextId, hostString);
        }

        InterceptorScopeInvocation invocation = interceptorScope.getCurrentInvocation();
        if (invocation != null) {
            invocation.getOrCreateAttachment(HttpCallContextFactory.HTTPCALL_CONTEXT_FACTORY);
        }
    }
  ...
}


public class DefaultRequestTraceWriter<T> implements RequestTraceWriter<T> {
    ...
    @Override
    public void write(T header, final TraceId traceId, final String host) {
        Assert.requireNonNull(traceId, "traceId must not be null");

        if (isDebug) {
            logger.debug("Set request header. traceId={}, applicationName={}, serverTypeCode={}, applicationNamespace={}", traceId, applicationName, serverTypeCode, applicationNamespace);
        }
        clientHeaderAdaptor.setHeader(header, Header.HTTP_TRACE_ID.toString(), traceId.getTransactionId());
        clientHeaderAdaptor.setHeader(header, Header.HTTP_SPAN_ID.toString(), String.valueOf(traceId.getSpanId()));
        clientHeaderAdaptor.setHeader(header, Header.HTTP_PARENT_SPAN_ID.toString(), String.valueOf(traceId.getParentSpanId()));
        clientHeaderAdaptor.setHeader(header, Header.HTTP_FLAGS.toString(), String.valueOf(traceId.getFlags()));
        clientHeaderAdaptor.setHeader(header, Header.HTTP_PARENT_APPLICATION_NAME.toString(), applicationName);
        clientHeaderAdaptor.setHeader(header, Header.HTTP_PARENT_APPLICATION_TYPE.toString(), Short.toString(serverTypeCode));

        if (applicationNamespace != null) {
            clientHeaderAdaptor.setHeader(header, Header.HTTP_PARENT_APPLICATION_NAMESPACE.toString(), applicationNamespace);
        }

        if (host != null) {
            clientHeaderAdaptor.setHeader(header, Header.HTTP_HOST.toString(), host);
        }
    }
   ...
}

如果当前的Tracesampling = false 直接在http头部写入Pinpoint-Sampled=s0, 其他信息都不传递。如果为true,则不设置Pinpoint-Sampled字段,只设置其他trace相关字段,比如Header.HTTP_TRACE_ID等。

对于采样的判断:

如果Tracesampling=false,则该Trace不支持记录span以及spanEvent,所以在需要使用span以及spanEvent的地方,需要去判断当前Trace是否被采样, 比如:

        final Trace trace = createTrace(request);
        if (trace == null) {
            return;
        }

        if (!trace.canSampled()) {
            return;
        }

        ....
        final Trace trace = this.requestTraceReader.read(request);
        if (trace.canSampled()) {
            final SpanRecorder recorder = trace.getSpanRecorder();
            // record root span
            recorder.recordServiceType(this.serviceType);
            recorder.recordApi(SERVLET_SYNC_METHOD_DESCRIPTOR);
            this.serverRequestRecorder.record(recorder, request);
            // record proxy HTTP header.
            this.proxyHttpHeaderRecorder.record(recorder, request);
        }
        return trace;

本文从发送端和接收端两个角度分别简单介绍了pinpoint的采样原理。

上一篇下一篇

猜你喜欢

热点阅读