Skywalking源码研究之agent插件与链路监控
插件
紧接skywalking-agent初始化, skywalking使用微内核架构,对每一种框架的支持都是通过插件形式实现的
使用bytebuddy可以非常友好的进行切面编程,但skywalking毕竟是带有特定主题的切面:APM
于是skywalking把APM相关的API(例如与OAP通信)进行进一步封装,并抽象出监控的字节码增强插件定义:AbstractClassEnhancePluginDefine
,它的实现即为各个插件的定义类
这些插件实现类可以基于skywalking-agent-core中提供的API轻松完成上报(一般指创建span
)
分布式链路
结合整体示意图
img_2.png
span
skywalking-agent字节码增强大部分都是上报一个span给OAP,一个span就是分布式链路中的一个节点,包含主要属性:
-
spanid
ID -
endpointName
名称,一般是url路径或方法名称 -
serviceCode
节点运行的服务名称 -
component
描述监控的框架,如SpringMVC/Fegin/Dubbo等 -
isError
该节点是否异常 -
startTime&endTime
开始时间和结束时间,可计算出节点运行的时长 -
peer
ip+端口 -
type
span类型,下面细说 -
traceId
事务ID -
segmentId
片段ID -
parentSpanId
父ID
trace
每个span有一个traceId
属性标识所属事务,多个相同traceId
的span共同组成一个事务(trace
),它们通过parentSpanId
形成了一个链路,链路不绝对是链条的结构,也有可能是树形结构(一个父节点可能有多个子节点)
segment
在分布式事务中,一个trace中的span隶属不同的线程,为了区分,引入了segment做为区分
segment是一个trace中隶属相同线程的span集合,因此也可以说多个相同线程的span组成segment,多个segment组成trace
同时,segment也是探针进行数据上报的基本单位
span类型
span的type属性表示span的类型,包含三种
-
Entry
代表某个segment的入口span,就是第一个span,比如使用@RequestMapping定义的接口、dubbo的服务提供者 -
Local
代表普通的 Java 方法, 它与远程服务无关,所有本地方法调用都是local类型,包括异步线程调用 -
Exit
代表某个segment的出口span,例如访问数据库、使用Fegin调用其他服务、Dubbo的服务调用者
ui
skywalking-ui直观的展示了整个调用链路,如下
img_3.png
上下文
当发生A->B
调用时,已知通过相关技术插件可实现:
- A发起调用时上报
- B被调用时上报
但问题是OAP如何得知两个span隶属一个trace,或者如何得知两个span是否属于一个segment?
实际上,A、B在上报span时已提交相同的traceId,OAP在分析数据时才能展示出调用链路关系,所以问题的关键是A,B两个span如何共享上下文信息,涉及到主要三种情况
- 单线程调用 即普通的A方法调用B方法
- 跨线程调用 A方法中异步调用B方法
- 跨进程调用 即分布式调用,A、B方法属于两个进程
单线程调用
普通方法调用比较简单,skywalking-agent-core
中提供的ContextManager
使用ThreadLocal
即可在上报span时注入上下文信息,实现A、B方法的上下文信息共享
public class ContextManager implements BootService {
// 使用ThreadLocal实现线程内的上下文
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
}
这一部分由于skywalking-agent-core
已经封装好,所以插件不需再做额外处理
跨线程调用
当出现跨线程异步调用时,ThreadLocal就失效了,此时上下文信息就需要在线程之间传输
ContextManager
提供了两个方法来支持跨线程的上下文传递
-
capture
生成上下文信息的快照ContextSnapshot
,信息一般来源为当前线程的ThreadLocal -
continued
以ContextSnapshot
为参数重现上下文信息(存入当前线程的ThreadLocal)
而ContextSnapshot
的具体传递就需要插件自己来实现,步骤如下
- 父线程调用
ContextManager#capture
方法生成上下文快照 - 父线程调用子线程,并通过修改参数等方式传递快照至子线程
- 子线程使用
ContextManager#continued
方法,传入快照信息,重现父线程的上下文
跨进程实现原理
当发生A->B
分布式调用时,由于跨进程,ThreadLocal肯定行不通,A与B之间的上下文传递必然是序列化后通过网络传输的
core中提供了可序列化的网络传输载体对象:ContextCarrier
,同时ContextManager
提供了两个方法来支持ContextCarrier的注入和解压
-
inject
将当前上下文注入到ContextCarrier对象 -
extract
将ContextCarrier对象解压到当前上下文
而ContextCarrier
的传递方式是不同插件根据实际组件自己实现的,比如:
- Fegin调用(Http)时是通过把ContextCarrier放到请求头实现的
- Dubbo调用时是通过Dubbo框架提供的附件传递的
- Kafka是通过消息的形式传递的(todo 这里待细看)
以Fegin调用为例,Fegin发起调用的客户端拦截器是:DefaultHttpClientInterceptor
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) {
// 创建载体
ContextCarrier contextCarrier = new ContextCarrier();
// 创建出口span,内部执行了inject方法注入载体
AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remotePeer);
...
// 获取上下文信息的每一项
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
List<String> contextCollection = new ArrayList<String>(1);
contextCollection.add(next.getHeadValue());
// 加入请求的header中
headers.put(next.getHeadKey(), contextCollection);
}
...
}
对应的服务端一般是spring的@RequestMapping接口,对应的拦截器是RequestMappingMethodInterceptor
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
...
// 创建载体
final ContextCarrier contextCarrier = new ContextCarrier();
// http请求
final HttpServletRequest httpServletRequest = (HttpServletRequest) request;
CarrierItem next = contextCarrier.items();
// 循环上下文信息的每一项
while (next.hasNext()) {
next = next.next();
// 从header中获取对应项,装载到载体上
next.setHeadValue(httpServletRequest.getHeader(next.getHeadKey()));
}
// 内部调用extract,将载体解压到上下文
AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);
}
插件定义
AbstractClassEnhancePluginDefine
skywalking-agent的插件首先要有一个类增强插件定义,skywalking-agent抽象出插件定义的规范:AbstractClassEnhancePluginDefine
,各插件要给出具体实现,同时skywalking-agent-core情况有进一步抽象了两种实现
- ClassInstanceMethodsEnhancePluginDefine 针对类实例拦截定义
- ClassStaticMethodsEnhancePluginDefine 针对静态方法拦截定义
ClassInstanceMethodsEnhancePluginDefine
ClassInstanceMethodsEnhancePluginDefine
,是针对对类实例的一种增强插件定义的抽象,插件通过继承它可以实现对实例的拦截,只需实现如下方法:
/**
* 需要被拦截Class
* @return
*/
@Override
protected ClassMatch enhanceClass() {
return null;
}
/**
* 构造器切点,可以是多个
* @return
*/
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
/**
* 方法切点,可以是多个
* @return InstanceMethodsInterceptPoint
*/
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
ClassMatch
用来匹配类,agent-core提供如下常用API来实现类匹配
-
NameMatch.byName
根据名称匹配 -
ClassAnnotationMatch.byClassAnnotationMatch
根据类注解匹配 -
MethodAnnotationMatchbyMethodAnnotationMatch
根据类中方法注解匹配 -
HierarchyMatch.byHierarchyMatch
根据父类或实现接口匹配
ConstructorInterceptPoint
和InstanceMethodsInterceptPoint
下面介绍
ClassStaticMethodsEnhancePluginDefine
针对静态方法拦截定义,继承者需实现
/**
* 构造器切点,可以是多个
* @return
*/
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
/**
* 方法切点,可以是多个
* @return InstanceMethodsInterceptPoint
*/
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
ConstructorInterceptPoint
和InstanceMethodsInterceptPoint
下面介绍
InstanceMethodsInterceptPoint
无论是实例还是静态方法,都需要InstanceMethodsInterceptPoint
数组来进行方法切点和拦截器,主要包含如下属性
public interface InstanceMethodsInterceptPoint {
/**
* 方法的匹配
*/
ElementMatcher<MethodDescription> getMethodsMatcher();
/**.
* 返回一个拦截器全类名,所有拦截器必须实现InstanceMethodsAroundInterceptor 接口
*/
String getMethodsInterceptor();
/**
* 是否要覆盖原方法入参
*/
boolean isOverrideArgs();
}
其中指定的拦截器都需要实现InstanceMethodsAroundInterceptor
接口
InstanceMethodsAroundInterceptor
具体的拦截代码,主要实现如下方法
public interface InstanceMethodsAroundInterceptor {
/**
* 前置处理
*/
void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable;
/**
* 后置处理
*/
Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable;
/**
* 异常处理
*/
void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t);
}
ConstructorInterceptPoint
与InstanceMethodsInterceptPoint基本差不多,只不过针对的是构造方法
总结
插件的开发基本就是对skywalking-agent-core定义的一些抽象的具体实现,最总打成jar包,放入plugins
目录,插件即可生效
注:插件的resources目录中一定要添加skywalking-plugin.def
文件,内容是
{name}={增强插件定义全路径名}
可以是多个,以springmvc举例如下
spring-mvc-annotation-5.x=org.apache.skywalking.apm.plugin.spring.mvc.v5.define.ControllerInstrumentation
spring-mvc-annotation-5.x=org.apache.skywalking.apm.plugin.spring.mvc.v5.define.RestControllerInstrumentation
同时skywalking-agent-core
提供丰富的api用于插件拦截后的上报,详见skywalking上报和采集