16、skywalking的OAP-通过SegmentTrace
SegmentTrace的核心处理流程
包含了从Kafka初始化,接收数据、解析构建、存储;核心的源码流程如下:
KafkaFetcher
-> TraceSegmentHandler#handle
->SegmentParserServiceImpl#send
-> TraceAnalyzer#doAnalysis
-> AnalysisListener#parsexxx
-> AnalysisListener#build
->SourceReceiver#receive
-> dispatcherManager#forward
-> SegmentDispatcher#dispatch
-
KafkaFetcherProvider
的start
方法中注册了TraceSegmentHandler,用于接收Trace的数据,进行处理。
handlerRegister.register(new TraceSegmentHandler(getManager(), config));
-
TraceSegmentHandler#handle
处理Kafka数据
//将Kafka数据,解析成SegmentObject
SegmentObject segment = SegmentObject.parseFrom(record.value().get());
//交给SegmentParserServiceImpl处理
segmentParserService.send(segment);
-
SegmentParserServiceImpl#send
中通过TraceAnalyzer
解析SegmentObject
public void send(SegmentObject segment) {
final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
traceAnalyzer.doAnalysis(segment);
}
其内部功能很多,从是Segment这个数据流程来说就是:构建多个监听器,以监听器的模式来通过解析segmentObject各个属性,通过构建SourceBuilder
对象来承载上下游的链路相关信息,并添加到entrySourceBuilders中;在build
环节,进一步构建成各维度的souce数据,包括Trace(链路),Metrics(调用统计如调用次数,pxx,响应时长等) 信息都在这个环节创建。先大致看下其代码主体流程,接下来会分析内部更多的细节逻辑:
public void doAnalysis(SegmentObject segmentObject) {
if (segmentObject.getSpansList().size() == 0) {
return;
}
createSpanListeners();//创建监听器
notifySegmentListener(segmentObject);//处理trace
segmentObject.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
notifyFirstListener(spanObject, segmentObject);//根据第一个span的信息做一些处理
}
if (SpanType.Exit.equals(spanObject.getSpanType())) {
notifyExitListener(spanObject, segmentObject);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
notifyEntryListener(spanObject, segmentObject);//这里有很重要的链路的metric信息构建
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
notifyLocalListener(spanObject, segmentObject);
} else {
log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
.name());
}
});
notifyListenerToBuild();
}
-
SegmentAnalysisListener#parseSegment
构建Segment
(Source),部分属性赋值
1.1 赋值 起止时间
1.2 赋值 是否error
1.3 赋值 是否采样,这里是重点 -
SegmentAnalysisListener#notifyFirstListener
更多的属性赋值 -
多个
EntryAnalysisListener
监听器处理Entry类型的span
3.1SegmentAnalysisListener#parseEntry
赋值service和endpoint的Name和id3.2
NetworkAddressAliasMappingListener#parseEntry
构造NetworkAddressAliasSetup
完善ip_port地址与别名之间的映射关心,交给NetworkAddressAliasSetupDispatcher
处理3.3
MultiScopesAnalysisListener#parseEntry
遍历span列表- 3.3.1 将每个span构建成
SourceBuilder
,设置上下游的游的Server、Instance、endpoint的name信息,这里mq和网关特殊处理,其上游保持ip端口,因为mq、网关通常没有搭载agent,没有相关的name信息。 - 3.3.2 setPublicAttrs:SourceBuilder中添加 tag信息,重点是时间bucket,setResponseCode,Status,type(http,rpc,db)
- 3.3.3 SourceBuilder添加到
entrySourceBuilders
, - 3.3.4 parseLogicEndpoints//处理span的tag是
LOGIC_ENDPOINT = "x-le"
类型的,添加到 logicEndpointBuilders中(用途待梳理)
- 3.3.1 将每个span构建成
-
MultiScopesAnalysisListener#parseExit
监听器处理Exit类型的span
4.1 将span构建成SourceBuilder
,设置上下游的游的Server
、Instance
、Endpoint
的name信息,尝试把下游的ip_port信息修改成别名。
4.2 setPublicAttrs:SourceBuilder中添加 tag信息,重点是时间bucket,setResponseCode,Status,type(http,rpc,db)
4.3SourceBuilder
添加到exitSourceBuilders
,
4.4 如果是db类型,构造slowStatementBuilder,判断时长设置慢查询标识,存入dbSlowStatementBuilders中。这里是全局的阈值 是个改造点。 -
MultiScopesAnalysisListener#parseLocal
监听器处理Local
类型的span,通过parseLogicEndpoints
方法处理span的tag是LOGIC_ENDPOINT = "x-le"
类型的,添加到 logicEndpointBuilders中(用途待梳理) -
这里是重点,以上构建的
SourceBuilder
就在这一步使用,执行各个AnalysisListener#build
6.1
SegmentAnalysisListener#build
,设置endpoint的 id 和name,然后将Segment
交给SourceReceiver#receive
处理,而SourceReceiver#receive
就是调用dispatcherManager#forward
,最终交给SegmentDispatcher#dispatch
处理了,
6.2MultiScopesAnalysisListener#build
中根据以上流程中创建的数据,会再构造出多种Metric 类型的Source数据交给SourceReceiver处理;这些逻辑在这篇笔记中不展开,本篇已Segment流程为主
SegmentDispatcher 处理 Segment
简单来说,这里的逻辑就是把Source转换成StorageData,交给RecordStreamProcessor
里的一组AbstractWorker
,用于完成以记录存储为主的相关工作。
public class SegmentDispatcher implements SourceDispatcher<Segment> {
@Override
public void dispatch(Segment source) {
//Segment(Source) 转换成 SegmentRecord(StorageData)
SegmentRecord segment = new SegmentRecord();
segment.setSegmentId(source.getSegmentId());
...
segment.setTags(Tag.Util.toStringList(source.getTags()));
//交给worker链路处理StorageData
RecordStreamProcessor.getInstance().in(segment);
}
}
具体看下RecordStreamProcessor#in
中RecordPersistentWorker
是如何把记录存储到ES中的。RecordPersistentWorker#in
中的逻辑很清晰:
//1. 把SegmentRecord,通过RecordEsDAO的方法转换成ES的 InsertRequest
InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
//2. 异步的方式写es
batchDAO.asynchronous(insertRequest);
-
BatchProcessEsDAO#prepareBatchInsert
方法中,通过storageBuilder
把SegmentRecord
转换成map,再讲map构建成XContentBuilder
,进而构建成ES里的InsertRequest
实例
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
XContentBuilder builder = map2builder(
IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record)));
String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, record.id());
return getClient().prepareInsert(modelName, id, builder);
}
-
BatchProcessEsDAO#asynchronous
首先初始化bulkProcessor
,将InsertRequest
提交给bulkProcessor
,bulkProcessor
是一个异步批插入的操作,细节可另行百度。
至此Trace写到ES后,这个流程就算结束了。