17、skywalking的OAP-通过SegmentTrace

2021-06-21  本文已影响0人  rock_fish
SegmentTrace

包含了从Kafka初始化,接收数据、解析构建、存储;核心的源码流程如下:
KafkaFetcher -> TraceSegmentHandler#handle ->SegmentParserServiceImpl#send -> TraceAnalyzer#doAnalysis -> AnalysisListener#parsexxx -> AnalysisListener#build ->SourceReceiver#receive -> dispatcherManager#forward-> XXXDispatcher#dispatch

Kafka pull数据处理

TraceSegmentHandler#handle 中调用SegmentParserServiceImpl#send(segment)

public void send(SegmentObject segment) {
        final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
        traceAnalyzer.doAnalysis(segment);
    }

doAnalysis解析segment

public void doAnalysis(SegmentObject segmentObject) {
        if (segmentObject.getSpansList().size() == 0) {
            return;
        }

        createSpanListeners();//创建监听器

        notifySegmentListener(segmentObject);//处理trace

        segmentObject.getSpansList().forEach(spanObject -> {
            if (spanObject.getSpanId() == 0) {
                notifyFirstListener(spanObject, segmentObject);//根据第一个span的信息做一些处理
            }

            if (SpanType.Exit.equals(spanObject.getSpanType())) {
                notifyExitListener(spanObject, segmentObject);
            } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                notifyEntryListener(spanObject, segmentObject);//这里有很重要的链路的metric信息构建
            } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                notifyLocalListener(spanObject, segmentObject);
            } else {
                log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                          .name());
            }
        });

        notifyListenerToBuild();
    }

notifyEntryListener中调用MultiScopesAnalysisListener#parseEntry,把上下游的链路信息完善到sourceBuilder里,并添加到entrySourceBuilders中,在build环节,进一步构建成各维度的souce数据,包括链路的trace,以及调用统计如调用次数,pxx,响应时长等metric信息都在这个环节创建。

...
sourceReceiver.receive(entrySourceBuilder.toAll());
sourceReceiver.receive(entrySourceBuilder.toService());
sourceReceiver.receive(entrySourceBuilder.toServiceInstance());
sourceReceiver.receive(entrySourceBuilder.toEndpoint());
sourceReceiver.receive(entrySourceBuilder.toServiceRelation());
sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation());
EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation();
sourceReceiver.receive(endpointRelation);
...

从上边MultiScopesAnalysisListener#build代码片段中可以看到包含了ServiceServiceInstanceEndpointServiceRelationServiceInstanceRelationEndpointRelation这些类型的Source;并将这些Source提交给sourceReceiver,其底层封装的DispatcherManager会根据 Source的类型选择相应的SourceDispatcher,通过方法dispatch进一步处理。

具体的SourceDispatcher类是哪一个呢?

public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {

    @Override
    public void dispatch(EndpointRelation source) {
        switch (source.getDetectPoint()) {
            case SERVER:
                serverSide(source);
                break;
        }
    }

从这个类EndpointCallRelationDispatcher#dispatch的参数可以看出,这个类负责EndpointRelation这种类型的Source。查看dispatch的实现有以下这些:

BrowserAppTrafficSourceDispatcher.dispatch(SOURCE)
BrowserErrorLogRecordDispatcher.dispatch(BrowserErrorLog)
DatabaseStatementDispatcher.dispatch(DatabaseSlowStatement)
EndpointCallRelationDispatcher.dispatch(EndpointRelation)
EndpointMetaDispatcher.dispatch(EndpointMeta)
EndpointTrafficDispatcher.dispatch(Endpoint)
InstanceTrafficDispatcher.dispatch(ServiceInstance)
InstanceUpdateDispatcher.dispatch(ServiceInstanceUpdate)
LogRecordDispatcher.dispatch(Log)
NetworkAddressAliasSetupDispatcher.dispatch(NetworkAddressAliasSetup)
SegmentDispatcher.dispatch(Segment)
ServiceCallRelationDispatcher.dispatch(ServiceRelation)
ServiceInstanceCallRelationDispatcher.dispatch(ServiceInstanceRelation)
ServiceMetaDispatcher.dispatch(ServiceMeta)
ServiceTrafficDispatcher.dispatch(Service)
ZipkinSpanRecordDispatcher.dispatch(ZipkinSpan)

从以上清单中不难发现还缺少好多Source以及对应的Dispatcher类型;这些缺失的类,在Skywalking中是通过OAL机制在OAP启动时动态生成,OAL脚本位于/config文件夹中,用户只需更改并重新启动服务器即可使其生效。但是,OAL脚本还是编译语言,OAL运行时会动态生成Java代码。

可以在系统环境中添加SW_OAL_ENGINE_DEBUG=Y打开开关,以查看生成了哪些类,在oal-rt目录下的dispatcher 和 metrics两个目录查看


image.png image.png
这些生成的Metric的主要SCOPEAllServiceServiceInstanceEndpointServiceRelationServiceInstanceRelationEndpointRelation。此外,还有一些辅助SCOPE。查看官网的SCOPE定义,可以找到所有现有的SCOPE和字段

Source类的scope方法指定了SourceDispatcher的一个数字标识,

public abstract class Source {
    public abstract int scope();

最终这些Source会在SourceDispatcher的dispatch中,转换成StorageData,并交由MetricsStreamProcessor#in 进入L1、L2的聚合处理,报警处理,导出处理。

L1聚合

创建Worker,并构建worker链路:

  1. 启动扫描Stream注解的时候,在StreamAnnotationListener#notify中,通过MetricsStreamProcessor#create方法为每种Metrics生成一个MetricsAggregateWorker(当前实例内L1聚合),创建并注册一个这种Metric类型的远程Worker服务MetricsPersistentWorker(给其他实例的数据做L2聚合和报警、存储)
  2. 创建MetricsRemoteWorker并指定为MetricsAggregateWorker(L1聚合)的nextWorker,当完成L1聚合后将通过MetricsRemoteWorker当前的数据传递给远程的Worker服务MetricsPersistentWorker用于L2处理
  3. 数据在worker链路的流传的逻辑为:MetricsAggregateWorker(本实例做L1) -> MetricsRemoteWorke(本实例传递给远程MetricsPersistentWorker) -> MetricsPersistentWorker(远程实例,完成L2处理)->min级数据存储/更新->执行Hour聚合处理->执行day聚合处理->提交给AlarmWorker->提交给ExportWorker


    image.png
  1. MetricsAggregateWorker的一些实现细节: 接收到Metrics数据后,放入dataCarrier(10000*2)中,然后有一个线程去消费处理Metric,将metric丢入MergableBufferedData中执行初次的聚合,MergableBufferedData中是一个map,遇到id相同的则执行聚合
public void accept(final METRICS data) {
        final String id = data.id();
        final METRICS existed = buffer.get(id);
        if (existed == null) {
            buffer.put(id, data);
        } else {
            final boolean isAbandoned = !existed.combine(data);
            if (isAbandoned) {
                buffer.remove(id);
            }
        }
    }
MetricsPersistentWorker完成L2聚合

MetricsPersistentWorker内部使用了读写buffer缓冲,且buffer是可聚合的
即处理数据的时候,是:

  1. 丢入写buffer,这个写buffer在接收数据的时候具有聚合的作用
  2. 定时任务读buffer,这时候交换buffer的读写标识,把之前已写入数据的buffer变成读buffer,将数据读出来,进行下一步的处理。

MetricsRemoteWorker对应的远程服务是MetricsPersistentWorker,其内部有这三个很重要的worker,从其名字基本就可知道这些worker完成什么任务。

this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
this.transWorker = Optional.ofNullable(transWorker);

在通过in方法查看到数据写入buffer后,读buffer未完成后续操作的逻辑稍微绕一些,是在PersistenceTimer#start 中开启一个定时任务定时读数据进行处理,间隔是persistentPeriod(默认是3秒)

   public void buildBatchRequests(List<PrepareRequest> prepareRequests) {
        //取出一批
        final List<INPUT> dataList = getCache().read();
        //预处理
        prepareBatch(dataList, prepareRequests);
    }

prepareBatch中是最核心的逻辑:

  1. 在prepareBatch中遍历Metrics
  2. 每个metric记录都要交给transWorker做处理
  3. 当已处理的数据满2000条的时候写ES
  4. 当当前批次全部处理完的时候写ES
  5. 写ES的时候,如果记录已存在,则先聚合老数据再更新
  6. 写ES完成后,尝试将数据交给nextAlarmWorker和nextExportWorker。
上一篇下一篇

猜你喜欢

热点阅读