16、skywalking的OAP-通过SegmentTrace

2021-06-10  本文已影响0人  rock_fish
SegmentTrace的核心处理流程

包含了从Kafka初始化,接收数据、解析构建、存储;核心的源码流程如下:
KafkaFetcher -> TraceSegmentHandler#handle ->SegmentParserServiceImpl#send -> TraceAnalyzer#doAnalysis -> AnalysisListener#parsexxx -> AnalysisListener#build ->SourceReceiver#receive -> dispatcherManager#forward-> SegmentDispatcher#dispatch

  1. KafkaFetcherProviderstart方法中注册了TraceSegmentHandler,用于接收Trace的数据,进行处理。
handlerRegister.register(new TraceSegmentHandler(getManager(), config));
  1. TraceSegmentHandler#handle处理Kafka数据
//将Kafka数据,解析成SegmentObject
SegmentObject segment = SegmentObject.parseFrom(record.value().get());
//交给SegmentParserServiceImpl处理
segmentParserService.send(segment);
  1. SegmentParserServiceImpl#send中通过 TraceAnalyzer 解析SegmentObject
    public void send(SegmentObject segment) {
        final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
        traceAnalyzer.doAnalysis(segment);
    }

其内部功能很多,从是Segment这个数据流程来说就是:构建多个监听器,以监听器的模式来通过解析segmentObject各个属性,通过构建SourceBuilder对象来承载上下游的链路相关信息,并添加到entrySourceBuilders中;在build环节,进一步构建成各维度的souce数据,包括Trace(链路),Metrics(调用统计如调用次数,pxx,响应时长等) 信息都在这个环节创建。先大致看下其代码主体流程,接下来会分析内部更多的细节逻辑:

public void doAnalysis(SegmentObject segmentObject) {
        if (segmentObject.getSpansList().size() == 0) {
            return;
        }

        createSpanListeners();//创建监听器

        notifySegmentListener(segmentObject);//处理trace

        segmentObject.getSpansList().forEach(spanObject -> {
            if (spanObject.getSpanId() == 0) {
                notifyFirstListener(spanObject, segmentObject);//根据第一个span的信息做一些处理
            }

            if (SpanType.Exit.equals(spanObject.getSpanType())) {
                notifyExitListener(spanObject, segmentObject);
            } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                notifyEntryListener(spanObject, segmentObject);//这里有很重要的链路的metric信息构建
            } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                notifyLocalListener(spanObject, segmentObject);
            } else {
                log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                          .name());
            }
        });

        notifyListenerToBuild();
    }
  1. SegmentAnalysisListener#parseSegment构建Segment(Source),部分属性赋值
    1.1 赋值 起止时间
    1.2 赋值 是否error
    1.3 赋值 是否采样,这里是重点

  2. SegmentAnalysisListener#notifyFirstListener 更多的属性赋值

  3. 多个EntryAnalysisListener监听器处理Entry类型的span
    3.1 SegmentAnalysisListener#parseEntry赋值service和endpoint的Name和id

    3.2 NetworkAddressAliasMappingListener#parseEntry 构造NetworkAddressAliasSetup完善ip_port地址与别名之间的映射关心,交给NetworkAddressAliasSetupDispatcher处理

    3.3 MultiScopesAnalysisListener#parseEntry 遍历span列表

    • 3.3.1 将每个span构建成SourceBuilder,设置上下游的游的Server、Instance、endpoint的name信息,这里mq和网关特殊处理,其上游保持ip端口,因为mq、网关通常没有搭载agent,没有相关的name信息。
    • 3.3.2 setPublicAttrs:SourceBuilder中添加 tag信息,重点是时间bucket,setResponseCode,Status,type(http,rpc,db)
    • 3.3.3 SourceBuilder添加到entrySourceBuilders,
    • 3.3.4 parseLogicEndpoints//处理span的tag是LOGIC_ENDPOINT = "x-le"类型的,添加到 logicEndpointBuilders中(用途待梳理)
  4. MultiScopesAnalysisListener#parseExit监听器处理Exit类型的span
    4.1 将span构建成SourceBuilder,设置上下游的游的ServerInstanceEndpoint的name信息,尝试把下游的ip_port信息修改成别名。
    4.2 setPublicAttrs:SourceBuilder中添加 tag信息,重点是时间bucket,setResponseCode,Status,type(http,rpc,db)
    4.3 SourceBuilder添加到exitSourceBuilders
    4.4 如果是db类型,构造slowStatementBuilder,判断时长设置慢查询标识,存入dbSlowStatementBuilders中。这里是全局的阈值 是个改造点。

  5. MultiScopesAnalysisListener#parseLocal监听器处理Local类型的span,通过parseLogicEndpoints方法处理span的tag是LOGIC_ENDPOINT = "x-le"类型的,添加到 logicEndpointBuilders中(用途待梳理)

  6. 这里是重点,以上构建的SourceBuilder就在这一步使用,执行各个AnalysisListener#build

    6.1 SegmentAnalysisListener#build,设置endpoint的 id 和name,然后将Segment交给SourceReceiver#receive处理,而SourceReceiver#receive就是调用dispatcherManager#forward,最终交给SegmentDispatcher#dispatch处理了,
    6.2 MultiScopesAnalysisListener#build中根据以上流程中创建的数据,会再构造出多种Metric 类型的Source数据交给SourceReceiver处理;这些逻辑在这篇笔记中不展开,本篇已Segment流程为主

SegmentDispatcher 处理 Segment

简单来说,这里的逻辑就是把Source转换成StorageData,交给RecordStreamProcessor里的一组AbstractWorker,用于完成以记录存储为主的相关工作。

public class SegmentDispatcher implements SourceDispatcher<Segment> {

    @Override
    public void dispatch(Segment source) {
        //Segment(Source) 转换成 SegmentRecord(StorageData)
        SegmentRecord segment = new SegmentRecord();
        segment.setSegmentId(source.getSegmentId());
        ...
        segment.setTags(Tag.Util.toStringList(source.getTags()));
        //交给worker链路处理StorageData
        RecordStreamProcessor.getInstance().in(segment);
    }
}

具体看下RecordStreamProcessor#inRecordPersistentWorker是如何把记录存储到ES中的。RecordPersistentWorker#in中的逻辑很清晰:

//1. 把SegmentRecord,通过RecordEsDAO的方法转换成ES的 InsertRequest
InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
//2. 异步的方式写es
batchDAO.asynchronous(insertRequest);
  1. BatchProcessEsDAO#prepareBatchInsert方法中,通过storageBuilderSegmentRecord转换成map,再讲map构建成XContentBuilder,进而构建成ES里的InsertRequest实例
    @Override
    public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
        XContentBuilder builder = map2builder(
            IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record)));
        String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
        String id = IndexController.INSTANCE.generateDocId(model, record.id());
        return getClient().prepareInsert(modelName, id, builder);
    }
  1. BatchProcessEsDAO#asynchronous
    首先初始化bulkProcessor,将InsertRequest提交给bulkProcessor,bulkProcessor是一个异步批插入的操作,细节可另行百度。

至此Trace写到ES后,这个流程就算结束了。

上一篇下一篇

猜你喜欢

热点阅读