分布式链路追踪skywalking(一)-agent客户端

2022-02-14  本文已影响0人  后来丶_a24d

思维导图

思维导图.png

分布式链路追踪skywalking系列


前置知识

背景

Java agent

-javaagent:***(增强功能的jar包).jar -jar ***(待增强的jar包).jar
场景
原理
实战
  1. 引用bytebuddy(动态生成类,skywalking就是用这个框架实现)
 <dependencies>
        <!-- https://mvnrepository.com/artifact/net.bytebuddy/byte-buddy -->
        <dependency>
            <groupId>net.bytebuddy</groupId>
            <artifactId>byte-buddy</artifactId>
            <version>1.11.0</version>
        </dependency>
    </dependencies>
  1. 定义/resources/META-INF/MANIFEST.MF
Manifest-Version: 1.0
Can-Redefine-Classes: true
Can-Retransform-Classes: true
Premain-Class: com.seeger.demo.agent.Agent
  1. 定义premain

public class Agent {
    // 需要增强的类,这个是开源elastic-job的核心类,用户可自定义其他类
    private static final String ENHANCE_CLASS = "com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor";

    public static void premain(String agentArgs, Instrumentation instrumentation) {

        new AgentBuilder.Default()
                .type(ElementMatchers.named(ENHANCE_CLASS))
                .transform((builder, type, classLoader, module) ->
                        builder.method(ElementMatchers.named("execute").and(ModifierReviewable.OfByteCodeElement::isPublic))
                                .intercept(MethodDelegation.to(new DelegateTemplate(new DemoServiceInterceptor())))
                ).installOn(instrumentation);

        System.out.println("一点不萌");
    }

}
public class DelegateTemplate {
    private InstMethodAroundInterceptor interceptor;

    public DelegateTemplate(InstMethodAroundInterceptor interceptor) {
        this.interceptor = interceptor;
    }

    /**
     * 拦截增强主方法
     *
     * @param inst:                              被拦截对象本身
     * @param allArguments:被代理方法原参数
     * @param zuper:被代理方法的包装对象,zuper.call()调用原方法
     * @param method:原方法对象
     * @return
     */
    public Object interceptor(@This Object inst, @AllArguments Object[] allArguments,
                              @SuperCall Callable<?> zuper, @Origin Method method) {
        ResultWrapper rw = new ResultWrapper();
        if (this.interceptor != null) {
            try {
                // 调用前拦截处理
                this.interceptor.beforeMethod(inst, method,
                        allArguments, method.getParameterTypes(), rw);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
        

        Object result = null;
        try {
            // 被代理方法调用
            result = zuper.call();
            if (this.interceptor != null) {
                try {
                    // 调用后拦截处理
                    result = this.interceptor.afterMethod(inst, method,
                            allArguments, method.getParameterTypes(), result);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            if (this.interceptor != null) {
                try {
                    // 调用异常拦截处理
                    this.interceptor.handleMethodException(inst, method,
                            allArguments, method.getParameterTypes(), e);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }

        return result;
    }

}

public class DemoServiceInterceptor implements InstMethodAroundInterceptor {
    @Override
    public void beforeMethod(Object inst, Method interceptPoint, Object[] allArguments,
                             Class<?>[] argumentsTypes, ResultWrapper result) {
        System.out.println("DemoService Interceptor in haha ...");
    }

    @Override
    public Object afterMethod(Object inst, Method interceptPoint, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) {
        System.out.println("DemoService Interceptor out haha ...");
        return ret;
    }

    @Override
    public void handleMethodException(Object inst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        System.out.println("DemoService Interceptor error handle ...");
    }
}

public interface InstMethodAroundInterceptor {
    /**
     * 拦截点前
     *
     * @param inst:                    被增强类实例
     * @param interceptPoint:被增强方法
     * @param allArguments:被增强方法入参
     * @param argumentsTypes:被增强方法入参类型
     * @param result:result            包装类
     */
    void beforeMethod(Object inst, Method interceptPoint,
                      Object[] allArguments, Class<?>[] argumentsTypes,
                      ResultWrapper result);

    Object afterMethod(Object inst, Method interceptPoint,
                       Object[] allArguments, Class<?>[] argumentsTypes,
                       Object ret);

    void handleMethodException(Object inst, Method method, Object[] allArguments,
                               Class<?>[] argumentsTypes, Throwable t);
}

public class ResultWrapper {
    private boolean isContinue;
    private Object result;

    public boolean isContinue() {
        return isContinue;
    }

    public void setContinue(boolean aContinue) {
        isContinue = aContinue;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}
  1. 可在有elastic-job的demo项目加入启动项
-javaagent:***(增强功能的jar包).jar -jar ***(待增强的jar包).jar
  1. 运行到elastic-job的job时就会无入侵式增强

APM

Span
  1. 描述信息
  2. 时间戳
  3. Annotation的tag信息
  4. parent_id(可追溯用)
Trace
Annotation

Agent启动流程

public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
    // 步骤1、初始化配置信息
    SnifferConfigInitializer.initialize(agentArgs); 
    // 步骤2~4、查找并解析skywalking-plugin.def插件文件;
    // AgentClassLoader加载插件类并进行实例化;PluginFinder提供插件匹配的功能
    final PluginFinder pluginFinder = new PluginFinder(
       new PluginBootstrap().loadPlugins());
    // 步骤5、使用 Byte Buddy 库创建 AgentBuilder
    final ByteBuddy byteBuddy = new ByteBuddy()
       .with(TypeValidation.of(Config.Agent.IS_OPEN_DEBUGGING_CLASS));
    new AgentBuilder.Default(byteBuddy)...installOn(instrumentation);
    // 这里省略创建 AgentBuilder的具体代码,后面展开详细说
    // 步骤6、使用 JDK SPI加载的方式并启动 BootService 服务。
    ServiceManager.INSTANCE.boot();
    // 步骤7、添加一个JVM钩子
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      public void run() { ServiceManager.INSTANCE.shutdown(); }
    }, "skywalking service shutdown thread"));
}
  1. 步骤1初始化配置信息: 将配置信息解析到Config类中,Config类中有很多静态类


    初始化配置信息.png
  2. 步骤2~4、查找并解析skywalking-plugin.def插件文件,解析自定的apm-sniffer底下apm-sdk-plugin插件,以收集各个中间件,rpc, mq的监控信息,这里使用AgentClassLoader自定义的类加载器,方便将不在应用的 Classpath 中引入 SkyWalking 的插件 jar 包:


    插件.png
  3. 步骤5、使用 Byte Buddy 库创建 AgentBuilder
  4. 步骤6、使用 JDK SPI加载的方式并启动 BootService 服务,BootService的SPI实现有Jvm, Grpc,Kafka,还有将数据发送到server的BootService
public void boot() {
        bootedServices = loadAllServices();
        // 准备
        prepare();
       // 开始
        startup();
       // 完成
        onComplete();
    }
BootService插件.png
  1. 步骤7、添加一个JVM钩子

Agent发送数据

agent客户端与服务端连接

public void boot() {
        grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
        connectCheckFuture = Executors.newSingleThreadScheduledExecutor(
            new DefaultNamedThreadFactory("GRPCChannelManager")
        ).scheduleAtFixedRate(
            new RunnableWithExceptionProtection(
                this,
                t -> LOGGER.error("unexpected exception.", t)
            ), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS
        );
    }
public void run() {
    if (reconnect && grpcServers.size() > 0) {
        // 根据配置,连接指定OAP实例的IP和端口
        managedChannel = GRPCChannel.newBuilder(ipAndPort[0], 
                Integer.parseInt(ipAndPort[1]))
            .addManagedChannelBuilder(new StandardChannelBuilder())
            .addManagedChannelBuilder(new TLSChannelBuilder())
            .addChannelDecorator(new AuthenticationDecorator())
            .build();
        // notify()方法会循环调用所有注册在当前连接上的GRPCChannelListener实
        // 例(记录在listeners集合中)的statusChanged()方法,通知它们连接创建
        // 成功的事件
        notify(GRPCChannelStatus.CONNECTED);
        // 设置 reconnect字段为false,暂时不会再重建连接了
        reconnect = false;
    }
}

默认grpc异步发送

使用Kafka发送

Agent自定义发送方式,比如RocketMQ

DataCarrier


学习总结

Agent初始化

发送trace的grpc

trace.png

参考文章

上一篇下一篇

猜你喜欢

热点阅读