SkyWalking之Tracing

2023-06-30  本文已影响0人  gregoriusxu

上文我们说到作为一个全链路监控系统,Tracing是必不可少的部分,今天我们从客户端和服务端的源代码角度分析一下SkyWalking如何实现这块的。

Agent 代理原理

在介绍Traceing真正的实现之前,我们有必要知道SkyWalking是如何帮助我们完成agent代理的,从上面的文章我们知道,agent的入口是SkyWalkingAgent,我们之前提到了PluginFinder这个加载类,其实它是用来加载agent代理类的总管。如果需要给不同的框架或者代码增加埋点入口,我们需要对框架的代码或者原理非常了解才行,这样才能最大限度抓取到我们需要的trace信息,举例来说,如果我们需要抓取慢SQL,你使用的是mysql驱动,那么需要在创建preparestatement或者statement的时候最好,SkyWalking为我们准备CreatePreparedStatementInterceptor和CreateStatementInterceptor两个解析类,用来拦截ConnectionImpl 的prepareStatement和createStatement方法。

那么这些解析器什么时候使用呢?那么就涉及到这些拦截类的加载,SkyWalking使用拦截类的加载基类AbstractClassEnhancePluginDefine来加载这些拦截类,这个类对拦截类的加载又分为三类

这个类还有其它方法,define,enhanceInstance,enhanceClass,因为我们最终目的是对实例或者静态方法进行增强,所以这三个方法分别就是做这两件事情的,最要入口方法define,是返回的是net.bytebuddy.dynamic.DynamicType.Builder,从名字我们看出这个是bytebuddy的,如果不了解bytebuddy是如何对类进行增加的可以参考我之前写的文章。

我们再次回到SkyWalkingAgent的PluginFinder插件查找类,SkyWalking基于插件扩展开发框架方便我们定义拦截类,这样设计非常灵活,我们来看一下PluginFinder的初始化过程:

pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());

入口参数是通过new PluginBootstrap().loadPlugins()来加载所有插件,PluginBootstrap使用PluginResourcesResolver加载所有插件目录,PluginResourcesResolver最终调用的是AgentClassLoader从所有插件里面的resource目录找到skywalking-plugin.def的拦截定义类的定义文件,那么AgentClassLoader是从哪些目录加载这些插件的呢?我们知道,AgentClassLoader是SkyWalking自定的ClassLoader加载器,重写的findResource方法,这个方法其实返回就是启动目录的父目录,所以启动目录的所有子目录来找到插件定义文件,其实主要的插件定义都在Plugin目录。

找到所有插件定义类之后PluginFinder现在持有了所有插件定义类,它将增强的准备工作交给了BootstrapInstrumentBoost.inject方法处理,这个方法主要是为了创建bytebuddy的 AgentBuilder,真正做事的方法是prepareJREInstrumentation

    /**
     * Generate dynamic delegate for ByteBuddy
     *
     * @param pluginFinder   gets the whole plugin list.
     * @param classesTypeMap hosts the class binary.
     * @return true if have JRE instrumentation requirement.
     * @throws PluginException when generate failure.
     */
    private static boolean prepareJREInstrumentation(PluginFinder pluginFinder,
        Map<String, byte[]> classesTypeMap) throws PluginException {
        TypePool typePool = TypePool.Default.of(BootstrapInstrumentBoost.class.getClassLoader());
        List<AbstractClassEnhancePluginDefine> bootstrapClassMatchDefines = pluginFinder.getBootstrapClassMatchDefine();
        for (AbstractClassEnhancePluginDefine define : bootstrapClassMatchDefines) {
            if (Objects.nonNull(define.getInstanceMethodsInterceptPoints())) {
                for (InstanceMethodsInterceptPoint point : define.getInstanceMethodsInterceptPoints()) {
                    if (point.isOverrideArgs()) {
                        generateDelegator(
                            classesTypeMap, typePool, INSTANCE_METHOD_WITH_OVERRIDE_ARGS_DELEGATE_TEMPLATE, point
                                .getMethodsInterceptor());
                    } else {
                        generateDelegator(
                            classesTypeMap, typePool, INSTANCE_METHOD_DELEGATE_TEMPLATE, point.getMethodsInterceptor());
                    }
                }
            }

            if (Objects.nonNull(define.getConstructorsInterceptPoints())) {
                for (ConstructorInterceptPoint point : define.getConstructorsInterceptPoints()) {
                    generateDelegator(
                        classesTypeMap, typePool, CONSTRUCTOR_DELEGATE_TEMPLATE, point.getConstructorInterceptor());
                }
            }

            if (Objects.nonNull(define.getStaticMethodsInterceptPoints())) {
                for (StaticMethodsInterceptPoint point : define.getStaticMethodsInterceptPoints()) {
                    if (point.isOverrideArgs()) {
                        generateDelegator(
                            classesTypeMap, typePool, STATIC_METHOD_WITH_OVERRIDE_ARGS_DELEGATE_TEMPLATE, point
                                .getMethodsInterceptor());
                    } else {
                        generateDelegator(
                            classesTypeMap, typePool, STATIC_METHOD_DELEGATE_TEMPLATE, point.getMethodsInterceptor());
                    }
                }
            }
        }
        return bootstrapClassMatchDefines.size() > 0;
    }

上面的代码主要是分三步,主要是我们上面介绍的AbstractClassEnhancePluginDefine的三个方法增强

其实真正的增加是在SkyWalkingAgent的内部类Transformer处理,这个类继承bytebuddy AgentBuilder.Transformer实现,本质是调用我们刚才介绍的AbstractClassEnhancePluginDefine的define类完成实质性的代码增强


@Override
public DynamicType.Builder<?> transform(final DynamicType.Builder<?> builder,final TypeDescription typeDescription,final ClassLoader classLoader,final JavaModule module) {
        LoadedLibraryCollector.registerURLClassLoader(classLoader);
        List<AbstractClassEnhancePluginDefine> pluginDefines = pluginFinder.find(typeDescription);
        if (pluginDefines.size() > 0) {
            DynamicType.Builder<?> newBuilder = builder;
            EnhanceContext context = new EnhanceContext();
            for (AbstractClassEnhancePluginDefine define : pluginDefines) {
                DynamicType.Builder<?> possibleNewBuilder = define.define(
                    typeDescription, newBuilder, classLoader, context);
                    if (possibleNewBuilder != null) {
                        newBuilder = possibleNewBuilder;
                    }
            }
            if (context.isEnhanced()) {
                LOGGER.debug("Finish the prepare stage for {}.", typeDescription.getName());
            }

            return newBuilder;
        }

        LOGGER.debug("Matched class {}, but ignore by finding mechanism.", typeDescription.getTypeName());
        return builder;
}

最后我们来看一下真正增强实例类的方法org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine#enhanceInstance


// ...方法太长,自己看一下
/**
 * Manipulate class source code.<br/>
 *
 * new class need:<br/>
 * 1.Add field, name {@link #CONTEXT_ATTR_NAME}.
 * 2.Add a field accessor for this field.
 *
 * And make sure the source codes manipulation only occurs once.
 *
 */
if (!typeDescription.isAssignableTo(EnhancedInstance.class)) {
    if (!context.isObjectExtended()) {
        newClassBuilder = newClassBuilder.defineField(
           CONTEXT_ATTR_NAME, Object.class, ACC_PRIVATE | ACC_VOLATILE)
                          .implement(EnhancedInstance.class)
                                    .intercept(FieldAccessor.ofField(CONTEXT_ATTR_NAME));
        context.extendObjectCompleted();
    }
}
// 其它逻辑
 newClassBuilder = newClassBuilder.method(junction)
 .intercept(MethodDelegation.withDefaultConfiguration()
 .to(new InstMethodsInter(interceptor, classLoader)));

// ...其它逻辑

方法太长,我们只关注主重要的一些代码,第一块是让目标类实现EnhancedInstance接口,在目标方法里面定义一个名称是CONTEXT_ATTR_NAME即_$EnhancedClassField_ws的字段, 定义getSkyWalkingDynamicField() 和setSkyWalkingDynamicField() 两个方法,分别读写新增的_$EnhancedClassField_ws 字段,这个很重要,是用来承载Tracing信息的字段,下面一行是使用bytebuddy 方法拦截类InstMethodsInter,bytebuddy帮我们调用这个拦截类的intercept方法


/**
     * Intercept the target instance method.
     *
     * @param obj          target class instance.
     * @param allArguments all method arguments
     * @param method       method description.
     * @param zuper        the origin call ref.
     * @return the return value of target instance method.
     * @throws Exception only throw exception because of zuper.call() or unexpected exception in sky-walking ( This is a
     *                   bug, if anything triggers this condition ).
     */
    @RuntimeType
    public Object intercept(@This Object obj, @AllArguments Object[] allArguments, @SuperCall Callable<?> zuper,
        @Origin Method method) throws Throwable {
        EnhancedInstance targetObject = (EnhancedInstance) obj;

        MethodInterceptResult result = new MethodInterceptResult();
        try {
            interceptor.beforeMethod(targetObject, method, allArguments, method.getParameterTypes(), result);
        } catch (Throwable t) {
            LOGGER.error(t, "class[{}] before method[{}] intercept failure", obj.getClass(), method.getName());
        }

        Object ret = null;
        try {
            if (!result.isContinue()) {
                ret = result._ret();
            } else {
                ret = zuper.call();
            }
        } catch (Throwable t) {
            try {
                interceptor.handleMethodException(targetObject, method, allArguments, method.getParameterTypes(), t);
            } catch (Throwable t2) {
                LOGGER.error(t2, "class[{}] handle method[{}] exception failure", obj.getClass(), method.getName());
            }
            throw t;
        } finally {
            try {
                ret = interceptor.afterMethod(targetObject, method, allArguments, method.getParameterTypes(), ret);
            } catch (Throwable t) {
                LOGGER.error(t, "class[{}] after method[{}] intercept failure", obj.getClass(), method.getName());
            }
        }
        return ret;
    }

从代码可以看出,主要是调用SkyWalking定义的拦截类实例 基类是InstanceMethodsAroundInterceptor,比如我们上面提到的CreatePreparedStatementInterceptor和CreateStatementInterceptor

Tracing上报解析

我们还是以上面的慢SQL上报为例进行说明,上面我们说到增加类会继承接口EnhancedInstance,在JDBC执行的过程中,SkyWalking分别对Connection,PreparedStatement或者createStatement方法进行增强,最后对PreparedStatement的executeQuery,executeUpdate executeLargeUpdate增强的org.apache.skywalking.apm.plugin.jdbc.mysql.PreparedStatementExecuteMethodsInterceptor或Statement的executeQuery,executeUpdate,executeLargeUpdate,executeBatchInternal,executeUpdateInternal,executeQuery,executeBatch方法进行增强的org.apache.skywalking.apm.plugin.jdbc.mysql.StatementExecuteMethodsInterceptor,前面对Connection,PreparedStatement增加主要是为了将链接信息,SQL参数信息放到上下文进行传递,最后PreparedStatementExecuteMethodsInterceptor或者StatementExecuteMethodsInterceptor进行上报处理,我们以PreparedStatementExecuteMethodsInterceptor为例来看一下代码


@Override
public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) {
        StatementEnhanceInfos cacheObject = (StatementEnhanceInfos) objInst.getSkyWalkingDynamicField();
        /**
         * For avoid NPE. In this particular case, Execute sql inside the {@link com.mysql.jdbc.ConnectionImpl} constructor,
         * before the interceptor sets the connectionInfo.
         * When invoking prepareCall, cacheObject is null. Because it will determine procedures's parameter types by executing sql in mysql 
         * before the interceptor sets the statementEnhanceInfos.
         * @see JDBCDriverInterceptor#afterMethod(EnhancedInstance, Method, Object[], Class[], Object)
         */
        if (cacheObject != null && cacheObject.getConnectionInfo() != null) {
            ConnectionInfo connectInfo = cacheObject.getConnectionInfo();
            AbstractSpan span = ContextManager.createExitSpan(
                buildOperationName(connectInfo, method.getName(), cacheObject
                    .getStatementName()), connectInfo.getDatabasePeer());
            Tags.DB_TYPE.set(span, "sql");
            Tags.DB_INSTANCE.set(span, connectInfo.getDatabaseName());
            Tags.DB_STATEMENT.set(span, SqlBodyUtil.limitSqlBodySize(cacheObject.getSql()));
            span.setComponent(connectInfo.getComponent());

            if (JDBCPluginConfig.Plugin.JDBC.TRACE_SQL_PARAMETERS) {
                final Object[] parameters = cacheObject.getParameters();
                if (parameters != null && parameters.length > 0) {
                    int maxIndex = cacheObject.getMaxIndex();
                    String parameterString = getParameterString(parameters, maxIndex);
                    SQL_PARAMETERS.set(span, parameterString);
                }
            }

            SpanLayer.asDB(span);
        }
}

@Override
public final Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,Class<?>[] argumentsTypes, Object ret) {
    StatementEnhanceInfos cacheObject = (StatementEnhanceInfos) objInst.getSkyWalkingDynamicField();
    if (cacheObject != null && cacheObject.getConnectionInfo() != null) {
            ContextManager.stopSpan();
    }

    return ret;
}

@Override
public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,Class<?>[] argumentsTypes, Throwable t) {
    StatementEnhanceInfos cacheObject = (StatementEnhanceInfos) objInst.getSkyWalkingDynamicField();
    if (cacheObject != null && cacheObject.getConnectionInfo() != null) {
        ContextManager.activeSpan().log(t);
    }
}

在解释上面的代码之前,首先我们来了解几个概念:

Span

Span 分为 3 类:

它们都继承自AbstractSpan ,其主要的方法有:

TraceSegment

在 SkyWalking 中,TraceSegment 是一个介于 Trace 与 Span 之间的概念,它是一条 Trace 的一段,可以包含多个 Span。在微服务架构中,一个请求基本都会涉及跨进程(以及跨线程)的操作,例如, RPC 调用、通过 MQ 异步执行、HTTP 请求远端资源等,处理一个请求就需要涉及到多个服务的多个线程。TraceSegment 记录了一个请求在一个线程中的执行流程(即 Trace 信息)。将该请求关联的 TraceSegment 串联起来,就能得到该请求对应的完整 Trace。

Context

SkyWalking 中的每个 TraceSegment 都与一个 Context 上下文对象一对一绑定,Context 上下文不仅记录了 TraceSegment 的上下文信息,还提供了管理 TraceSegment 生命周期、创建 Span 以及跨进程(跨线程)传播相关的功能。

AbstractTracerContext 是对上下文概念的抽象,其中定义了 Context 上下文的基本行为:

AbstractTraceContext 有两个实现类IgnoredTracerContext,TracingContext,IgnoredTracerContext 表示该 Trace 将会被丢失,所以其中不会记录任何信息,里面所有方法也都是空实现。这里重点来看 TracingContext,其核心字段如下:

结合上面的解析以及前一篇的介绍,我们知道SkyWalking使用堆栈进行Span管理,EntrySpan为TraceSegment入口,ExitSpan为TraceSegment出口,如果调用链复杂,我们可能会同时用EntrySpan和ExitSpan,但是对于上面的例子,我们只需要创建一个ExitSpan就可以了,所以上面代码不用解析已经不言自明。

那么数据是如何上报的呢?我们关注一下afterMethod方法, ContextManager.stopSpan()这个方法最要是调用org.apache.skywalking.apm.agent.core.context.TracingContext#finish方法


/**
     * Finish this context, and notify all {@link TracingContextListener}s, managed by {@link
     * TracingContext.ListenerManager} and {@link TracingContext.TracingThreadListenerManager}
     */
    private void finish() {
        if (isRunningInAsyncMode) {
            asyncFinishLock.lock();
        }
        try {
            boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
            if (isFinishedInMainThread) {
                /*
                 * Notify after tracing finished in the main thread.
                 */
                TracingThreadListenerManager.notifyFinish(this);
            }

            if (isFinishedInMainThread && (!isRunningInAsyncMode || asyncSpanCounter == 0)) {
                TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
                TracingContext.ListenerManager.notifyFinish(finishedSegment);
                running = false;
            }
        } finally {
            if (isRunningInAsyncMode) {
                asyncFinishLock.unlock();
            }
        }
    }

当 TracingContext 通过 stopSpan() 方法关闭最后一个 Span 时,会调用 finish() 方法关闭相应的 TraceSegment,与此同时,还会调用 TracingContext.ListenerManager.notifyFinish() 方法通知所有监听 TracingContext 关闭事件的监听器 —— TracingContextListener,TraceSegmentServiceClient 是 TracingContextListener 接口的实现之一,其主要功能就是在 TraceSegment 结束时对其进行收集,并发送到后端的 OAP 集群。TraceSegmentServiceClient 底层维护了一个 DataCarrier 对象,其底层 Channels 默认有 5 个 Buffer,每个 Buffer 长度为 300,使用的是 IF_POSSIBLE 阻塞写入策略,底层会启动一个 ConsumerThread 线程。

TraceSegmentServiceClient 作为一个 TracingContextListener 接口的实现,会在 notifyFinish() 方法中,将刚刚结束的 TraceSegment 写入到 DataCarrier 中缓存。同时,TraceSegmentServiceClient 实现了 IConsumer 接口,封装了消费 Channels 中数据的逻辑,在 consume() 方法中会首先将消费到的 TraceSegment 对象序列化,然后通过 gRPC 请求发送到后端 OAP 集群,最后我们看一下TraceSegmentServiceClient的consume() 方法


@Override
public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
                Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
            ).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    ServiceManager.INSTANCE.findService(CommandService.class)
                                           .receiveCommand(commands);
                }

                @Override
                public void onError(
                    Throwable throwable) {
                    status.finished();
                    if (LOGGER.isErrorEnable()) {
                        LOGGER.error(
                            throwable,
                            "Send UpstreamSegment to collector fail with a grpc internal exception."
                        );
                    }
                    ServiceManager.INSTANCE
                        .findService(GRPCChannelManager.class)
                        .reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });

            try {
                for (TraceSegment segment : data) {
                    SegmentObject upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
            }

            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }

        printUplinkStatus();
}

注意,TraceSegmentServiceClient 在批量发送完 UpstreamSegment 数据之后,会通过 GRPCStreamServiceStatus 进行自旋等待,直至该批 UpstreamSegment 全部发送完毕。

下面我们来分析一下TraceSegmentServiceClient在哪里启动的以及consume是如何调用的,还记得上篇文章我们分析SkyWalking的微内核架构吗?我们列出了第一个启动服务类就是TraceSegmentServiceClient,可以看到TraceSegmentServiceClient继承于BootService的微内核服务,这个服务就是用来消费是报数据使用。TracingContext将上报数据缓存到TraceSegmentServiceClient的DataCarrier,同时DataCarrier持有一个ConsumeDriver对象,这个对象相当于一个线程池,线程池里面实际的工作线程是ConsumerThread,这个继承于Thread的线程,用来消费实现了org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer接口的实现类,TraceSegmentServiceClient就实现了这个接口,在构造DataCarrier传了this,ConsumeDriver将DataCarrier传的 Channels 转为ConsumerThread持有的List<DataSource> 类型数组对象dataSources,这样dataSources持有了Channels所持有的QueueBuffer<T>队列,最后将QueueBuffer<T>队列里面的元素drainTo到一个List数组,最终传给IConsumer接口的实现类进行消费,下面是org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerThread#consume的实现。


    private boolean consume(List<T> consumeList) {
        for (DataSource dataSource : dataSources) {
            dataSource.obtain(consumeList);
        }

        if (!consumeList.isEmpty()) {
            try {
                consumer.consume(consumeList);
            } catch (Throwable t) {
                consumer.onError(consumeList, t);
            } finally {
                consumeList.clear();
            }
            return true;
        }
        consumer.nothingToConsume();
        return false;
    }

最后我们分析一下ContextManager,顾名思意,这是一个管理TraceSegment上报数据上下文的类,同样它也是继承自BootService,同样回归前一篇文章,我们列出的第二个服务器就是它,ContextManager里面的属性有两个ThreadLocal数组ThreadLocal<AbstractTracerContext> 类型的CONTEXT,ThreadLocal<RuntimeContext> 的RUNTIME_CONTEXT,CONTEXT具体的类型就是我们上面提到的IgnoredTracerContext,TracingContext,RUNTIME_CONTEXT用来传递trace过程中的中间数据,我们可以发现ContextManager的prepare,boot,onComplete,shutdown都是空的,为什么这么设计?我猜测只是借助于初始化过程做一个ThreadLocal的预热。

TraceSegment是如何填充数据的?我们发现TraceSegment只有archive方法做了数据的添加,最后跟踪到org.apache.skywalking.apm.agent.core.context.TracingContext#stopSpan调用org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan#finish方法将数据装入TraceSegment

    /**
     * Finish the active Span. When it is finished, it will be archived by the given {@link TraceSegment}, which owners
     * it.
     *
     * @param owner of the Span.
     */
    public boolean finish(TraceSegment owner) {
        this.endTime = System.currentTimeMillis();
        owner.archive(this);
        return true;
    }
上一篇 下一篇

猜你喜欢

热点阅读