14、Skywalking的OAP-核心流程串烧

2021-06-09  本文已影响0人  rock_fish
  1. Kafka/Grpc的 Handler接收数据,解析接收到的数据构建成Source,不同的数据类型,这个过程复杂程度不同。
  2. DispatcherManager找到Source对应的SourceDispatcher,将Source传入dispatch方法中做处理
    2.1 将Source转换成对应的StorageData的子类,比如Segment(Source) 转换成 SegmentRecord(StorageData)
    2.2 调用具体的StreamProcessor处理StorageData

流程图:https://www.processon.com/view/link/60c1e5f20791297a3f0e4411

image.png image.png
Kafka接收数据

大致的流程为:Handler->解析构建Source-> SourceDispatcher#dispatch

有哪些SourceDisptch呢,他们都处理什么类型的数据呢,这里是通过扫描的方式来实现的,其大致过程如下:

CoreModuleProvider#start会调用SourceReceiverImpl.scan() ,再DispatcherManager#scan扫描接口SourceDispatcher的实现类,并根据实现类的dispatch方法的参数确定其处理的Source的类型;比如SourceDispatcher#dispatch处理的Source的类型是Segment

public class SegmentDispatcher implements SourceDispatcher<Segment> {

    @Override
    public void dispatch(Segment source) {
    ...
    }

这些被扫描到的实例,会被存储到DispatcherManager的成员变量dispatcherMap中,其key是Source#scope(),value是具体的SourceDispatcher实现类的实例。在使用的时候通过Source#scope()dispatcherMap中检索到Source对应的xxxDispatcher实例,进而调用其dispatch方法

需要知晓SourceDispatcher 的部分实现是通过 OAL 脚本生成的,我们在源码中看不到

BrowserAppTrafficSourceDispatcher.dispatch(SOURCE)
BrowserErrorLogRecordDispatcher.dispatch(BrowserErrorLog)
DatabaseStatementDispatcher.dispatch(DatabaseSlowStatement)
EndpointCallRelationDispatcher.dispatch(EndpointRelation)
EndpointMetaDispatcher.dispatch(EndpointMeta)
EndpointTrafficDispatcher.dispatch(Endpoint)
InstanceTrafficDispatcher.dispatch(ServiceInstance)
InstanceUpdateDispatcher.dispatch(ServiceInstanceUpdate)
LogRecordDispatcher.dispatch(Log)
NetworkAddressAliasSetupDispatcher.dispatch(NetworkAddressAliasSetup)
SegmentDispatcher.dispatch(Segment)
ServiceCallRelationDispatcher.dispatch(ServiceRelation)
ServiceInstanceCallRelationDispatcher.dispatch(ServiceInstanceRelation)
ServiceMetaDispatcher.dispatch(ServiceMeta)
ServiceTrafficDispatcher.dispatch(Service)
ZipkinSpanRecordDispatcher.dispatch(ZipkinSpan)
存储

启动时扫描@Stream标注,这个注解标注在StorageData上,看其核心成员:

例如:

@Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
public class LogRecord extends AbstractLogRecord {

扫描@Stream,对其解析后能会构造StreamProcessor,调用其create方法 做一些数据存储层的初始化工作:

这些内容的工作链路大致如下:
0、OAP启动时,如果需要创建索引和模板,就通过Model执行创建。

1、Kafka或Grpc接收到收据后,会解析原始数据,构造成 各种StorageData交给 RecordStreamProcessor#in

2、遍历workder链,寻找对应StorageData的worker ,调用其in方法继续处理

3、通常在in方法中调用dao执行存储

4、dao中调用StorageBuilder的转换方法,将StorageData转换成map,进而拼装成es 的IndexRequest对象。

上一篇 下一篇

猜你喜欢

热点阅读